Как передавать данные из темы Kafka в дельта-таблицу с помощью Spark Structured Streaming

Я пытаюсь понять дельту данных и думаю сделать POC с помощью Kafka. В основном план состоит в том, чтобы использовать данные из Kafka и вставить их в дельта-таблицу databricks.

Вот шаги, которые я сделал:

  1. Создайте дельта-таблицу для блоков данных.
%sql
CREATE TABLE hazriq_delta_trial2 (
  value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
  1. Получение данных из Kafka.
import org.apache.spark.sql.types._
    
val kafkaBrokers = "broker1:port,broker2:port,broker3:port"
val kafkaTopic = "kafkapoc"
    
val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 100)
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .table("hazriq_delta_trial2")

Однако когда я запрашиваю таблицу, она пуста.

Могу подтвердить, что данные поступают. Я проверяю это, наблюдая всплеск на графике, когда я создаю сообщение в теме Kafka.

входящие данные

Я что-то упускаю?

Мне нужна помощь в том, как вставить данные, полученные от Kafka, в таблицу.


person Hazriq Ishak    schedule 28.02.2019    source источник
comment
Просто запустите свой код перед отправкой данных в kafka, после запуска кода вам нужно отправить данные в kafka.   -  person Vijay Kumar Sharma    schedule 28.02.2019
comment
Можете ли вы попробовать вставить непосредственно в каталог таблиц Delta, а не в таблицу? Например. kafka .writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") .start("/delta/events") // as a path   -  person J Smith    schedule 14.05.2019


Ответы (2)


Ниже приведен рабочий пример того, как читать данные из Kafka и передавать их в дельта-таблицу. Я использовал Spark 3.0.1 и delta-core 0.7.0 (если у вас версия Spark 2.4, вам нужно использовать 0.6.0).

Потоковая передача данных из Kafka в таблицу Delta

val spark = SparkSession.builder()
  .appName("Kafka2Delta")
  .master("local[*]")
  .getOrCreate()

// in production this should be a more reliable location such as HDFS
val deltaPath = "file:///tmp/delta/table"

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING) as value")

val query: StreamingQuery = df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .start(deltaPath)

query.awaitTermination()

Для тестирования я просто произвел символы a, b, c и d в качестве значений в теме Kafka. Очевидно, вы можете создать более сложные фреймы данных, если входные данные Kafka, например, строка JSON.

Проверка данных в дельта-таблице

val table = spark.read
  .format("delta")
  .load(deltaPath)
  .createOrReplaceTempView("testTable")

spark.sql("SELECT * FROM testTable").show(false)

// result
+-----+
|value|
+-----+
|a    |
|b    |
|c    |
|d    |
+-----+

Файлы, созданные в deltaPath

>/tmp/delta/table$ ll
total 44
drwxrwxr-x 3 x x 4096 Jan 11 17:12 ./
drwxrwxr-x 3 x x 4096 Jan 11 17:10 ../
drwxrwxr-x 2 x x 4096 Jan 11 17:12 _delta_log/
-rw-r--r-- 1 x x  414 Jan 11 17:12 part-00000-0a0ae7fb-2995-4da4-8284-1ab85899fe9c-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-0a0ae7fb-2995-4da4-8284-1ab85899fe9c-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  306 Jan 11 17:12 part-00000-37eb0bb2-cd27-42a4-9db3-b79cb046b638-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-37eb0bb2-cd27-42a4-9db3-b79cb046b638-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  414 Jan 11 17:12 part-00000-8d6b4236-1a12-4054-b016-3db7a007cbab-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-8d6b4236-1a12-4054-b016-3db7a007cbab-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  407 Jan 11 17:12 part-00000-d2612eaa-3f48-4708-bf90-31dd3d83f124-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-d2612eaa-3f48-4708-bf90-31dd3d83f124-c000.snappy.parquet.crc


person mike    schedule 11.01.2021

1) Попробуйте проверить, есть ли у вас доступ к Kafka из кластера Spark, иногда вам нужно разрешить доступ с некоторых IP-адресов в Kafka.

2) Попробуйте поменять это .option("startingOffsets", "earliest") на это .option("startingOffsets", "latest")

3) Попробуйте также

val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .start("hazriq_delta_trial2")
person Eric Bellet    schedule 17.05.2019