I write my final result to Kafka in the following way, because
KafkaTableSink does not support retract mode. I am not sure whether this
approach will affect the calculation result.

val userTest: Table = tEnv.sqlQuery(sql)

val endStream = tEnv.toRetractStream[Row](userTest)

//userTest.insertInto("kafkaSink")

val myProducer = new FlinkKafkaProducer011[String](
  kafkaBrokers,            // broker list
  topic,                   // target topic
  new SimpleStringSchema)  // serialization schema

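// Each retract-stream element is (flag, row): true adds the row,
// false retracts a previously emitted row.
endStream.map(x => {
  s"${x._1}:${x._2.toString}"
}).addSink(myProducer)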
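
To make the concern concrete, here is a minimal sketch in plain Scala (no Flink dependency) of how a consumer of the topic would probably have to interpret these records. It assumes each message has the form "<flag>:<row>" as produced by the map above, and that messages are read in the order they were written. The names RetractReplay, parse and replay, and the sample rows, are hypothetical and only for illustration.

```scala
object RetractReplay {
  // Split "true:alice,1" into (true, "alice,1"). Only the first ':' separates
  // the flag from the row text. Assumes a well-formed message.
  def parse(msg: String): (Boolean, String) = {
    val idx = msg.indexOf(':')
    (msg.substring(0, idx).toBoolean, msg.substring(idx + 1))
  }

  // Fold the messages in order: an add message increments the row's count,
  // a retract message decrements it; rows whose count reaches zero are dropped.
  def replay(messages: Seq[String]): Map[String, Int] =
    messages.map(parse).foldLeft(Map.empty[String, Int]) {
      case (state, (isAdd, row)) =>
        val count = state.getOrElse(row, 0) + (if (isAdd) 1 else -1)
        if (count == 0) state - row else state.updated(row, count)
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample: a count for "alice" going from 1 to 2 arrives
    // as add, retract, add.
    val messages = Seq("true:alice,1", "false:alice,1", "true:alice,2")
    println(replay(messages)) // prints Map(alice,2 -> 1)
  }
}
```

In this sketch the result is only correct if the consumer applies the "false" records as retractions; a consumer that treats every record as a new result would also count the outdated row "alice,1".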


