Hi,

I think instead of generating DataStream[BillCount], the correct way is to
generate DataStream[Row], that is,

kafkaInputStream.map(value => Row.of(value.getLogis_id,
value.getProvince_id, value.getCity_id, value.getOrder_require_varieties,
value.getOrder_rec_amount, value.getStore_rec_date.getTime))

That should work.

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM laney0...@163.com <laney0...@163.com> wrote:

> Hi,
>      I'm confused by a problem. The following exception occurs:
> "org.apache.flink.table.api.TableException: Table of atomic type can only
> have a single field."
>      Both BillCount and Record are classes. The code follows.
>
>        case class BillCount(logisId: Int, provinceId: Int, cityId: Int,
>          orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
>
>        // source is a FlinkKafkaConsumer010
>        val kafkaInputStream: DataStream[Record] = env.addSource(source)
>
>        val tbDataStream: DataStream[BillCount] = kafkaInputStream.map(
>          new MapFunction[Record, BillCount] {
>            override def map(value: Record) = {
>              BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
>                value.getOrder_require_varieties, value.getOrder_rec_amount,
>                value.getStore_rec_date.getTime)
>            }
>          })
>
>        // the error occurs on the following call
>        val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId,
>          'orderRequVari, 'orderRecAmount, 'orderRecDate)
>
>
>     Error :
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
>     at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
>     at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
>     at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
>     at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
>     at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
>     at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)
>
>
>   Thanks.
>
>
> ------------------------------
> laney0...@163.com
>
>
