Hi Fabian,

Thanks for the reply. I shall try the CoProcessFunction implementation.
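
The quoted reply from Fabian below describes a keyed CoProcessFunction join that holds records in state, registers timers, and emits or drops late data. One way to apply that here is to buffer records from both inputs by timestamp and emit a merge only once the watermark passes that timestamp (the work onTimer() would do), dropping anything that arrives for a time that has already fired. The class below is a plain-Java simulation of that logic with no Flink dependency, so it runs standalone. TimerJoinSketch, its methods, and the "ts:left+right" output format are hypothetical stand-ins, not Flink API. In a real job the two streams would be connected and keyed, and the buffers would live in Flink keyed state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of a timer-driven join; NOT Flink API.
// processElement1/2 mirror the CoProcessFunction callbacks, advanceWatermark
// plays the role of onTimer(), and the TreeMaps stand in for keyed state.
class TimerJoinSketch {
    // Buffered records per input, ordered by event timestamp.
    private final TreeMap<Long, String> left = new TreeMap<>();
    private final TreeMap<Long, String> right = new TreeMap<>();
    // Event time up to which merges have already been decided.
    private long watermark = Long.MIN_VALUE;

    final List<String> output = new ArrayList<>();
    int dropped = 0;

    void processElement1(long ts, String value) { buffer(left, ts, value); }

    void processElement2(long ts, String value) { buffer(right, ts, value); }

    private void buffer(TreeMap<Long, String> side, long ts, String value) {
        if (ts <= watermark) {
            dropped++; // late: the merge for this timestamp was already decided
            return;
        }
        side.put(ts, value);
    }

    // Emits every matched pair with a timestamp <= the new watermark, in
    // ascending timestamp order, then discards all buffered entries
    // (matched or not) up to that watermark.
    void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks only move forward
        }
        watermark = newWatermark;
        for (Map.Entry<Long, String> e : left.headMap(watermark, true).entrySet()) {
            String r = right.get(e.getKey());
            if (r != null) {
                output.add(e.getKey() + ":" + e.getValue() + "+" + r);
            }
        }
        left.headMap(watermark, true).clear();
        right.headMap(watermark, true).clear();
    }

    public static void main(String[] args) {
        TimerJoinSketch join = new TimerJoinSketch();
        // Records arrive out of timestamp order on both inputs.
        join.processElement1(2L, "a2");
        join.processElement2(1L, "b1");
        join.processElement1(1L, "a1");
        join.processElement2(2L, "b2");
        join.advanceWatermark(2L);
        join.processElement1(1L, "late"); // dropped: time 1 has already fired
        System.out.println(join.output);  // prints [1:a1+b1, 2:a2+b2]
        System.out.println(join.dropped); // prints 1
    }
}
```

Holding merges back until the watermark passes trades latency for ordered output, which matches the "drop the T2 merge if T2 < T1" requirement. Emitting late records instead of dropping them, the other option Fabian mentions, would replace the dropped++ branch.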

Currently, I am trying to assign watermarks on the keyed stream. Please find a snippet of the code below for better understanding:

    // Kafka source reading both topics
    List<String> names = new ArrayList<>();
    names.add("stream_a");
    names.add("stream_b");
    DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

    // Parse JSON into (timestamp, JsonObject) tuples, keyed by field 0 (the timestamp)
    KeyedStream<Tuple2<String, JsonObject>, Tuple> pojo =
        messageStream.map(new JsonDeserializerv5()).keyBy(0);

    // Extract event timestamps and generate watermarks
    SingleOutputStreamOperator<Tuple2<String, JsonObject>> watermarkStream =
        pojo.assignTimestampsAndWatermarks(new TimestampExtractorMergerv5());

    // countWindow(2) fires after every 2 elements per key (count-based, not event-time)
    DataStream<JsonObject> merge_stream =
        watermarkStream.keyBy(0).countWindow(2).apply(new JsonMergerv5());

The above snippet merges records on the timestamp (field 0 of the tuple). However, the output of the apply function is out of order: even when the streams are joined at t2, which is earlier than the current watermark, the joined records are still processed by the apply function.

Kindly let me know if I am not using watermarks properly or have misunderstood how they work.

Regards,
Vijay Raajaa G S

On Mon, Jul 31, 2017 at 2:09 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vijay,
>
> There are many ways to implement joins with a stateful CoProcessFunction.
> It gives you access to the timestamps of records and you can register
> timers that trigger when a certain time is reached.
> It is basically up to you how you join and emit data. You can drop late
> data or emit it. Please note that records are emitted either with their
> current timestamp (if in processElement()) or with the timestamp of the
> timer that fired (in onTimer()).
>
> Hope this helps,
> Fabian
>
> 2017-07-31 9:48 GMT+02:00 G.S.Vijay Raajaa <gsvijayraa...@gmail.com>:
>
>> My bad. I meant only join. I am currently using keyBy on a timestamp
>> common across the streams.
>>
>> Regards,
>> Vijay Raajaa GS
>>
>> On Mon, Jul 31, 2017 at 1:16 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> @Wei: You can implement very different behavior using a
>>> CoProcessFunction. However, if your operator is time-based, the logical
>>> time of the operator will be the minimum time of both streams (the time of
>>> the "slower" watermark).
>>>
>>> @Vijay: I did not understand what your requirements are. Do you want to
>>> join or merge streams? Those are two different things. This thread
>>> discusses joins, not merging.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-07-31 4:24 GMT+02:00 G.S.Vijay Raajaa <gsvijayraa...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> How do I order by the merge time? Let's say I merge the stream at T1. I
>>>> want to drop the T2 merge if T2 < T1. Depending on when data arrives
>>>> from each individual stream and when the merge happens, the merges become
>>>> out of order. Any thoughts will be really appreciated.
>>>>
>>>> Regards,
>>>> Vijay Raajaa GS
>>>>
>>>> On Jul 31, 2017 1:14 AM, "wei" <jixia...@googlemail.com> wrote:
>>>>
>>>> Hello Fabian,
>>>>
>>>> Thank you for your answer!
>>>>
>>>> Does it mean that the operator waits until it gets watermarks from both
>>>> input streams and then emits the “slower” watermark?
>>>>
>>>> Best regards
>>>>
>>>> Wei
>>>>
>>>> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
>>>> *Sent:* Sunday, July 30, 2017 11:17 AM
>>>> *To:* xie wei
>>>> *Cc:* user
>>>> *Subject:* Re: Is watermark used by joining two streams
>>>>
>>>> Periodic and punctuated watermarks only differ in the way that they are
>>>> generated. Afterwards they are treated the same.
>>>>
>>>> An operator with two input streams will always sync its own watermarks
>>>> to the watermarks of both input streams, i.e., to the "slower" watermark of
>>>> both inputs.
>>>>
>>>> So if the left input says it is 12:14 and the right says it is 11:53,
>>>> the operator will have an internal time of 11:53 and emit watermarks
>>>> according to that time.
>>>>
>>>> Hope that helps,
>>>>
>>>> Fabian
>>>>
>>>> 2017-07-28 15:00 GMT+02:00 xie wei <jixia...@googlemail.com>:
>>>>
>>>> Hello,
>>>>
>>>> I want to join two streams based on an event-time window. Every stream
>>>> has its own watermark: one has a periodic watermark and the other has a
>>>> punctuated watermark.
>>>>
>>>> Are the watermarks used to trigger the join? If yes, which one and how
>>>> is it used?
>>>>
>>>> Thank you and best regards
>>>>
>>>> Wei
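
Fabian's 12:14 / 11:53 example in the quoted thread above can be replayed with a small plain-Java model of a two-input operator's event-time clock. TwoInputWatermark is a hypothetical class, not Flink's implementation: it only tracks the latest watermark per input and takes the minimum, with times expressed as minutes since midnight. It also touches on Wei's question: until both inputs have reported, the operator's time stays at its initial value.

```java
// Hypothetical model of a two-input operator's event-time clock (not Flink source code).
class TwoInputWatermark {
    private long input1 = Long.MIN_VALUE; // latest watermark seen on input 1
    private long input2 = Long.MIN_VALUE; // latest watermark seen on input 2

    // Records a watermark from input 1 or 2 and returns the operator's event time,
    // which can only advance as far as the slower of the two inputs.
    long onWatermark(int input, long watermark) {
        if (input == 1) {
            input1 = Math.max(input1, watermark);
        } else if (input == 2) {
            input2 = Math.max(input2, watermark);
        } else {
            throw new IllegalArgumentException("input must be 1 or 2");
        }
        return Math.min(input1, input2);
    }

    // Converts a wall-clock time to minutes since midnight.
    static long minutes(int hours, int mins) {
        return hours * 60L + mins;
    }

    public static void main(String[] args) {
        TwoInputWatermark op = new TwoInputWatermark();
        // Left input reports 12:14; the right input has not reported yet.
        System.out.println(op.onWatermark(1, minutes(12, 14)) == Long.MIN_VALUE); // prints true
        // Right input reports 11:53; the operator's time becomes 11:53 (713 minutes).
        System.out.println(op.onWatermark(2, minutes(11, 53))); // prints 713
    }
}
```

Because each input's watermark only moves forward, a periodic input and a punctuated input are combined the same way; what decides the operator's time is which input is currently behind.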