Hi Shubham,
great that tweaking the JDBC sink helped. Maybe I don't fully understand
your logic but:
The watermark that you are receiving in an operator should already be
the minimum of all subtasks. Because it is sent to all subsequent
operators by the precedeing operator. So a watermark can trigger a
flush. I guess you need a measure of completeness when the flush() has
happended in all subtasks, right?
Couldn't you wrap a sink function into a process function and use the
output of the process function for performing the aggregation of
watermark timestamps in Flink. A leaf operator (with parallelism 1)
would then perform the aggregation and the update of the overall
event-time timestamp.
I hope this helps.
Regards,
Timo
On 04.05.20 03:04, Shubham Kumar wrote:
Following up on this,
I tried tweaking the Jdbc Sink as Timo suggested and was successful in
it. Basically I added a member /long maxWatermarkSeen /in JDBCOutputFormat,
so whenever a new record is added to the batch it updates the
/maxWatermarkSeen/ for this subtask with
/org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark/
(if its greater).
So whenever a /JDBCOutputFormat.flush()/ is called I can be sure that
after executing batch, all records having timestamp below
/maxWatermarkSeen/ are pushed to JDBC.
Now, the actual answer I am looking for is minimum of /maxWatermarkSeen/
for all subtasks. I can constantly update this to DB as </Subtask Index,
Watermark/> and take minimum in DB.
I guess the aggregation can't be done inside flink amongst subtasks?
Now, I have two questions:
1) Should I update this to DB using async I/O feature of flink or just
perform a blocking query in /JDBCOutputFormat.flush()/ function after
executing the batch.
2) If I will be using Kafka sink (or any other sink for that matter), do
I have to again tweak around with its SinkFunction for this functionality?
General idea being that this a common functionality for users to
know till what timestamp is sink complete and can have simpler solutions.
Thanks
Shubham
On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar
<shubhamkumar1...@gmail.com <mailto:shubhamkumar1...@gmail.com>> wrote:
Hi Timo,
Yeah, I got the idea of getting access to timers through process
function and had the same result which you explained
that is a side output doesn't guarantee that the data is written out
to sink. (so maybe Fabian in that post pointed out something
else which I am missing). If I am correct then, data is written to
side output as soon as it is processed in the Process function
(maybe in
process function itself on Ontimer call if a timer has been set, right?
I am doing all computation in Datastream<Object> and then adding a
mapper to convert to DataStream<Row> to sink through JdbcAppendTableSink
which is part of Table API I think. I will definitely try exploring
the Jdbc Sink function and context to get the watermark.
Thinking out of the box, is it possible to add some extra operator
after sink which will always have watermark which is greater than
sink function watermarks,
as its a downstream operator.
Also, does the problem simplify if we have Kafka sink?
On Tue, Apr 28, 2020 at 10:35 PM Timo Walther <twal...@apache.org
<mailto:twal...@apache.org>> wrote:
Hi Shubham,
you can call stream.process(...). The context of ProcessFunction
gives
you access to TimerService which let's you access the current
watermark.
I'm assuming your are using the Table API? As far as I remember,
watermark are travelling through the stream even if there is no
time-based operation happening. But you should double check that.
However, a side output does not guarantee that the data has
already been
written out to the sink. So I would recommend to customize the
JDBC sink
instead and look into the row column for getting the current
timestamp.
Or even better, there should also be
org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
with
access to watermark.
I hope this helps.
Regards,
Timo
On 28.04.20 13:07, Shubham Kumar wrote:
> Hi everyone,
>
> I have a flink application having kafka sources which
calculates some
> stats based on it and pushes it to JDBC. Now, I want to know
till what
> timestamp is the data completely pushed in JDBC (i.e. no more
data will
> be pushed to timestamp smaller or equal than this). There
doesn't seem
> to be any direct programmatic way to do so.
>
> I came across the following thread which seemed most relevant
to my
> problem:
>
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
>
> However, I can't seem to understand how to chain a process
function
> before the sink task so as to put watermarks to a side
output. (I
> suspect it might have something to do with datastream.addSink
in regular
> datastream sinks vs sink.consumeDataStream(stream) in
JDBCAppendTableSink).
>
> Further what happens if there are no windows, how to approach
the
> problem then?
>
> Please share any pointers or relevant solution to tackle this.
>
> --
> Thanks & Regards
>
> Shubham Kumar
>
--
Thanks & Regards
Shubham Kumar
--
Thanks & Regards
Shubham Kumar