Как изменить типы данных записей, вставляемых в Cassandra, с помощью потоковой передачи структуры Foreach Spark

Я пытаюсь вставить десериализованные записи Kafka в Data Stax Cassandra, используя потоковую передачу структуры Spark с использованием Foreach Sink.

Например, мои десериализованные данные фрейма данных, как и все, находятся в строковом формате.

id   name    date
100 'test' sysdate

Используя foreach Sink, я создал класс и попытался вставить записи, как показано ниже, преобразовав его.

session.execute(
  s"""insert into ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} (id,name,date)
  values  ('${row.getAs[Long](0)}','${rowstring(1)}','${rowstring(2)}')"""))
  }
)

Я точно следил за этим проектом https://github.com/epishova/Structured-Streaming-Cassandra-Sink/blob/master/src/main/scala/cassandra_sink.scala

при вставке в таблицу Cassandra преобразование типа данных столбца строки "id" в Long, как упомянуто выше, не преобразуется. И выкидывает ошибку

"Недопустимая константа STRING (100) для id типа bigint"

ТАБЛИЦА КАССАНДРЫ; -

create table test(
id bigint,
name text,
date timestamp)

Любые предложения по преобразованию строкового типа данных в Long внутри def Process.

Также будет отличным альтернативное предложение.

Это код:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.expr

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the interface ForeachWriter, which has methods that get called 
  // whenever there is a sequence of rows generated as output

  var cassandraDriver: CassandraDriver = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    if (cassandraDriver == null) {
      cassandraDriver = new CassandraDriver();
    }
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
       values('${record.getLong(0)}', '${record(1)}', '${record(2)}')""")
    )
  }

  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}

class SparkSessionBuilder extends Serializable {
  // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. 
  // Note here the usage of @transient lazy val 
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
    .setAppName("Structured Streaming from Kafka to Cassandra")
    .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
    .set("spark.sql.streaming.checkpointLocation", "checkpoint")

    @transient lazy val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

    spark
  }
}

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so to use the same SparkSession on each node.
  val spark = buildSparkSession

  import spark.implicits._

  val connector = CassandraConnector(spark.sparkContext.getConf)

  // Define Cassandra's table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
       id Bigint,
       name text,
       timestamp_dt date,
       primary key (id));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession

    import spark.implicits._

    // Define location of Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"

    /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n 
    {"100": "test1", "01-mar-2018"}
    {"101": "test2", "02-mar-2018"}  */
    val dfraw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "currency_exchange")
    .load()

    val schema = StructType(
      Seq(
        StructField("id", StringType, false),
        StructField("name", StringType, false),
StructField("date", StringType, false)

      )
    )

    val df = dfraw
    .selectExpr("CAST(value AS STRING)").as[String]
    .flatMap(_.split("\n"))

    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")


    val sink = jsons
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()

    sink.awaitTermination()
  }
}  

Мой модифицированный код; -

def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"in my Open connection")
    val cassandraDriver = new CassandraDriver();
    true
  }


  def process(record: Row) = {


    val optype = record(0)

    if (cassandraDriver == null) {
      val  cassandraDriver = new CassandraDriver();
    }

  if (optype == "I" || optype == "U") {

        println(s"Process insert or Update Idempotent new $record")

        cassandraDriver.connector.withSessionDo(session =>{
          val prepare_rating_brand = session.prepare(s"""insert into ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} (table_name,op_type,op_ts,current_ts,pos,brand_id,brand_name,brand_creation_dt,brand_modification_dt,create_date) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""")

          session.execute(prepare_rating_brand.bind(record.getAs[String](0),record.getAs[String](1),record.getAs[String](2),record.getAs[String](3),record.getAs[String](4),record.getAs[BigInt](5),record.getAs[String](6),record.getAs[String](7),record.getAs[String](8),record.getAs[String](9))
          )

        })
      }  else if (optype == "D") {

        println(s"Process delete new $record")
        cassandraDriver.connector.withSessionDo(session =>
          session.execute(s"""DELETE FROM ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} WHERE brand_id = ${record.getAs[Long](5)}"""))

      } else if (optype == "T") {
        println(s"Process Truncate new $record")
        cassandraDriver.connector.withSessionDo(session =>
          session.execute(s"""Truncate table  ${cassandraDriver.namespace}.${cassandraDriver.plan_rating_archive_dub_sink}"""))

      }
    }

  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }


}


person venkat Ramanan VTR    schedule 12.07.2019    source источник
comment
Если вы используете Spark 2.4.x, вы можете использовать вместо него foreachBatch - это намного проще, чем foreach. Вот пример: github.com/alexott/spark-cassandra-oss-demos/blob/master/src/   -  person Alex Ott    schedule 22.07.2019
comment
Спасибо, @Alex Ott. Я использую Spark 2.2 с datastax 6.0.   -  person venkat Ramanan VTR    schedule 22.07.2019
comment
если вы используете дистрибутив DataStax, вы можете просто использовать writeStream вместо этого процессора: github.com/alexott/dse-java-playground/blob/master/src/main/ - в другом файле в та же папка, есть пример для Кафки   -  person Alex Ott    schedule 22.07.2019
comment
Хорошо, @AlexOtt, рассмотрим этот сценарий, у меня есть 10 запросов, записываемых в datastax, один запрос завершился искрой. У меня есть диспетчер потоковых запросов, контрольная точка и все, что нужно для обработки исключений. Вы подскажете, как перезапустить этот конкретный запрос, не затрагивая сеанс? так как у меня уже обработан queryawaittermination ().   -  person venkat Ramanan VTR    schedule 23.07.2019
comment
Хммм, я не совсем понимаю. Запрос обычно повторяется автоматически, если это возможно: docs.datastax.com /en/developer/java-driver/3.7/manual/retries, но вам может потребоваться специально пометить его как идемпотентный. Но, как я уже упоминал, если вы используете DataStax Enterprise - вам не нужно писать весь этот код самостоятельно - вам просто нужно использовать writeStream - тогда коннектор DSE будет обрабатывать все автоматически.   -  person Alex Ott    schedule 23.07.2019
comment
Я согласен с @Alex Ott. Но мне нужно вставить на основе значения типа операции столбца I или U или D. [если мне нужно вставить U, то обновите D, удалите строку]. Можно ли справиться с этим сценарием, Алекс?   -  person venkat Ramanan VTR    schedule 25.07.2019
comment
Также как преобразовать значение объекта record (0) в двойной формат Cassandra при вставке в Cassandra?   -  person venkat Ramanan VTR    schedule 25.07.2019


Ответы (1)


Ваша ошибка заключается в том, что вы указываете значение для поля id как '${row.getAs[Long](0)}' - вы добавили одинарные кавычки вокруг него, поэтому оно обрабатывается как строка, а не как _3 _ / _ 4_ - просто удалите одинарные кавычки вокруг этого значения: _5 _...

Кроме того, по соображениям производительности лучше переместить экземпляр драйвера cassandra в метод open и использовать подготовленные операторы, примерно так:

  var cassandraDriver: CassandraDriver = null;
  var preparedStatement: PreparedStatement = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    cassandraDriver = new CassandraDriver();
    preparedStatement = cassandraDriver.connector.withSessionDo(session =>
      session.prepare(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} 
      (fx_marker, timestamp_ms, timestamp_dt) values(?, ?, ?)""")
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(preparedStatement.bind(${record.getLong(0)}, 
           ${record(1)}, ${record(2)}))
    )
  }

