Сохранение данных обратно в Cassandra как RDD

Я пытаюсь прочитать сообщения от Kafka, обработать данные, а затем добавить данные в кассандру, как если бы это был RDD.

Моя проблема заключается в сохранении данных обратно в кассандру.

from __future__ import print_function

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext

appName = 'Kafka_Cassandra_Test'
kafkaBrokers = '1.2.3.4:9092'
topic = 'test'
cassandraHosts = '1,2,3'
sparkMaster = 'spark://mysparkmaster:7077'


if __name__ == "__main__":
    conf = SparkConf()
    conf.set('spark.cassandra.connection.host', cassandraHosts)

    sc = SparkContext(sparkMaster, appName, conf=conf)

    ssc = StreamingContext(sc, 1)

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBrokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.saveToCassandra('coreglead_v2', 'wordcount')

    ssc.start()
    ssc.awaitTermination()

И ошибка:

[root@gasweb2 ~]# spark-submit --jars /var/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar --packages datastax:spark-cassandra-connector:1.5.0-RC1-s_2.11 /var/spark/scripts/kafka_cassandra.py
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/var/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
datastax#spark-cassandra-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 in spark-packages
    found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
    found com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 in central
    found io.netty#netty-handler;4.0.33.Final in central
    found io.netty#netty-buffer;4.0.33.Final in central
    found io.netty#netty-common;4.0.33.Final in central
    found io.netty#netty-transport;4.0.33.Final in central
    found io.netty#netty-codec;4.0.33.Final in central
    found io.dropwizard.metrics#metrics-core;3.1.2 in central
    found org.slf4j#slf4j-api;1.7.7 in central
    found org.apache.commons#commons-lang3;3.3.2 in central
    found com.google.guava#guava;16.0.1 in central
    found org.joda#joda-convert;1.2 in central
    found joda-time#joda-time;2.3 in central
    found com.twitter#jsr166e;1.1.0 in central
    found org.scala-lang#scala-reflect;2.11.7 in central
:: resolution report :: resolve 647ms :: artifacts dl 15ms
    :: modules in use:
    com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 from central in [default]
    com.google.guava#guava;16.0.1 from central in [default]
    com.twitter#jsr166e;1.1.0 from central in [default]
    datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 from spark-packages in [default]
    io.dropwizard.metrics#metrics-core;3.1.2 from central in [default]
    io.netty#netty-buffer;4.0.33.Final from central in [default]
    io.netty#netty-codec;4.0.33.Final from central in [default]
    io.netty#netty-common;4.0.33.Final from central in [default]
    io.netty#netty-handler;4.0.33.Final from central in [default]
    io.netty#netty-transport;4.0.33.Final from central in [default]
    joda-time#joda-time;2.3 from central in [default]
    org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
    org.apache.commons#commons-lang3;3.3.2 from central in [default]
    org.joda#joda-convert;1.2 from central in [default]
    org.scala-lang#scala-reflect;2.11.7 from central in [default]
    org.slf4j#slf4j-api;1.7.7 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   16  |   0   |   0   |   0   ||   16  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 16 already retrieved (0kB/14ms)
16/02/15 16:26:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/var/spark/scripts/kafka_cassandra.py", line 27, in <module>
    counts.saveToCassandra('coreglead_v2', 'wordcount')
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra'

При поиске я обнаружил эту проблему GitHub, однако, похоже, это связано с другая библиотека (я не могу использовать эту библиотеку, так как использую Cassandra 3.0, и она еще не поддерживается).

Цель состоит в том, чтобы создать агрегированные данные из одного сообщения (счетчик слов используется только для тестирования) и вставить их в несколько таблиц.

Я близок к тому, чтобы просто использовать Datastax Python Driver и сам писать инструкции, но есть ли лучший способ добиться этого?


person Jim Wright    schedule 15.02.2016    source источник


Ответы (2)


Вы используете Spark Cassandra Connector от Datastax, который не поддерживает python на уровне RDD / DStream. Поддерживаются только фреймы данных. Дополнительную информацию см. В документации.

Я создал оболочку для вышеупомянутого коннектора: PySpark Cassandra. Это не полная функция по сравнению с разъемом от Datastax, но есть много чего. Кроме того, если производительность важна, возможно, стоит исследовать ее снижение.

Наконец, Spark поставляется с примером Python использования CqlInput / OutputFormat из hadoop mapreduce. На мой взгляд, не очень удобный вариант для разработчиков, но он есть.

person Frens Jan    schedule 15.02.2016

Посмотрите на свой код и прочтите описание проблемы: похоже, что вы не используете какой-либо соединитель Cassandra. Spark не поставляется с поддержкой Cassandra из коробки, так как типы данных RDD и DStream не имеют метода saveToCassandra. Вам необходимо импортировать внешний соединитель Spark-Cassandra, который расширяет типы RDD и DStream для поддержки интеграции Cassandra.

Вот почему вы получаете сообщение об ошибке: Python не может найти никакой функции saveToCassandra в типе DStream, потому что в настоящее время ее не существует.

Вам понадобится соединитель DataStax или какой-либо другой соединитель для расширения типа DStream с помощью saveToCassandra.

person egerhard    schedule 15.02.2016
comment
Спасибо за ответ, я использую коннектор по datastax: github.com/datastax/spark-cassandra -connector, в котором я указал, что запускаю spark-submit. Я новичок в Python, как мне узнать, что мне следует импортировать? - person Jim Wright; 15.02.2016
comment
@JimWright, как у вас настроены Spark и PySpark? Вы используете DataStax Enterprise? Кроме того, вы используете оболочку pyspark или как вы пытаетесь выполнить свой код? - person egerhard; 15.02.2016
comment
Я использую версию сообщества и запускаю в командной строке spark-submit: spark-submit --jars /var/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar --packages datastax: spark-cassandra-connector: 1.5.0-RC1-s_2.11 /var/spark/scripts/kafka_cassandra.py - person Jim Wright; 16.02.2016