Hi Fabian,

Thanks for the reply. I shall try the CoProcessFunction implementation.

Currently, I am trying to assign watermarks on the keyed stream. Please find
a snippet of the code below for better understanding:

List<String> names = new ArrayList<>();
names.add("stream_a");
names.add("stream_b");

DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

KeyedStream<Tuple2<String, JsonObject>, Tuple> pojo =
    messageStream.map(new JsonDeserializerv5()).keyBy(0);

SingleOutputStreamOperator<Tuple2<String, JsonObject>> watermarkStream =
    pojo.assignTimestampsAndWatermarks(new TimestampExtractorMergerv5());

DataStream<JsonObject> merge_stream =
    watermarkStream.keyBy(0).countWindow(2).apply(new JsonMergerv5());

The above snippet merges on the timestamp (field 0 of the tuple). However,
the apply function is invoked out of order: even when the streams are joined
at a time t2 that is less than the watermark, the records are still processed
by the apply function. Kindly let me know whether I am using watermarks
incorrectly or have misunderstood how they work.
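
To make the intended behaviour concrete, here is a minimal plain-Java sketch (no Flink dependency) of the rule described above: a merged record whose timestamp is less than the current watermark is rejected. The class `LateMergeFilter` and its methods are hypothetical names for illustration only and are not part of the Flink API.

```java
class LateMergeFilter {
    // Highest watermark seen so far; starts at the smallest possible value.
    private long currentWatermark = Long.MIN_VALUE;

    /** Advances the watermark; a lower value than the current one is ignored. */
    public void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /**
     * Returns true if a record with this timestamp should be processed.
     * Following the wording above, a record is rejected when its timestamp
     * is strictly less than the current watermark.
     */
    public boolean accept(long timestamp) {
        return timestamp >= currentWatermark;
    }

    public static void main(String[] args) {
        LateMergeFilter filter = new LateMergeFilter();
        filter.advanceWatermark(10L);
        System.out.println(filter.accept(9L));   // timestamp below the watermark
        System.out.println(filter.accept(10L));  // timestamp at the watermark
    }
}
```

In this sketch the check is explicit; nothing is dropped unless `accept` is consulted before a record is handed on.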

Regards,

Vijay Raajaa G S

On Mon, Jul 31, 2017 at 2:09 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vijay,
>
> there are many ways to implement joins with a stateful CoProcessFunction.
> It gives you access to the timestamps of records and you can register
> timers that trigger when a certain time is reached.
> It is basically up to you how you join and emit data. You can drop late
> data or emit it. Please note that records are emitted either with their
> current timestamp (if in processElement()) or with the timestamp of the
> timer that fired (in onTimer()).
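
As a rough illustration of the idea above, the following plain-Java sketch (no Flink dependency) buffers unmatched records from two inputs, emits a joined result with the timestamp of the record that completed the match, and uses a timer-like callback to discard stale unmatched records. The class `BufferedJoinSketch` and all of its methods are hypothetical stand-ins; this is not the CoProcessFunction API, only one of the many possible join policies mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BufferedJoinSketch {
    // Unmatched records per input: join key -> record timestamp.
    private final Map<String, Long> leftBuffer = new HashMap<>();
    private final Map<String, Long> rightBuffer = new HashMap<>();
    // Joined results, written as "key@timestamp".
    private final List<String> output = new ArrayList<>();

    public void processLeft(String key, long timestamp) {
        process(key, timestamp, leftBuffer, rightBuffer);
    }

    public void processRight(String key, long timestamp) {
        process(key, timestamp, rightBuffer, leftBuffer);
    }

    private void process(String key, long timestamp,
                         Map<String, Long> own, Map<String, Long> other) {
        Long match = other.remove(key);
        if (match != null) {
            // Emit with the timestamp of the record currently being processed.
            output.add(key + "@" + timestamp);
        } else {
            // No partner yet: keep the record until a match or a timer arrives.
            own.put(key, timestamp);
        }
    }

    /** Stand-in for a timer firing: drop unmatched records at or before 'time'. */
    public void onTimer(long time) {
        leftBuffer.values().removeIf(ts -> ts <= time);
        rightBuffer.values().removeIf(ts -> ts <= time);
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BufferedJoinSketch join = new BufferedJoinSketch();
        join.processLeft("a", 1L);
        join.processRight("a", 2L);  // matches the buffered left record
        join.processLeft("b", 3L);
        join.onTimer(5L);            // discards the unmatched "b"
        join.processRight("b", 6L);  // no partner left, so it is only buffered
        System.out.println(join.output());
    }
}
```

Whether late records are dropped, as here, or emitted anyway is a policy choice left to the implementer.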
>
> Hope this helps,
> Fabian
>
>
>
> 2017-07-31 9:48 GMT+02:00 G.S.Vijay Raajaa <gsvijayraa...@gmail.com>:
>
>> My bad. I meant only join. I am currently using keyBy on a timestamp
>> common across the streams.
>>
>> Regards,
>> Vijay Raajaa GS
>>
>> On Mon, Jul 31, 2017 at 1:16 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> @Wei: You can implement very different behavior using a
>>> CoProcessFunction. However, if your operator is time-based, the logical
>>> time of the operator will be the minimum time of both streams (time of the
>>> "slower" watermark).
>>>
>>> @Vijay: I did not understand what your requirements are. Do you want to
>>> join or merge streams? Those are two different things. This thread
>>> discusses joins, not merging.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-07-31 4:24 GMT+02:00 G.S.Vijay Raajaa <gsvijayraa...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> How do I order by the merge time? Let's say I merge the streams at T1. I
>>>> want to drop the T2 merge if T2 < T1. Depending on when data arrives from
>>>> the individual streams and when the merge happens, the results become
>>>> out of order. Any thoughts would be really appreciated.
>>>>
>>>> Regards,
>>>> Vijay Raajaa GS
>>>>
>>>> On Jul 31, 2017 1:14 AM, "wei" <jixia...@googlemail.com> wrote:
>>>>
>>>> Hello Fabian,
>>>>
>>>>
>>>>
>>>> thank you for your answer!
>>>>
>>>>
>>>>
>>>> Does it mean that the operator will wait until it gets a watermark from
>>>> each input stream and then emit the "slower" watermark?
>>>>
>>>>
>>>>
>>>> Best regards
>>>>
>>>> Wei
>>>>
>>>>
>>>>
>>>> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
>>>> *Sent:* Sunday, July 30, 2017 11:17 AM
>>>> *To:* xie wei
>>>> *Cc:* user
>>>> *Subject:* Re: Is watermark used by joining two streams
>>>>
>>>>
>>>>
>>>> Periodic and punctuated watermarks only differ in the way that they are
>>>> generated. Afterwards they are treated the same.
>>>>
>>>> An operator with two input streams will always sync its own watermarks
>>>> to the watermarks of both input streams, i.e., to the "slower" watermark of
>>>> both inputs.
>>>>
>>>> So if the left input says it is 12:14 and the right says it is 11:53,
>>>> the operator will have an internal time of 11:53 and emit watermarks
>>>> according to that time.
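
The rule above can be sketched in plain Java without any Flink dependency: the operator remembers the latest watermark from each input and takes the minimum as its own time. The class `TwoInputWatermarkTracker` and its methods are hypothetical names for illustration, and times are simplified to minutes since midnight; this is not how Flink represents watermarks internally.

```java
class TwoInputWatermarkTracker {
    // Latest watermark received from each input; no watermark seen yet.
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    /** Records a watermark from the left input; it never moves backwards. */
    public void onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
    }

    /** Records a watermark from the right input; it never moves backwards. */
    public void onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
    }

    /** The operator's own time is the "slower" of the two input watermarks. */
    public long operatorTime() {
        return Math.min(leftWatermark, rightWatermark);
    }

    public static void main(String[] args) {
        TwoInputWatermarkTracker tracker = new TwoInputWatermarkTracker();
        tracker.onLeftWatermark(12 * 60 + 14);   // left input says 12:14
        tracker.onRightWatermark(11 * 60 + 53);  // right input says 11:53
        long time = tracker.operatorTime();
        System.out.printf("%02d:%02d%n", time / 60, time % 60); // prints 11:53
    }
}
```

Until both inputs have delivered a watermark, `operatorTime()` in this sketch stays at `Long.MIN_VALUE`, so the operator's time does not advance on one input alone.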
>>>>
>>>> Hope that helps,
>>>>
>>>> Fabian
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 2017-07-28 15:00 GMT+02:00 xie wei <jixia...@googlemail.com>:
>>>>
>>>> Hello,
>>>>
>>>> I want to join two streams based on an event-time window. Every stream
>>>> has its own watermark: one has a periodic watermark and the other has a
>>>> punctuated watermark.
>>>>
>>>> Are the watermarks used to trigger the join? If yes, which one and how
>>>> is it used?
>>>>
>>>> Thank you and best regards
>>>>
>>>> Wei
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
