Сохранение / обновление Spark scala cassandra

У меня есть набор данных Spark для объекта, который должен быть сохранен / обновлен в таблице cassandra с именем «предложение».

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String)
val offerDataset: Dataset[Offer] = ....

Я хочу сохранить или обновить указанный выше 'offerDataset' в cassandra с меткой времени записи, которая будет определена полем "metadata_last_modified_source_time" в 'offer' сущность.

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("metadata_last_modified_source_time")))

При письме Кассандре я столкнулся с исключением, приведенным ниже. Может ли кто-нибудь помочь мне разобраться в этой проблеме. Та же ошибка с типами util.Date и Long для metadata_last_modified_source_time.

com.datastax.driver.core.exceptions.InvalidTypeException: Value metadata_last_modified_source_time is of type bigint, not timestamp
at com.datastax.driver.core.AbstractGettableByIndexData.checkType(AbstractGettableByIndexData.java:83)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:529)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnNull(BoundStatementBuilder.scala:59)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$5.apply(BoundStatementBuilder.scala:83)

person Nasreen    schedule 14.04.2020    source источник
comment
какую версию Spark Cassandra Connector вы используете? в случае первоначального объявления класса case с java.sql.Timestamp ошибка верна ...   -  person Alex Ott    schedule 14.04.2020
comment
metadata_last_modified_source_time имеет тип bigint, попробуйте сохранить временную метку эпохи как значение bigint для timestamp вместо java.sql.Timestamp   -  person Shahab Niaz    schedule 15.04.2020
comment
@AlexOtt, я использую spark-cassandra-connector-2.0.3. Metadata_last_modified_source_time имеет тип sql.Timestamp. Ошибка указывает на тип bigInt. Как может быть ошибка правильная?   -  person Nasreen    schedule 15.04.2020
comment
попробуйте более новую версию - 2.4.3 и т. д.   -  person Alex Ott    schedule 15.04.2020


Ответы (1)


Я нашел решение после просмотра этого документа - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

Добавлено новое поле writeTime в классе case Offer, которое должно отображаться на метку времени записи таблицы cassandra.

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String, writeTime: sql.Date)

При создании offerDataSet я установил для поля writeTime значение

val offerDataset: Dataset[Offer] = {....
   ....
    val writeTime = new Date(metadata_last_modified_source_time.getTime())
   ....
   ....
}

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))
person Nasreen    schedule 15.04.2020