Hi clay,

Are there any other lines after the last line in your picture? The final
result should be eventually consistent and correct.

Your SQL contains a left join, a keyed group by, and a non-keyed group by.
Both the left join and the keyed group by send retractions to the
downstream non-keyed group by whenever a row is updated. These retraction
messages make the result value fluctuate temporarily. However, the final
result will be correct.
To get monotonic results, you can add another non-keyed group by with max
on top.
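
To illustrate the effect, here is a rough sketch in plain Scala. It is only
a simplified model and does not use any Flink API: the object name, the
helper functions and the message sequence are all made up for this example,
and it assumes the true result only grows over time. A message (true, v)
stands for an add and (false, v) for a retraction.

```scala
object RetractionDemo {
  // Hypothetical messages reaching the non-keyed aggregate:
  // user A's count becomes 1, is retracted and replaced by 2,
  // then user B's count becomes 1.
  val msgs: Seq[(Boolean, Long)] =
    Seq((true, 1L), (false, 1L), (true, 2L), (true, 1L))

  // Sum as observed after each message is applied (adds increase it,
  // retractions decrease it), so intermediate values can dip.
  def observedSums(in: Seq[(Boolean, Long)]): Seq[Long] =
    in.scanLeft(0L) { case (acc, (isAdd, v)) =>
      if (isAdd) acc + v else acc - v
    }.tail

  // Running maximum over the observed values; it never decreases.
  def runningMax(values: Seq[Long]): Seq[Long] =
    values.scanLeft(Long.MinValue)((a, b) => math.max(a, b)).tail

  def main(args: Array[String]): Unit = {
    val sums = observedSums(msgs)
    println(sums.mkString(", "))             // 1, 0, 2, 3
    println(runningMax(sums).mkString(", ")) // 1, 1, 2, 3
  }
}
```

In this model the observed sum drops to 0 in the middle, while the running
max only moves upward and ends at the same final value.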

Best, Hequn.


On Sat, Sep 29, 2018 at 3:47 PM clay4444 <clay4me...@gmail.com> wrote:

> Because KafkaTableSink does not support retract mode, I write my final
> calculation result to Kafka in the following way. I am not sure whether
> this method will affect the calculation result.
>
> val userTest: Table = tEnv.sqlQuery(sql)
>
> val endStream = tEnv.toRetractStream[Row](userTest)
>
> //userTest.insertInto("kafkaSink")
>
> val myProducer = new FlinkKafkaProducer011[String](
>   kafkaBrokers,         // broker list
>   topic,   // target topic
>   new SimpleStringSchema)   // serialization schema
>
> endStream.map(x=>{
>   s"${x._1}:${x._2.toString}"
> }).addSink(myProducer)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
