Определенная с помощью использования агрегатная функция во Flink - не найдено совпадений для сигнатуры функции

Я хотел бы сохранить все необработанные строки для каждого ключа в запросе Select .. From .. GROUP BY .. во Flink. Я определил AggregateFunction под названием RowToJsonAgg, который объединяет строки в строку Json.

class RowToJsonAgg extends AggregateFunction[String, ListBuffer[String]]{
  def accumulate(accumulator: ListBuffer[String], row: Any*): Unit = {
   ....

// предполагаем, что строка выглядит как $ field1_name, $ field1_value, $ field2_name, $ field2_value, ... // пытаемся сгенерировать json из строки. однако кажется, что Flink не может найти эту функцию, когда я выполнял запрос}

  def merge(accumulator: ListBuffer[String], its: java.lang.Iterable[ListBuffer[String]]): Unit = {
    accumulator.append(
      WrapAsScala.iterableAsScalaIterable(its).flatten.toList:_*
    )
  }

  def resetAccumulator(accumulator: ListBuffer[String]): Unit = {
    accumulator.clear()
  }

  override def getValue(accumulator: ListBuffer[String]): String = {
    accumulator.mkString("{", ",", "}")
  }

  override def createAccumulator(): ListBuffer[String] = ListBuffer.empty

  override def getAccumulatorType(): TypeInformation[ListBuffer[String]] = {
    TypeInformation.of(classOf[ListBuffer[String]])
  }

  override def getResultType: TypeInformation[String] = TypeInformation.of(classOf[String])
}

Класс данных и запрос выглядят следующим образом:

case class Stock(id:Int, price: Int, volumn: Int, ts: Long)

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

val bbTableEnv = TableEnvironment.create(bbSettings)

bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])

val table = bbTableEnv.fromValues(...)

bbTableEnv.createTemporaryView("Stock", table)

bbTableEnv.executeSql(
    "select price, row_to_json_agg('volumn', volumn, 'ts', ts) as details from Stock group by price"
)

Когда я запустил приложение, я получил исключение проверки SQL, подробное сообщение: Не найдено совпадений для сигнатуры функции row_to_json_agg (CHARACTER, NUMERIC, CHARACTER, NUMERIC)

Кажется, Flink не может найти нужную функцию накопления для вызова.

Если я объявлю функцию накопления следующим образом

def accumulate(accumulator: ListBuffer[String], volumn: Integer, ts: Long)

и изменил запрос вроде

"select price, row_to_json_agg(volumn, ts) from Stock group by price" 

У меня такое же исключение, и появляется сообщение Не найдено совпадений для сигнатуры функции row_to_json_agg (NUMERIC, NUMERIC)

Есть идеи, как заставить агрегатную функцию работать?


person Grant    schedule 19.01.2021    source источник


Ответы (1)


Я разобрался сам.

  1. зарегистрируйте UDF, запустив SQL следующим образом:

    bbTableEnv.executeSQL (String.format (создать временную функцию $ udf_name как '% s', $ full_class_name_of_your_udf))

вместо того

bbTableEnv.createTemporarySystemFunction("row_to_json_agg", classOf[RowToJsonAgg])
  1. предпочитаю использовать Java для реализации UDF вместо Scala
person Grant    schedule 20.01.2021