Я пытаюсь понять дельту данных и думаю сделать POC с помощью Kafka. В основном план состоит в том, чтобы использовать данные из Kafka и вставить их в дельта-таблицу databricks.
Вот шаги, которые я сделал:
- Создайте дельта-таблицу для блоков данных.
%sql
CREATE TABLE hazriq_delta_trial2 (
value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
- Получение данных из Kafka.
import org.apache.spark.sql.types._
val kafkaBrokers = "broker1:port,broker2:port,broker3:port"
val kafkaTopic = "kafkapoc"
val kafka2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 100)
.load()
.select($"value")
.withColumn("Value", $"value".cast(StringType))
.writeStream
.option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
.table("hazriq_delta_trial2")
Однако когда я запрашиваю таблицу, она пуста.
Могу подтвердить, что данные поступают. Я проверяю это, наблюдая всплеск на графике, когда я создаю сообщение в теме Kafka.
Я что-то упускаю?
Мне нужна помощь в том, как вставить данные, полученные от Kafka, в таблицу.
kafka .writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") .start("/delta/events") // as a path
- person J Smith   schedule 14.05.2019