Hi Fabian,

Thanks for your reply. I managed to resolve the issue. Actually, the behavior was not so unexpected: I messed up by using xStream as the 'base' while I needed to use yStream as the 'base', i.e. yStream.element - 60 min <= xStream.element <= yStream.element + 30 min. Interchanging both datastreams fixed the issue.
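For anyone hitting the same confusion: in Flink's interval join, `between(lower, upper)` is interpreted relative to the *left* (base) stream, i.e. `left.timestamp + lower <= right.timestamp <= left.timestamp + upper`. A sketch of the corrected join under that reading, reusing the X/Y case classes and `date`/`commonKey` fields from the snippet below (not a verified build, just the shape of the fix):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Constraint: yStream.element - 60 min <= xStream.element <= yStream.element + 30 min.
// With yStream as the left/base stream this maps directly onto between(-60, +30):
yStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
    xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[Y, X, String] {
    override def processElement(
        left: Y,
        right: X,
        ctx: ProcessJoinFunction[Y, X, String]#Context,
        out: Collector[String]): Unit =
      out.collect(right + ":" + left)
  })
```

Equivalently, one could keep xStream as the base and rewrite the same relation from x's perspective, x - 30 min <= y <= x + 60 min, i.e. `between(Time.minutes(-30), Time.minutes(60))`. The original code kept xStream as base but used the y-relative bounds (-60, +30), which is why pairs with y in (x + 30 min, x + 60 min] were dropped.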
Thanks anyway. Cheers, Wouter

On Mon, 24 Jun 2019 at 11:22, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Wouter,
>
> Not sure what is going wrong there, but something you could try is to use a
> custom watermark assigner that always returns a watermark of 0.
> When a source has finished serving its records, it emits a final
> Long.MAX_VALUE watermark.
> Hence the join would consume all events and store them in state. Once both
> sources are finished, it would start to join the data and clean up the
> state.
> This test would show whether there is any issue with late data.
>
> Best, Fabian
>
> On Fri, 21 Jun 2019 at 15:32, Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> Anyone have some leads on this issue? I have been looking into the
>> IntervalJoinOperator code, but that didn't really help. My intuition is
>> that data is rejected because of lateness, but that still confuses me
>> since I'm sure that both datastreams have monotonically increasing
>> timestamps.
>>
>> Thx, Wouter
>>
>> On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <
>> w.d.zorgdra...@tudelft.nl> wrote:
>>
>>> Hi all,
>>>
>>> I'm experiencing some unexpected behavior using an interval join in
>>> Flink. I'm dealing with two datasets, let's call them X and Y. They are
>>> finite (10k elements each), but I interpret them as a DataStream. The
>>> data needs to be joined for enrichment purposes. I use event time, and I
>>> know (because I generated the data myself) that the timestamp of an
>>> element of Y is always between -60 minutes and +30 minutes of the
>>> element with the same key in set X. Both datasets are in order (in terms
>>> of timestamps), equal in size, share a common key, and parallelism is
>>> set to 1 throughout the whole program.
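Fabian's debugging trick (pinning the watermark at 0 so nothing can ever be late) could be sketched like this for the Flink 1.8-era API; `ZeroWatermarkAssigner` is a made-up name, and this replaces `assignAscendingTimestamps` only for the duration of the test:

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Extracts event timestamps as usual but always reports watermark 0, so no
// element is ever considered late. When a bounded source finishes, Flink
// emits a final Long.MAX_VALUE watermark, which then triggers the join and
// state cleanup in one go.
class ZeroWatermarkAssigner[T](extractTs: T => Long)
    extends AssignerWithPeriodicWatermarks[T] {

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long =
    extractTs(element)

  override def getCurrentWatermark: Watermark = new Watermark(0L)
}

// Usage, replacing assignAscendingTimestamps on both inputs:
// xStream.assignTimestampsAndWatermarks(new ZeroWatermarkAssigner[X](_.date.getTime))
// yStream.assignTimestampsAndWatermarks(new ZeroWatermarkAssigner[Y](_.date.getTime))
```

If the join emits all expected pairs under this assigner but not under the ascending-timestamp assigners, lateness is the culprit; if pairs are still missing, the interval bounds themselves are wrong.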
>>>
>>> The code to join looks something like this:
>>>
>>> xStream
>>>   .assignAscendingTimestamps(_.date.getTime)
>>>   .keyBy(_.commonKey)
>>>   .intervalJoin(
>>>     yStream
>>>       .assignAscendingTimestamps(_.date.getTime)
>>>       .keyBy(_.commonKey))
>>>   .between(Time.minutes(-60), Time.minutes(30))
>>>   .process(new ProcessJoinFunction[X, Y, String] {
>>>     override def processElement(
>>>         left: X,
>>>         right: Y,
>>>         ctx: ProcessJoinFunction[X, Y, String]#Context,
>>>         out: Collector[String]): Unit = {
>>>       out.collect(left + ":" + right)
>>>     }
>>>   })
>>>
>>> However, about 30% of the data is not joined. Is there a proper way to
>>> debug this? For instance, with windows you can side-output late data. Is
>>> there a possibility to side-output unjoinable data?
>>>
>>> Thx a lot,
>>> Wouter
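On the side-output question: the interval join itself silently drops elements whose timestamp is behind the current watermark, but a small probe placed in front of the join can make that visible. A sketch under the assumption that lateness is the suspected cause; `lateX` is a hypothetical tag name, and the same probe would be duplicated for yStream:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Side-output tag for elements that arrive behind the watermark.
val lateX = OutputTag[X]("late-x")

val probedX = xStream
  .assignAscendingTimestamps(_.date.getTime)
  .process(new ProcessFunction[X, X] {
    override def processElement(
        value: X,
        ctx: ProcessFunction[X, X]#Context,
        out: Collector[X]): Unit = {
      // An element is "late" if its timestamp is behind the watermark;
      // such elements would be dropped by the downstream interval join.
      if (ctx.timestamp() < ctx.timerService().currentWatermark())
        ctx.output(lateX, value)
      else
        out.collect(value)
    }
  })

// probedX feeds the interval join; the side output shows what would be dropped:
// probedX.getSideOutput(lateX).print()
```

Note this only catches lateness; elements dropped because they fall outside the `between(...)` bounds (as turned out to be the case here) pass the probe untouched, so a missing-pair count after the join is still worth checking separately.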