Я пытаюсь прочитать сообщения от Kafka, обработать данные, а затем добавить данные в кассандру, как если бы это был RDD.
Моя проблема заключается в сохранении данных обратно в кассандру.
from __future__ import print_function
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
appName = 'Kafka_Cassandra_Test'
kafkaBrokers = '1.2.3.4:9092'
topic = 'test'
cassandraHosts = '1,2,3'
sparkMaster = 'spark://mysparkmaster:7077'
if __name__ == "__main__":
conf = SparkConf()
conf.set('spark.cassandra.connection.host', cassandraHosts)
sc = SparkContext(sparkMaster, appName, conf=conf)
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBrokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.saveToCassandra('coreglead_v2', 'wordcount')
ssc.start()
ssc.awaitTermination()
И ошибка:
[root@gasweb2 ~]# spark-submit --jars /var/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar --packages datastax:spark-cassandra-connector:1.5.0-RC1-s_2.11 /var/spark/scripts/kafka_cassandra.py
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/var/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
datastax#spark-cassandra-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 in spark-packages
found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
found com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 in central
found io.netty#netty-handler;4.0.33.Final in central
found io.netty#netty-buffer;4.0.33.Final in central
found io.netty#netty-common;4.0.33.Final in central
found io.netty#netty-transport;4.0.33.Final in central
found io.netty#netty-codec;4.0.33.Final in central
found io.dropwizard.metrics#metrics-core;3.1.2 in central
found org.slf4j#slf4j-api;1.7.7 in central
found org.apache.commons#commons-lang3;3.3.2 in central
found com.google.guava#guava;16.0.1 in central
found org.joda#joda-convert;1.2 in central
found joda-time#joda-time;2.3 in central
found com.twitter#jsr166e;1.1.0 in central
found org.scala-lang#scala-reflect;2.11.7 in central
:: resolution report :: resolve 647ms :: artifacts dl 15ms
:: modules in use:
com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 from central in [default]
com.google.guava#guava;16.0.1 from central in [default]
com.twitter#jsr166e;1.1.0 from central in [default]
datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 from spark-packages in [default]
io.dropwizard.metrics#metrics-core;3.1.2 from central in [default]
io.netty#netty-buffer;4.0.33.Final from central in [default]
io.netty#netty-codec;4.0.33.Final from central in [default]
io.netty#netty-common;4.0.33.Final from central in [default]
io.netty#netty-handler;4.0.33.Final from central in [default]
io.netty#netty-transport;4.0.33.Final from central in [default]
joda-time#joda-time;2.3 from central in [default]
org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
org.apache.commons#commons-lang3;3.3.2 from central in [default]
org.joda#joda-convert;1.2 from central in [default]
org.scala-lang#scala-reflect;2.11.7 from central in [default]
org.slf4j#slf4j-api;1.7.7 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 16 | 0 | 0 | 0 || 16 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 16 already retrieved (0kB/14ms)
16/02/15 16:26:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
File "/var/spark/scripts/kafka_cassandra.py", line 27, in <module>
counts.saveToCassandra('coreglead_v2', 'wordcount')
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra'
При поиске я обнаружил эту проблему GitHub, однако, похоже, это связано с другая библиотека (я не могу использовать эту библиотеку, так как использую Cassandra 3.0, и она еще не поддерживается).
Цель состоит в том, чтобы создать агрегированные данные из одного сообщения (счетчик слов используется только для тестирования) и вставить их в несколько таблиц.
Я близок к тому, чтобы просто использовать Datastax Python Driver и сам писать инструкции, но есть ли лучший способ добиться этого?