ОБНОВЛЕНИЕ таблицы Cassandra с использованием разъема Spark Cassandra

У меня проблема с соединителем Spark Cassandra на scala при обновлении таблицы в моем пространстве ключей

Вот мой фрагмент кода

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a= a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + x

println(query)

val KeySpace    = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)

hourUniqueKeySpace.sql(query)

Когда я выполняю этот код, я получаю такую ​​ошибку

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found

Есть идеи, почему это происходит? Как я могу это исправить?


person Sunil Kumar B M    schedule 05.08.2015    source источник
comment
Каков результат, если вы запустите оператор SQL, сгенерированный вашим кодом, непосредственно на Cassandra?   -  person ofirski    schedule 06.08.2015
comment
@kerkero: если я запустил его на cassandra, он либо обновит строку, если ключ уже присутствует, либо создаст новую строку для этого ключа, если ключ отсутствует   -  person Sunil Kumar B M    schedule 06.08.2015
comment
Вы определили столбец, который соответствует a в вашем примере, как тип счетчика?   -  person ofirski    schedule 06.08.2015
comment
Если определено, BTW ... это не счетчик, это набор   -  person Sunil Kumar B M    schedule 06.08.2015
comment
Привет, @SunilKumarBM, с возможно предвзятой точки зрения, я бы рекомендовал использовать фантом для обычного приложения Cassandra, разъем Spark специально предназначен для приложений Spark, тогда как фантом должен быть основой любого API на основе Cassandra.   -  person flavian    schedule 09.09.2015


Ответы (2)


ОБНОВЛЕНИЕ таблицы с столбцом счетчика возможно через свечка-кассандра-коннектор. Вам нужно будет использовать DataFrames и DataFrameWriter метод сохранения в режиме "добавить" (или SaveMode. Добавьте, если хотите). Проверьте код DataFrameWriter.scala.

Например, учитывая таблицу:

cqlsh:test> SELECT * FROM name_counter ;

 name    | surname | count
---------+---------+-------
    John |   Smith |   100
   Zhang |     Wei |  1000
 Angelos |   Papas |    10

Код должен выглядеть так:

val updateRdd = sc.parallelize(Seq(Row("John",    "Smith", 1L),
                                   Row("Zhang",   "Wei",   2L),
                                   Row("Angelos", "Papas", 3L)))

val tblStruct = new StructType(
    Array(StructField("name",    StringType, nullable = false),
          StructField("surname", StringType, nullable = false),
          StructField("count",   LongType,   nullable = false)))

val updateDf  = sqlContext.createDataFrame(updateRdd, tblStruct)

updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test", "table" -> "name_counter"))
.mode("append")
.save()

После ОБНОВЛЕНИЯ:

 name    | surname | count
---------+---------+-------
    John |   Smith |   101
   Zhang |     Wei |  1002
 Angelos |   Papas |    13

Преобразование DataFrame можно упростить, если неявно преобразовать RDD в DataFrame: import sqlContext.implicits._ и используя .toDF().

Проверьте полный код этого игрушечного приложения: https://github.com/kyrsideris/SparkUpdateCassandra/tree/master

Поскольку версии здесь очень важны, приведенное выше относится к Scala 2.11.7, Spark 1.5.1, spark-cassandra-connector 1.5.0-RC1-s_2.11, Cassandra 3.0.5. DataFrameWriter обозначается как @Experimental с @since 1.4.0.

person Kyr    schedule 21.04.2016
comment
как я могу вставить новую запись или удалить с помощью фрейма данных? - person H Raval; 14.02.2017

Я считаю, что родным образом обновить через коннектор SPARK нельзя. См. документацию:

«По умолчанию Spark Cassandra Connector перезаписывает коллекции при вставке в таблицу cassandra. Чтобы переопределить это поведение, вы можете указать настраиваемый сопоставитель с инструкциями о том, как вы хотите, чтобы коллекция обрабатывалась».

Таким образом, вы действительно захотите ВСТАВИТЬ новую запись с существующим ключом.

person Bacon    schedule 06.08.2015