Hi clay,

Are there any other lines after the last line in your picture? The final
result should be eventually consistent and correct.

In your SQL, there is a left join, a keyed group by and a non-keyed group
by. Both the left join and the keyed group by send retractions to the
downstream non-keyed group by whenever there is an update. These
retraction messages make the result value fluctuate. However, the final
result will be correct. To get monotonic results, you can add another
non-keyed group by with max.
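
To illustrate the effect, here is a rough plain-Scala simulation (no Flink
involved, so it is only a sketch of the idea, not how Flink implements it).
The object name, the keys and the helper functions are all made up for this
example. A keyed count retracts its old value and emits the new one on every
update, a non-keyed sum applies each message as it arrives, and a running
maximum stands in for the extra non-keyed group by with max.

```scala
// Plain-Scala simulation (no Flink dependency); all names here are hypothetical.
object RetractionDemo {
  // (isAdd, value): true = accumulate message, false = retraction message
  type Msg = (Boolean, Long)

  // Keyed group by (count per key): on an update, retract the old count
  // and then emit the new one.
  def keyedCounts(keys: Seq[String]): Seq[Msg] = {
    val counts = scala.collection.mutable.Map.empty[String, Long]
    keys.flatMap { k =>
      val old = counts.getOrElse(k, 0L)
      counts(k) = old + 1
      val retract = if (old > 0) Seq((false, old)) else Seq.empty[Msg]
      retract :+ ((true, old + 1))
    }
  }

  // Non-keyed group by (sum): applies every message as soon as it arrives,
  // so a retraction temporarily lowers the result.
  def runningSum(msgs: Seq[Msg]): Seq[Long] =
    msgs.scanLeft(0L) { case (acc, (isAdd, v)) => if (isAdd) acc + v else acc - v }.tail

  // Stand-in for the extra non-keyed group by with max: the largest value seen so far.
  def runningMax(values: Seq[Long]): Seq[Long] =
    values.scanLeft(Long.MinValue)((m, v) => math.max(m, v)).tail

  def main(args: Array[String]): Unit = {
    val sums = runningSum(keyedCounts(Seq("a", "b", "a", "a")))
    println(sums.mkString(", "))             // 1, 2, 1, 3, 1, 4
    println(runningMax(sums).mkString(", ")) // 1, 2, 2, 3, 3, 4
  }
}
```

The sum dips each time a retraction arrives before the matching new value,
but its last value (4) is the correct total. The running maximum hides the
dips, which only makes sense when the true result never decreases.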

Best, Hequn.

On Sat, Sep 29, 2018 at 3:47 PM clay4444 <clay4me...@gmail.com> wrote:

> My final calculation result is written to Kafka in the following way.
> Because KafkaTableSink does not support retract mode, I am not sure
> whether this method will affect the calculation result.
> val userTest: Table = tEnv.sqlQuery(sql)
> val endStream = tEnv.toRetractStream[Row](userTest)
> //userTest.insertInto("kafkaSink")
> val myProducer = new FlinkKafkaProducer011[String](
>   kafkaBrokers,         // broker list
>   topic,   // target topic
>   new SimpleStringSchema)   // serialization schema
> endStream.map(x=>{
>   s"${x._1}:${x._2.toString}"
> }).addSink(myProducer)
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
