Hi,

I'm confused by a problem. The following exception occurs: "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field." Both BillCount and Record are classes. The code is as follows.

    case class BillCount(logisId: Int, provinceId: Int, cityId: Int,
                         orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)

    val kafkaInputStream: DataStream[Record] = env.addSource(source) // source is a FlinkKafkaConsumer010

    val tbDataStream: DataStream[BillCount] = kafkaInputStream.map(
      new MapFunction[Record, BillCount] {
        override def map(value: Record) = {
          BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
            value.getOrder_require_varieties, value.getOrder_rec_amount,
            value.getStore_rec_date.getTime)
        }
      })

    val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId,
      'orderRequVari, 'orderRecAmount, 'orderRecDate) // the error occurs here

Error:

Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
    at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)

Thanks.

laney0...@163.com