Synchronization across tasks using checkpoint barriers

2022-02-13 Thread Gopi Krishna M
Hi,
In my Flink operators, I need to connect to an external service to update
state. I was thinking that the updates to the external service could be
synchronized via checkpoint barriers.

The topology of the stream is a source, then a single stage of operator
replicas handling different partitions, which all feed into a single sink.

Each operator contacts the external service when it receives a checkpoint
barrier and uploads its local state; the service caches the upload and
returns a handle.

After the upload, the operator forwards the handle to the sink. Once the sink
has received handles from all such operators, it calls the external service
with the list of handles it received. This ensures that all handles belong to
the same checkpoint.
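
For illustration, a minimal sketch of the per-operator part, assuming Flink's
CheckpointedFunction interface is used to react to the barrier. Event,
UploadHandle, LocalCache and ExternalServiceClient are hypothetical
placeholders for my own types and the external service, not real classes:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: Event, UploadHandle, LocalCache and
// ExternalServiceClient are placeholders, not real Flink classes.
public class UploadingOperator extends ProcessFunction<Event, UploadHandle>
        implements CheckpointedFunction {

    private transient ExternalServiceClient client;
    private transient UploadHandle lastHandle;
    private transient LocalCache cache;

    @Override
    public void initializeState(FunctionInitializationContext context) {
        client = new ExternalServiceClient(); // connection to the external service
        cache = new LocalCache();             // local state accumulated between checkpoints
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<UploadHandle> out) {
        cache.update(event);             // accumulate local state
        if (lastHandle != null) {
            out.collect(lastHandle);     // forward the handle from the previous checkpoint
            lastHandle = null;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Called when the checkpoint barrier reaches this subtask:
        // upload the cached state and remember the returned handle.
        lastHandle = client.upload(context.getCheckpointId(), cache.snapshot());
    }
}

One part I am not sure about: with the high-level API, records cannot be
emitted from inside snapshotState, so in this sketch the handle is only
forwarded when the next element arrives.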

Is it possible to achieve this in a Flink application?

Thanks,
Gopi


Joining Flink tables with different watermark delay

2022-02-13 Thread Meghajit Mazumdar
Hello,

We create two data streams in our Flink application and convert each of them
into a Table. The first data stream has a watermark delay of 24 hours, while
the second stream has a watermark delay of 60 minutes. Both use a
BoundedOutOfOrderness watermark strategy and assign watermarks from an
event_time field present within the records themselves.

For example,

DataStream fileStream = env.fromSource(
        fileSource,
        getWatermarkStrategy(86400000), // custom function, watermark delay of 24 hours in ms
        "fileSource");
Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
tableEnv.createTemporaryView("fileTable", firstTable);

DataStream kafkaStream = env.fromSource(
        kafkaSource,
        getWatermarkStrategy(3600000), // custom function, watermark delay of 60 minutes in ms
        "kafkaSource");
Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
tableEnv.createTemporaryView("kafkaTable", secondTable);

Now we want to write a continuous SQL query that joins firstTable and
secondTable with a tumble window of 60 minutes:

"SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS
event_time,
MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time))," +
"FROM fileTable, kafkaTable " +
"where fileTable.id = kafkaTable.id " +
"group by TUMBLE(fileTable.rowtime, INTERVAL '60'
MINUTE)"

What we want to know is: will join or aggregation queries work correctly
between the two tables? Is it the case that the contents of kafkaTable will
be purged immediately after 60 minutes, so that a join/aggregation might not
give correct results? Will there be data loss if tables with different
watermark delays are joined?

-- 
*Regards,*
*Meghajit*