Hi Shubham,

great that tweaking the JDBC sink helped. Maybe I don't fully understand your logic but:

The watermark that you are receiving in an operator should already be the minimum of all subtasks. Because it is sent to all subsequent operators by the precedeing operator. So a watermark can trigger a flush. I guess you need a measure of completeness when the flush() has happended in all subtasks, right?

Couldn't you wrap a sink function into a process function and use the output of the process function for performing the aggregation of watermark timestamps in Flink. A leaf operator (with parallelism 1) would then perform the aggregation and the update of the overall event-time timestamp.

I hope this helps.

Regards,
Timo


On 04.05.20 03:04, Shubham Kumar wrote:
Following up on this,

I tried tweaking the Jdbc Sink as Timo suggested and was successful in it. Basically I added a member /long maxWatermarkSeen /in JDBCOutputFormat, so whenever a new record is added to the batch it updates the /maxWatermarkSeen/ for this subtask with /org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark/ (if its greater).  So whenever a /JDBCOutputFormat.flush()/ is called I can be sure that after executing batch, all records having timestamp below /maxWatermarkSeen/ are pushed to JDBC.

Now, the actual answer I am looking for is minimum of /maxWatermarkSeen/ for all subtasks. I can constantly update this to DB as </Subtask Index, Watermark/> and take minimum in DB.
  I guess the aggregation can't be done inside flink amongst subtasks?

Now, I have two questions:

1) Should I update this to DB using async I/O feature of flink or just perform a blocking query in /JDBCOutputFormat.flush()/ function after executing the batch. 2) If I will be using Kafka sink (or any other sink for that matter), do I have to again tweak around with its SinkFunction for this functionality?    General idea being that this a common functionality for users to know till what timestamp is sink complete and can have simpler solutions.

Thanks
Shubham

On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar <shubhamkumar1...@gmail.com <mailto:shubhamkumar1...@gmail.com>> wrote:

    Hi Timo,

    Yeah, I got the idea of getting access to timers through process
    function and had the same result which you explained
    that is a side output doesn't guarantee that the data is written out
    to sink. (so maybe Fabian in that post pointed out something
    else which I am missing). If I am correct then, data is written to
    side output as soon as it is processed in the Process function
    (maybe in
    process function itself on Ontimer call if a timer has been set, right?

    I am doing all computation in Datastream<Object> and then adding a
    mapper to convert to DataStream<Row> to sink through JdbcAppendTableSink
    which is part of Table API I think. I will definitely try exploring
    the Jdbc Sink function and context to get the watermark.

    Thinking out of the box, is it possible to add some extra operator
    after sink which will always have watermark which is greater than
    sink function watermarks,
    as its a downstream operator.
    Also, does the problem simplify if we have Kafka sink?

    On Tue, Apr 28, 2020 at 10:35 PM Timo Walther <twal...@apache.org
    <mailto:twal...@apache.org>> wrote:

        Hi Shubham,

        you can call stream.process(...). The context of ProcessFunction
        gives
        you access to TimerService which let's you access the current
        watermark.

        I'm assuming your are using the Table API? As far as I remember,
        watermark are travelling through the stream even if there is no
        time-based operation happening. But you should double check that.

        However, a side output does not guarantee that the data has
        already been
        written out to the sink. So I would recommend to customize the
        JDBC sink
        instead and look into the row column for getting the current
        timestamp.

        Or even better, there should also be
        org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
        with
        access to watermark.

        I hope this helps.

        Regards,
        Timo

        On 28.04.20 13:07, Shubham Kumar wrote:
         > Hi everyone,
         >
         > I have a flink application having kafka sources which
        calculates some
         > stats based on it and pushes it to JDBC. Now, I want to know
        till what
         > timestamp is the data completely pushed in JDBC (i.e. no more
        data will
         > be pushed to timestamp smaller or equal than this). There
        doesn't seem
         > to be any direct programmatic way to do so.
         >
         > I came across the following thread which seemed most relevant
        to my
         > problem:
         >
        
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
         >
         > However, I can't seem to understand how to chain a process
        function
         > before the sink task so as to put watermarks to a side
        output. (I
         > suspect it might have something to do with datastream.addSink
        in regular
         > datastream sinks vs sink.consumeDataStream(stream) in
        JDBCAppendTableSink).
         >
         > Further what happens if there are no windows, how to approach
        the
         > problem then?
         >
         > Please share any pointers or relevant solution to tackle this.
         >
         > --
         > Thanks & Regards
         >
         > Shubham Kumar
         >



-- Thanks & Regards

    Shubham Kumar



--
Thanks & Regards

Shubham Kumar


Reply via email to