Following up on this: I tried tweaking the JDBC sink as Timo suggested and it worked. Basically, I added a member *long maxWatermarkSeen* in JDBCOutputFormat, so whenever a new record is added to the batch it updates *maxWatermarkSeen* for this subtask with the watermark from *org.apache.flink.streaming.api.functions.sink.SinkFunction.Context* (if it's greater). So whenever *JDBCOutputFormat.flush()* is called, I can be sure that after executing the batch, all records having a timestamp below *maxWatermarkSeen* have been pushed to JDBC.
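For anyone interested, the tweak looks roughly like the sketch below. This is a minimal standalone RichSinkFunction illustrating the same idea rather than my actual patch to JDBCOutputFormat; the batch size, connection details and SQL are made up for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

public class WatermarkTrackingJdbcSink extends RichSinkFunction<Row> {

    private static final int BATCH_SIZE = 500; // illustrative

    private transient Connection connection;
    private transient PreparedStatement statement;
    private transient List<Row> batch;

    // Highest watermark this subtask has seen for records already batched.
    private long maxWatermarkSeen = Long.MIN_VALUE;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder connection details and insert statement.
        connection = DriverManager.getConnection("jdbc:...", "user", "password");
        statement = connection.prepareStatement("INSERT INTO stats VALUES (?, ?)");
        batch = new ArrayList<>();
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        batch.add(value);
        // SinkFunction.Context exposes the current watermark of this sink subtask.
        maxWatermarkSeen = Math.max(maxWatermarkSeen, context.currentWatermark());
        if (batch.size() >= BATCH_SIZE) {
            flush();
        }
    }

    private void flush() throws SQLException {
        for (Row row : batch) {
            for (int i = 0; i < row.getArity(); i++) {
                statement.setObject(i + 1, row.getField(i));
            }
            statement.addBatch();
        }
        statement.executeBatch();
        batch.clear();
        // After executeBatch() returns, every record with a timestamp below
        // maxWatermarkSeen has been pushed to JDBC for this subtask.
    }

    @Override
    public void close() throws Exception {
        flush();
        statement.close();
        connection.close();
    }
}

(With checkpointing enabled you would presumably also want to flush on snapshot via CheckpointedFunction, but I left that out to keep the sketch short.)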
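This also bears on my first question below: the blocking variant of the <*Subtask Index, Watermark*> bookkeeping would just be one more statement at the end of *flush()*. A sketch of such a method, to be added to the sink above, assuming a hypothetical sink_progress(subtask_index INT PRIMARY KEY, watermark BIGINT) table and PostgreSQL-style upsert syntax:

// Call at the end of flush(), after executeBatch() has succeeded.
private void reportWatermark() throws SQLException {
    int subtask = getRuntimeContext().getIndexOfThisSubtask();
    try (PreparedStatement upsert = connection.prepareStatement(
            "INSERT INTO sink_progress (subtask_index, watermark) VALUES (?, ?) "
                + "ON CONFLICT (subtask_index) DO UPDATE SET watermark = EXCLUDED.watermark")) {
        upsert.setInt(1, subtask);
        upsert.setLong(2, maxWatermarkSeen);
        upsert.executeUpdate();
    }
}

A reader can then take SELECT MIN(watermark) FROM sink_progress as the timestamp up to which the sink is complete, assuming every subtask has reported at least once.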
Now, the actual answer I am looking for is the minimum of *maxWatermarkSeen* across all subtasks. I can continuously write this to the DB as <*Subtask Index, Watermark*> pairs and take the minimum in the DB. I guess this aggregation across subtasks can't be done inside Flink itself? Now, I have two questions:

1) Should I write this value to the DB using Flink's async I/O feature, or just perform a blocking query in *JDBCOutputFormat.flush()* after executing the batch?

2) If I use a Kafka sink (or any other sink, for that matter), do I have to tweak its SinkFunction again for this functionality? The general idea being that knowing up to which timestamp a sink is complete is a common need for users, so there may be simpler solutions.

Thanks
Shubham

On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar <shubhamkumar1...@gmail.com> wrote:

> Hi Timo,
>
> Yeah, I got the idea of getting access to timers through a process function
> and reached the same conclusion you explained, that is, a side output
> doesn't guarantee that the data is written out to the sink (so maybe Fabian
> in that post pointed out something else which I am missing). If I am
> correct, data is written to the side output as soon as it is processed in
> the process function (maybe in the onTimer call, if a timer has been set),
> right?
>
> I am doing all computation in DataStream<Object> and then adding a mapper
> to convert to DataStream<Row> to sink through JDBCAppendTableSink, which
> is part of the Table API I think. I will definitely try exploring the JDBC
> sink function and its context to get the watermark.
>
> Thinking out of the box, is it possible to add some extra operator after
> the sink which will always have a watermark greater than the sink
> function's watermarks, as it's a downstream operator?
> Also, does the problem simplify if we have a Kafka sink?
>
> On Tue, Apr 28, 2020 at 10:35 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Shubham,
>>
>> you can call stream.process(...). The context of ProcessFunction gives
>> you access to TimerService, which lets you access the current watermark.
>>
>> I'm assuming you are using the Table API? As far as I remember,
>> watermarks are travelling through the stream even if there is no
>> time-based operation happening. But you should double-check that.
>>
>> However, a side output does not guarantee that the data has already been
>> written out to the sink. So I would recommend customizing the JDBC sink
>> instead and looking into the row column for getting the current timestamp.
>>
>> Or even better, there should also be
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.Context with
>> access to the watermark.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>> On 28.04.20 13:07, Shubham Kumar wrote:
>> > Hi everyone,
>> >
>> > I have a Flink application with Kafka sources which calculates some
>> > stats based on them and pushes the results to JDBC. Now, I want to
>> > know up to what timestamp the data is completely pushed to JDBC (i.e.
>> > no more data will be pushed with a timestamp smaller than or equal to
>> > it). There doesn't seem to be any direct programmatic way to do so.
>> >
>> > I came across the following thread which seemed most relevant to my
>> > problem:
>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
>> >
>> > However, I can't seem to understand how to chain a process function
>> > before the sink task so as to put watermarks to a side output.
>> > (I suspect it might have something to do with datastream.addSink in
>> > regular datastream sinks vs sink.consumeDataStream(stream) in
>> > JDBCAppendTableSink.)
>> >
>> > Further, what happens if there are no windows? How should one
>> > approach the problem then?
>> >
>> > Please share any pointers or relevant solutions to tackle this.
>> >
>> > --
>> > Thanks & Regards
>> >
>> > Shubham Kumar
>> >
>>
>
> --
> Thanks & Regards
>
> Shubham Kumar
>

--
Thanks & Regards

Shubham Kumar