Hi Fabian,

Thanks for your reply. I managed to resolve this issue. The behavior was
actually not that unexpected: I had used xStream as the 'base' of the join
where I should have used yStream as the 'base', i.e. yStream.element - 60 min <=
xStream.element <= yStream.element + 30 min. Swapping the two datastreams
fixed the issue.
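
For anyone who runs into the same thing, here is a minimal plain-Scala sketch of why the order matters. It does not use Flink at all; it only models the bound check, on the understanding that left.intervalJoin(right).between(lower, upper) matches a pair when left.ts + lower <= right.ts <= left.ts + upper. The object name, the helper `matches`, and the timestamps are made up for illustration.

```scala
// Plain-Scala model of the interval-join bound check (no Flink dependency).
// Assumption: for left.intervalJoin(right).between(lower, upper), a pair matches when
//   left.ts + lower <= right.ts <= left.ts + upper
object IntervalJoinBounds {
  val MinuteMs: Long = 60L * 1000L

  /** True if `rightTs` falls inside [leftTs + lowerMs, leftTs + upperMs]. */
  def matches(leftTs: Long, rightTs: Long, lowerMs: Long, upperMs: Long): Boolean =
    rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs

  def main(args: Array[String]): Unit = {
    val lower = -60 * MinuteMs
    val upper = 30 * MinuteMs

    // Hypothetical pair: the X element is 45 minutes earlier than the Y element,
    // which satisfies y - 60 min <= x <= y + 30 min.
    val yTs = 100 * MinuteMs
    val xTs = yTs - 45 * MinuteMs

    // xStream as the left side: checks x - 60 min <= y <= x + 30 min, which fails here.
    println(s"x as left: ${matches(xTs, yTs, lower, upper)}")
    // yStream as the left side: checks y - 60 min <= x <= y + 30 min, which holds.
    println(s"y as left: ${matches(yTs, xTs, lower, upper)}")
  }
}
```

Pairs whose timestamps are within 30 minutes of each other match in either order; only pairs where Y is more than 30 minutes later than X are dropped when xStream is the left side. That would be consistent with only part of the data going missing.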

Thanks anyway.

Cheers, Wouter



On Mon, 24 Jun 2019 at 11:22, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Wouter,
>
> Not sure what is going wrong there, but something you could try is to
> use a custom watermark assigner that always returns a watermark of 0.
> When a source finishes, it emits a final Long.MAX_VALUE watermark.
> Hence the join should consume all events and store them in state. When
> both sources are finished, it would start to join the data and clean up the
> state.
> This test would show whether there are any issues with late data.
>
> Best, Fabian
>
> On Fri, 21 Jun 2019 at 15:32, Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> Does anyone have any leads on this issue? I have been looking into the
>> IntervalJoinOperator code, but that didn't really help. My intuition is
>> that elements are rejected because of lateness, but that still confuses me
>> since I'm sure that both datastreams have monotonically increasing timestamps.
>>
>> Thx, Wouter
>>
>> On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <
>> w.d.zorgdra...@tudelft.nl> wrote:
>>
>>> Hi all,
>>>
>>> I'm experiencing some unexpected behavior using an interval join in
>>> Flink.
>>> I'm dealing with two data sets, let's call them X and Y. They are finite
>>> (10k elements) but I interpret them as a DataStream. The data needs to be
>>> joined for enrichment purposes. I use event time and I know (because I
>>> generated the data myself) that the timestamp of an element Y is always
>>> between -60 minutes and +30 minutes of the element with the same key in set
>>> X. Both datasets are in-order (in terms of timestamps), equal in size,
>>> share a common key and parallelism is set to 1 throughout the whole program.
>>>
>>> The code to join looks something like this:
>>>
>>> xStream
>>>       .assignAscendingTimestamps(_.date.getTime)
>>>       .keyBy(_.commonKey)
>>>       .intervalJoin(
>>>         yStream
>>>           .assignAscendingTimestamps(_.date.getTime)
>>>           .keyBy(_.commonKey))
>>>       .between(Time.minutes(-60), Time.minutes(30))
>>>       .process(new ProcessJoinFunction[X, Y, String] {
>>>         override def processElement(
>>>             left: X,
>>>             right: Y,
>>>             ctx: ProcessJoinFunction[X, Y, String]#Context,
>>>             out: Collector[String]): Unit = {
>>>
>>>           out.collect(left + ":" + right)
>>>         }
>>>       })
>>>
>>>
>>> However, about 30% of the data is not joined. Is there a proper
>>> way to debug this? For instance, in windows you can side-output late data.
>>> Is there a possibility to side-output unjoinable data?
>>>
>>> Thx a lot,
>>> Wouter
>>>
>>>
>>>
