Как реализовать rdd.bulkSaveToCassandra в datastax

  • Я использую кластер datastax с 5.0.5.
[cqlsh 5.0.1 | Cassandra 3.0.11.1485 | DSE 5.0.5 | CQL spec 3.4.0 | Native proto

с помощью искро-кассандрового коннектора 1.6.8

Я попытался реализовать приведенный ниже код .. импорт не работает.

val rdd: RDD[SomeType] = ... // create some RDD to save import
com.datastax.bdp.spark.writer.BulkTableWriter._

rdd.bulkSaveToCassandra(keyspace, table)

Может кто-нибудь подсказать мне, как реализовать этот код. Нужна ли для этого какая-то зависимость.


person Chandra    schedule 02.03.2018    source источник


Ответы (1)


В Cassandra Spark Connector есть saveToCassandra метод, который можно использовать таким образом (взято из документация):

val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

Существует также saveAsCassandraTableEx, который позволяет вам управлять созданием схемы и другими вещами - это также описано в документации, упомянутой выше.

Чтобы использовать их, вам необходимо import com.datastax.spark.connector._ описано в "Подключение к Cassandra ".

И вам нужно добавить соответствующую зависимость - но это зависит от того, какую систему сборки вы используете.

Метод bulkSaveToCassandra доступен только при использовании коннектора DSE. Вам необходимо добавить соответствующие зависимости - см. документацию для более подробной информации. Но даже основной разработчик коннектора Spark говорит, что это лучше использовать saveToCassandra вместо него.

person Alex Ott    schedule 02.03.2018
comment
Спасибо, Алекс. Ищу bulkSaveToCassandra APi. Любая идея? - person Chandra; 02.03.2018
comment
Обновил ансер - person Alex Ott; 02.03.2018
comment
Спасибо, Алекс. Причина, по которой я ищу, заключается в том, что я выполнил задание, используя DF.write.format (org.apache.spark.sql.cassandra). на 32 миллиона записей ушло 36 минут. Когда вызывается этот метод, искровая ступень бездействует в течение ~ 12 минут. Я вижу журналы в драйвере com.datastax.spark.connector.cql.CassandraConnector: отключен от кластера Cassandra. Затем он снова подключился и начал загрузку. Любая идея, почему он отключается при загрузке в кассандру. - person Chandra; 02.03.2018
comment
это может быть тайм-аут при чтении больших блоков данных или что-то в этом роде. Но без кода сложно сказать. - person Alex Ott; 02.03.2018
comment
Пожалуйста, примите этот ответ, если он решил вашу проблему, @Chandra - person suj1th; 07.03.2018