Таблица исключений атомарного типа может иметь только одно поле. при преобразовании DataStream в таблицу

1. BillCount и Record являются объектами класса. Столбцы объекта BillCount являются одними из столбцов Record.
2 、 Источник Flink получает данные 'Record' из темы kafka.

case  class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
val kafkaInputStream: DataStream[Record] = env.addSource(source)   //source is FlinkKafkaConsumer010 source
   val tbDataStream : DataStream[BillCount] = kafkaInputStream.map(
              new MapFunction[Record, BillCount] {
                override def map(value: Record) = {
                  BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
                          value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
    }
  })
 val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate) // occur error here

Следующее исключение:

Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)

person Li Peng    schedule 25.09.2017    source источник
comment
Можете ли вы опубликовать тип своего потока, который вы пытаетесь преобразовать? println (tbDataStream.dataType) Я думаю, проблема в том, что BillCount рассматривается как общий тип, потому что вы не соответствуете требованиям POJO.   -  person twalthr    schedule 25.09.2017
comment
Обычно журнал сообщает, почему BillCount не является POJO. Кстати. он должен быть статическим Java (определенным в сопутствующем объекте Scala).   -  person twalthr    schedule 25.09.2017
comment
Искренне благодарим за ответ. После воссоздания класса BillCount результатом println (tbDataStream.dataType) будет PojoType ‹org.apache.flink.app.BillCount, fields = [cityId: Integer, logisId: Integer, ...›, предыдущий журнал ошибок не возник. :) Спасибо еще раз.   -  person Li Peng    schedule 25.09.2017


Ответы (1)


BillCount должен быть классом POJO. Это означает, что класс BillCount должен иметь конструктор по умолчанию (без параметров) и функции получения / установки. опыт:

class BillCount{
    private int logisId;
    private int provinceId;
    private int cityId;

    BillCount(){}

    public void setLogisId(int logisId){
        this.logisId = logisId;
    }

    public void setProvinceId(int provinceId){
        this.provinceId = provinceId;
    }

    public void setCityId(int cityId){
        this.cityId = cityId;
    }

    public int getLogisId(){
        return this.logisId;
    }

    public int getProvinceId(){
        return this.provinceId;
    }

    public int getCityId() {
        return this.cityId;
    }

}
person pijing    schedule 23.10.2018
comment
Это связано с этим тикетом Flink Jira issues.apache.org/jira/browse/FLINK-10264 - person wind; 03.12.2019