Запрос Delta Lake изнутри UDF в Databricks

Необходимо выполнить несколько запросов к таблице внутри UDF в структурированной потоковой передаче. Проблема в том, что внутри UDF, если я пытаюсь использовать spark.sql, я получаю исключение нулевого указателя. Как лучше всего следовать здесь?

В основном мне нужно передавать поток из таблицы, а затем использовать эти данные для выполнения некоторых запросов диапазона из другой таблицы.

Eg.

val appleFilter = udf((appleId : String) => {
     val query = "select count(*) from appleMart where appleId='"+appleId+"'"
     val appleCount = spark.sql(query).collect().head.getLong(0)
     (appleCount>0)
})

val newApple = apples.filter(appleFilter($"appleId"))



Ответы (1)


Это не совсем правильный подход для этой задачи - вам не следует выполнять отдельные запросы из UDF, так как Spark не сможет распараллелить / оптимизировать их.

Лучше всего будет просто выполнить соединение между фреймом данных потоковой передачи и appleMart фрейм данных - это позволит Spark оптимизировать все операции. Как я понял из вашего кода, вам просто нужно проверить, есть ли у вас яблоки с данным идентификатором. В этом случае вы можете просто выполнить внутреннее соединение - это оставит только те идентификаторы, для которых есть строки в appleMart, примерно так:

val appleMart = spark.read.format("delta").load("path_to_delta")
val newApple = apples.join(appleMart, apples("appleId") === appleMart("appleId"))

если по какой-то причине вам нужно оставить apples записей, которых нет в appleMart, вы можете вместо этого использовать left join ...

P.S. Если appleMart не меняется очень часто, его можно кэшировать. Хотя для потоковых заданий для таблиц поиска что-то вроде Cassandra может быть лучше с точки зрения производительности.

person Alex Ott    schedule 30.07.2020