Following up on this,

I tried tweaking the JDBC sink as Timo suggested, and it worked.
Basically, I added a member *long maxWatermarkSeen* to JDBCOutputFormat.
Whenever a new record is added to the batch, it updates
*maxWatermarkSeen* for this subtask with
*org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.currentWatermark()*
(if it's greater).
So whenever *JDBCOutputFormat.flush()* is called, I can be sure that
after the batch executes, all records with a timestamp below
*maxWatermarkSeen* have been pushed to JDBC.
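
To make the idea concrete, here is a minimal sketch in plain Java with no
Flink dependency. The class *WatermarkTrackingBatch* and its methods are
hypothetical stand-ins for the tweaked JDBCOutputFormat, not the real class,
and the list clear stands in for executing the JDBC batch. The watermark is
passed in as a plain long, assuming the caller reads it from the sink context.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the tweaked output format; not a Flink class. */
class WatermarkTrackingBatch {
    private final List<String> batch = new ArrayList<>();

    // Highest watermark observed while buffering records in this subtask.
    private long maxWatermarkSeen = Long.MIN_VALUE;

    // Highest watermark that is known to be covered by an executed batch.
    private long flushedWatermark = Long.MIN_VALUE;

    /** Buffers a record and remembers the largest watermark seen so far. */
    void writeRecord(String record, long currentWatermark) {
        batch.add(record);
        maxWatermarkSeen = Math.max(maxWatermarkSeen, currentWatermark);
    }

    /** Executes the batch (simulated here) and publishes the watermark. */
    int flush() {
        int written = batch.size();
        batch.clear(); // assumption: a real sink would execute the JDBC batch here
        // Only after the batch has gone out is it safe to report the watermark.
        flushedWatermark = maxWatermarkSeen;
        return written;
    }

    long getFlushedWatermark() {
        return flushedWatermark;
    }
}
```

The point of keeping two fields is that *maxWatermarkSeen* can run ahead of
what is actually in the DB, so only the value captured in flush() should be
reported.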

Now, the actual answer I am looking for is the minimum of *maxWatermarkSeen*
across all subtasks. I can continuously write this to the DB as <*Subtask Index,
Watermark*> and take the minimum in the DB.
I guess the aggregation can't be done inside Flink across subtasks?
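
The <*Subtask Index, Watermark*> bookkeeping could look roughly like the
sketch below. It uses an in-memory map as a stand-in for the DB table, and
*SubtaskWatermarkTable*, *upsert* and *completedUpTo* are hypothetical names.
It assumes the sink parallelism is known, so that no result is reported until
every subtask has written at least one row.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a <subtask index, watermark> DB table. */
class SubtaskWatermarkTable {
    private final Map<Integer, Long> watermarkBySubtask = new ConcurrentHashMap<>();
    private final int parallelism;

    SubtaskWatermarkTable(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Stores the subtask's watermark, never letting it move backwards. */
    void upsert(int subtaskIndex, long watermark) {
        watermarkBySubtask.merge(subtaskIndex, watermark, Long::max);
    }

    /**
     * Minimum watermark over all subtasks, i.e. the timestamp up to which
     * every subtask has flushed. Returns Long.MIN_VALUE while any subtask
     * has not reported yet.
     */
    long completedUpTo() {
        if (watermarkBySubtask.size() < parallelism) {
            return Long.MIN_VALUE;
        }
        return Collections.min(watermarkBySubtask.values());
    }
}
```

With a real DB the same thing would be one upsert per flush and a MIN()
query over the table.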

Now, I have two questions:

1) Should I write this to the DB using Flink's async I/O feature, or just
perform a blocking query in *JDBCOutputFormat.flush()* after
executing the batch?
2) If I use the Kafka sink (or any other sink, for that matter), do I
have to tweak its SinkFunction again to get this functionality?
   The general idea is that knowing up to what timestamp the sink is
complete is a common need for users, so it could have a simpler solution.

Thanks
Shubham

On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar <shubhamkumar1...@gmail.com>
wrote:

> Hi Timo,
>
> Yeah, I got the idea of getting access to timers through a process function
> and got the same result that you explained,
> that is, a side output doesn't guarantee that the data is written out to
> the sink (so maybe Fabian pointed out something else in that post
> which I am missing). If I am correct, then data is written to the side
> output as soon as it is processed in the process function (maybe in the
> process function itself, on the onTimer call if a timer has been set), right?
>
> I am doing all computation in DataStream<Object> and then adding a mapper
> to convert to DataStream<Row> to sink through JDBCAppendTableSink,
> which is part of the Table API, I think. I will definitely try exploring the
> JDBC sink function and context to get the watermark.
>
> Thinking outside the box, is it possible to add some extra operator after
> the sink which will always have a watermark greater than the sink function's
> watermarks,
> as it's a downstream operator?
> Also, does the problem simplify if we have Kafka sink?
>
> On Tue, Apr 28, 2020 at 10:35 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Shubham,
>>
>> you can call stream.process(...). The context of ProcessFunction gives
>> you access to TimerService, which lets you access the current watermark.
>>
>> I'm assuming you are using the Table API? As far as I remember,
>> watermarks travel through the stream even if there is no
>> time-based operation happening. But you should double-check that.
>>
>> However, a side output does not guarantee that the data has already been
>> written out to the sink. So I would recommend to customize the JDBC sink
>> instead and look into the row column for getting the current timestamp.
>>
>> Or even better, there should also be
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.Context with
>> access to watermark.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>> On 28.04.20 13:07, Shubham Kumar wrote:
>> > Hi everyone,
>> >
>> > I have a flink application having kafka sources which calculates some
>> > stats based on it and pushes it to JDBC. Now, I want to know till what
>> > timestamp is the data completely pushed in JDBC (i.e. no more data will
>> > be pushed to timestamp smaller or equal than this). There doesn't seem
>> > to be any direct programmatic way to do so.
>> >
>> > I came across the following thread which seemed most relevant to my
>> > problem:
>> >
>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
>> >
>> > However, I can't seem to understand how to chain a process function
>> > before the sink task so as to put watermarks to a side output. (I
>> > suspect it might have something to do with datastream.addSink in regular
>> > datastream sinks vs sink.consumeDataStream(stream) in JDBCAppendTableSink).
>> >
>> > Further what happens if there are no windows, how to approach the
>> > problem then?
>> >
>> > Please share any pointers or relevant solution to tackle this.
>> >
>> > --
>> > Thanks & Regards
>> >
>> > Shubham Kumar
>> >
>>
>>
>
> --
> Thanks & Regards
>
> Shubham Kumar
>
>

-- 
Thanks & Regards

Shubham Kumar
