Hi, I think instead of generating DataStream[BillCount], the correct way is to generate DataStream[Row], that is,
kafkaInputStream.map(value -> Row.of(value.getLogis_id, value.getProvince_id, value.getCity_id, value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime) That should work. Regards, Haohui On Sun, Sep 24, 2017 at 6:40 PM laney0...@163.com <laney0...@163.com> wrote: > Hi, > I‘m confused about a problem, occuring a exception " > org.apache.flink.table.api.TableException: Table of atomic type can only have > a single field. > " > Both *BillCount *and *Record *are class object*.* Following is code. > > case class *BillCount* > (logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, > orderRecAmount: Double, orderRecDate: Long) > val kafkaInputStream: DataStream[*Record*] = env.addSource(source) > //source is FlinkKafkaConsumer010 source > val tbDataStream : DataStream[*BillCount*] = kafkaInputStream.map( > new MapFunction[Record, BillCount] { > override def map(value: *Record*) = { > *BillCount* > (value.getLogis_id, value.getProvince_id, value.getCity_id, > > value.getOrder_require_varieties, value.getOrder_rec_amount, > value.getStore_rec_date.getTime) > } > }) > > val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, > 'orderRequVari, 'orderRecAmount, 'orderRecDate) > // occur error here > > > Error : > > Exception in thread "main" org.apache.flink.table.api.TableException: Table > of atomic type can only have a single field. > > at > org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627) > > at > org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624) > > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108) > > at > org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624) > > at > org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) > > at > org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85) > > at > org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58) > > > Thanks. > > > ------------------------------ > laney0...@163.com > > > 【网易自营|30天无忧退货】仅售同款价1/4!MUJI制造商“2017秋冬舒适家居拖鞋系列”限时仅34.9元>> > <http://you.163.com/item/detail?id=1165011&from=web_gg_mail_jiaobiao_9> > >