он будет более производительным, и вам не нужно будет выполнять цитирование значений самостоятельно.

person Alex Ott    schedule 12.07.2019
comment
Привет, Алекс, еще одно сомнение. Я получаю все данные из Golden Gate в Kafka с еще одним параметром op_type = I (Insert) или U (Update). Поэтому я буду потреблять данные [вставлять или обновлять] на основе того, что мне нужно вставить в Cassandra. Поэтому я должен обрабатывать подготовленные операторы внутри [def Process]. Это возможно - person venkat Ramanan VTR; 16.07.2019
comment
так что просто подготовьте 2 разных запроса и выберите их в зависимости от флага. - person Alex Ott; 16.07.2019
comment
Спасибо @Alex Ott. datastax- oss.atlassian.net/browse/. Я читал это из Data stax. Итак, foreach внутренне готовит заявления? - person venkat Ramanan VTR; 16.07.2019
comment
Ах да - забыл об этом ... Вы можете пометить это поле как @transient - см. waitforcode.com/apache-spark/serialization-issues-part-2/ - person Alex Ott; 16.07.2019
comment
Привет, Алекс! Еще одно сомнение. Когда я пытаюсь обновить столбец, у него есть String, например Bravo Daytime-D (MF 8 AM-15 PM), и я получаю сообщение об ошибке, например com.datastax.driver.core.exceptions.SyntaxError : line 60:24 нет альтернативы при вводе 'Bravo' (... = 63904, sales_unit_name = '' [Bravo] ...) Как использовать escape-символы динамически. ? @Alex Ott - person venkat Ramanan VTR; 18.07.2019
comment
когда вы используете подготовленные операторы, вам не нужно самостоятельно экранировать данные - это одно из больших преимуществ помимо более низкой нагрузки на узлы. Подробнее см. В документации: docs.datastax.com/ ru / developer / java-driver / 3.7 / manual / - person Alex Ott; 18.07.2019
comment
о, ладно, @Alex Ott. Но мне нужно вставить на основе значения столбца op_type (I, D или U). Итак, в def open я не могу подготовить stackoverflow.com/posts/56883558/revisions. Поскольку я новичок в Кассандре, пожалуйста, посоветуйте мне. - person venkat Ramanan VTR; 18.07.2019
comment
Просто создайте N подготовленных операторов - разных для каждого типа операции. Другой вариант - использовать SimoleStatement, но он не так эффективен, как подготовленные операторы. - person Alex Ott; 18.07.2019
comment
хорошо, @Alex Ott. Я попытался использовать код, которым вы поделились выше. Но когда я применяю это в процессе сеанса. выполнить, он показывает Не удается разрешить перегруженный метод «выполнить». Какие-либо предложения - person venkat Ramanan VTR; 18.07.2019
comment
Я все еще вижу старый код с одинарными кавычками вокруг первого числа - просто удалите их - person Alex Ott; 19.07.2019
comment
спасибо за ваше руководство. Я пробовал, как указано выше, измененный код и работал нормально. Но я могу преобразовать тип данных в String или BigInt. Double не работает и показывает некоторую неявную ошибку преобразования !!! есть идеи по этому поводу.?. Еще раз благодарю вас. Кроме того, посоветуйте мне мою стратегию, это хорошо? - person venkat Ramanan VTR; 19.07.2019
comment
Вы должны подготовить каждый отчет только один раз - либо в open, либо по запросу. Используя подготовку для каждой записи, вы действительно снижаете производительность вашей системы, потому что prepare требует дополнительного обхода к серверу. Вы можете изучить этот код как пример подготовки операторов по запросу: github.com/alexott/spark-cassandra-oss-demos/blob/master/src/ - person Alex Ott; 19.07.2019