Hi,
you also need to keep the parallelism in mind. If your downstream
operator or sink has a parallelism of 1 and your SQL query pipeline has
a higher parallelism, the retract results are rebalanced and arrive in the
wrong order. This is the case, for example, when you view the changelog
in the SQL Client: its built-in sink always has a parallelism of 1.
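To illustrate why the order matters, here is a small plain-Scala simulation (no Flink involved; all names are hypothetical). It assumes a single-row result that changes from 1 to 2 to 3, and a consumer that only keeps the latest value and clears it on a retraction. Dealing the messages round-robin to two channels and draining them one after another, as a crude stand-in for a rebalance, makes the retractions arrive after the newer additions, so the consumer ends up with no value at all.

```scala
object RebalanceOrderDemo {
  // A retract-stream message in the shape toRetractStream produces: (isAdd, value).
  type Msg = (Boolean, Int)

  // Hypothetical consumer that keeps only the latest value of a single-row
  // result: an add message overwrites it, a retraction clears it.
  def latestValue(msgs: Seq[Msg]): Option[Int] =
    msgs.foldLeft(Option.empty[Int]) {
      case (_, (true, v))  => Some(v)
      case (_, (false, _)) => None
    }

  // Crude rebalance simulation: messages are dealt round-robin to `n`
  // channels, each channel keeps its own order, and the channels are then
  // drained one after another (as if channel 0 were the fastest).
  def rebalance(msgs: Seq[Msg], n: Int): Seq[Msg] =
    (0 until n).flatMap { c =>
      msgs.zipWithIndex.collect { case (m, i) if i % n == c => m }
    }
}
```

In an actual job the interleaving depends on timing, so the wrong result is nondeterministic rather than guaranteed.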
Regards,
Timo
On 29.09.18 at 17:02, Hequn Cheng wrote:
Hi clay,
Are there any other lines after the last line in your picture? The
final result should be eventually consistent and correct.
In your SQL, there is a left join, a keyed group by, and a non-keyed
group by. Both the left join and the keyed group by send retractions
to the downstream non-keyed group by whenever there is an update. These
retraction messages make the result value fluctuate. However, the
final result will be correct.
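The fluctuation can be reproduced with a plain-Scala simulation (not Flink's actual operators; names are hypothetical). It assumes a hypothetical query shape: a keyed group by user with COUNT(*), feeding a non-keyed SUM over the counts, with the left join left out. Every update of a key first retracts the old count, so the global sum temporarily drops before it reaches the correct final value.

```scala
object RetractionFluctuationDemo {
  // (isAdd, (user, cnt)) as emitted by the keyed aggregate.
  type Change = (Boolean, (String, Long))

  // Keyed GROUP BY user with COUNT(*): on every update of a key it first
  // retracts the old row, then adds the new one.
  def keyedCount(users: Seq[String]): Seq[Change] = {
    val counts = scala.collection.mutable.Map.empty[String, Long]
    users.flatMap { user =>
      val old = counts.get(user)
      val updated = old.getOrElse(0L) + 1L
      counts(user) = updated
      old.map(c => (false, (user, c))).toList :+ ((true, (user, updated)))
    }
  }

  // Non-keyed GROUP BY with SUM(cnt): the result value after each change.
  def globalSum(changes: Seq[Change]): Seq[Long] =
    changes.scanLeft(0L) {
      case (acc, (true, (_, c)))  => acc + c
      case (acc, (false, (_, c))) => acc - c
    }.tail
}
```

For the input a, b, a the sums are 1, 2, 1, 3: the dip to 1 comes from the retraction of a's old count, and the final 3 matches the number of events.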
To get monotonic results, you can add another non-keyed group by with
max.
Best, Hequn.
On Sat, Sep 29, 2018 at 3:47 PM clay4444 <clay4me...@gmail.com> wrote:
I write my final result to Kafka in the following way, because
KafkaTableSink does not support retract mode. I am not sure whether
this approach affects the calculation result.
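import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val userTest: Table = tEnv.sqlQuery(sql)
// (Boolean, Row): true = add message, false = retraction of an earlier row
val endStream = tEnv.toRetractStream[Row](userTest)
//userTest.insertInto("kafkaSink") // not possible: KafkaTableSink has no retract mode
val myProducer = new FlinkKafkaProducer011[String](
  kafkaBrokers, // broker list
  topic, // target topic
  new SimpleStringSchema) // serialization schema
// Keep the retract flag in the record, e.g. "true:<row>" or "false:<row>"
endStream.map(x => s"${x._1}:${x._2.toString}")
  .addSink(myProducer)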
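Because the map above keeps the retract flag in each Kafka record, a downstream consumer can still interpret the changelog. Below is a minimal decoding sketch in plain Scala, assuming the "flag:row" format produced by that map; the object and method names are hypothetical. It splits only at the first ':', since the row's string form may itself contain colons, and materializes the records as a multiset in which an add increments and a retraction decrements a row's count.

```scala
object RetractRecordDecoder {
  // Split "true:payload" / "false:payload" at the first ':' only,
  // because the payload (the row's string form) may contain ':' itself.
  def decode(record: String): (Boolean, String) = {
    val idx = record.indexOf(':')
    require(idx > 0, s"malformed record: $record")
    (record.substring(0, idx).toBoolean, record.substring(idx + 1))
  }

  // Materialize the changelog as a multiset of rows:
  // an add message increments a row's count, a retraction decrements it.
  def materialize(records: Seq[String]): Map[String, Int] =
    records.map(decode).foldLeft(Map.empty[String, Int]) {
      case (acc, (isAdd, row)) =>
        val n = acc.getOrElse(row, 0) + (if (isAdd) 1 else -1)
        if (n == 0) acc - row else acc.updated(row, n)
    }
}
```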