Because KafkaTableSink does not support retract mode, I write my final calculation result to Kafka in the following way. I am not sure whether this approach affects the calculation result.

val userTest: Table = tEnv.sqlQuery(sql)
val endStream = tEnv.toRetractStream[Row](userTest)
//userTest.insertInto("kafkaSink")
val myProducer = new FlinkKafkaProducer011[String](
  kafkaBrokers,            // broker list
  topic,                   // target topic
  new SimpleStringSchema)  // serialization schema
endStream.map(x => {
  s"${x._1}:${x._2.toString}"
}).addSink(myProducer)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
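
To make the concern concrete: with the map step above, every element of the retract stream ends up in the topic as a "flag:row" string, where the flag is true for an added row and false for a retracted one. The following is only a minimal plain-Scala sketch (no Flink or Kafka involved) of how a reader of the topic might replay such messages to get the current result. The object name RetractReplay, the helper names, and the sample rows are all invented for illustration, and it assumes the messages are read in the order they were written.

```scala
// Hypothetical sketch: replaying "flag:row" messages as written by the map step.
// RetractReplay, parse, replay and the sample rows are illustrative assumptions.
object RetractReplay {

  // Splits "true:alice,1" into (true, "alice,1").
  // Only the first ':' separates the flag from the row text.
  def parse(msg: String): (Boolean, String) = {
    val idx = msg.indexOf(':')
    (msg.substring(0, idx).toBoolean, msg.substring(idx + 1))
  }

  // Replays messages in order: an add (true) increments the row's count,
  // a retract (false) decrements it, and rows whose count reaches 0 are dropped.
  def replay(messages: Seq[String]): Map[String, Int] =
    messages.map(parse).foldLeft(Map.empty[String, Int]) {
      case (state, (isAdd, row)) =>
        val count = state.getOrElse(row, 0) + (if (isAdd) 1 else -1)
        if (count == 0) state - row else state.updated(row, count)
    }

  def main(args: Array[String]): Unit = {
    // Invented sample: the row "alice,1" is added, then retracted and
    // replaced by "alice,2"; "bob,1" is only added.
    val messages = Seq("true:alice,1", "false:alice,1", "true:alice,2", "true:bob,1")
    println(replay(messages))
  }
}
```

In this sketch the retracted row "alice,1" disappears from the replayed state, while a reader that ignored the flag would see both "alice,1" and "alice,2".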