Hi Wouter,

Not sure what is going wrong there, but something you could try is to use a custom watermark assigner (in place of assignAscendingTimestamps) that extracts the same timestamps but always returns a watermark of 0. When a source has finished reading its input, it emits a final Long.MAX_VALUE watermark. Until then, no element with a positive timestamp can be late, so the join should consume all events, match them as they arrive, and keep them in state. Once both sources are finished, the final watermark lets the join clean up the state. This test would show whether there are any issues with late data.
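The zero-watermark test can be illustrated with a simplified, self-contained Scala model. It is not Flink code and it omits state cleanup. It assumes that each input's watermark is the largest timestamp seen on that input minus 1 (an assumption about how `assignAscendingTimestamps` behaves), that the join operator's watermark is the minimum of its two input watermarks, and that an element whose timestamp is below that watermark is dropped as late. Watermarks are updated after every element here, whereas Flink emits them periodically. The names `LatenessModel`, `Event`, `replay`, `Ascending` and `Constant` are hypothetical.

```scala
// Simplified model (NOT Flink's IntervalJoinOperator) of lateness in an interval join.
object LatenessModel {
  // One record of either input; fromX tells which side of the join it belongs to.
  final case class Event(key: String, ts: Long, fromX: Boolean)

  // How each input's watermark is derived.
  sealed trait WatermarkMode
  // Largest timestamp seen on the input minus 1 (assumed assignAscendingTimestamps behavior).
  case object Ascending extends WatermarkMode
  // A fixed watermark, as in the suggested debugging test.
  final case class Constant(value: Long) extends WatermarkMode

  /** Replays `arrivals` in order; returns (joined (x, y) pairs, events dropped as late). */
  def replay(
      arrivals: Seq[Event],
      lower: Long,
      upper: Long,
      mode: WatermarkMode): (Vector[(Event, Event)], Vector[Event]) = {
    var maxTsX = Long.MinValue
    var maxTsY = Long.MinValue
    var xs = Vector.empty[Event]
    var ys = Vector.empty[Event]
    var joined = Vector.empty[(Event, Event)]
    var late = Vector.empty[Event]

    def inputWatermark(maxTs: Long): Long = mode match {
      case Ascending       => if (maxTs == Long.MinValue) Long.MinValue else maxTs - 1
      case Constant(value) => value
    }

    // y joins x if x.ts + lower <= y.ts <= x.ts + upper (both bounds inclusive).
    def matches(x: Event, y: Event): Boolean =
      x.key == y.key && y.ts >= x.ts + lower && y.ts <= x.ts + upper

    for (e <- arrivals) {
      // The operator only advances as far as the slower of its two inputs.
      val operatorWatermark = math.min(inputWatermark(maxTsX), inputWatermark(maxTsY))
      if (e.ts < operatorWatermark) {
        late :+= e // dropped: never buffered, never joined
      } else if (e.fromX) {
        xs :+= e
        joined ++= ys.filter(y => matches(e, y)).map(y => (e, y))
      } else {
        ys :+= e
        joined ++= xs.filter(x => matches(x, e)).map(x => (x, e))
      }
      // Advance the max timestamp of the input this element came from.
      if (e.fromX) maxTsX = math.max(maxTsX, e.ts)
      else maxTsY = math.max(maxTsY, e.ts)
    }
    (joined, late)
  }
}
```

For example, if Y arrives first with timestamps 100, 150, 200 and X then arrives with one out-of-order element (100, 200, 150), the `Ascending` mode drops the X element at 150 as late, while `Constant(0L)` keeps it and it joins. By analogy, if the real job joins more records with a constant watermark, lateness is the likely cause; if it does not, the data or the join bounds are the more likely suspects.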
Best,
Fabian

On Fri, 21 June 2019 at 15:32, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
> Does anyone have some leads on this issue? I have been looking into the
> IntervalJoinOperator code, but that didn't really help. My intuition is
> that the elements are rejected because of lateness; however, that still
> confuses me, since I'm sure that both data streams have monotonically
> increasing timestamps.
>
> Thx, Wouter
>
> On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>
>> Hi all,
>>
>> I'm experiencing some unexpected behavior using an interval join in Flink.
>> I'm dealing with two data sets; let's call them X and Y. They are finite
>> (10k elements), but I interpret them as a DataStream. The data needs to be
>> joined for enrichment purposes. I use event time, and I know (because I
>> generated the data myself) that the timestamp of an element in Y is always
>> between -60 minutes and +30 minutes of the element with the same key in
>> X. Both data sets are in order (in terms of timestamps), equal in size,
>> share a common key, and parallelism is set to 1 throughout the whole program.
>>
>> The code to join looks something like this:
>>
>> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.util.Collector
>>
>> xStream
>>   .assignAscendingTimestamps(_.date.getTime)
>>   .keyBy(_.commonKey)
>>   .intervalJoin(
>>     yStream
>>       .assignAscendingTimestamps(_.date.getTime)
>>       .keyBy(_.commonKey))
>>   .between(Time.minutes(-60), Time.minutes(30))
>>   .process(new ProcessJoinFunction[X, Y, String] {
>>     override def processElement(
>>         left: X,
>>         right: Y,
>>         ctx: ProcessJoinFunction[X, Y, String]#Context,
>>         out: Collector[String]): Unit = {
>>       // Emit one string per matched (X, Y) pair.
>>       out.collect(left + ":" + right)
>>     }
>>   })
>>
>> However, about 30% of the data is not joined. Is there a proper
>> way to debug this? For instance, with windows you can side-output late data.
>> Is there a way to side-output unjoinable data?
>>
>> Thx a lot,
>> Wouter
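Since both data sets are finite and were generated offline, one way to separate data problems from streaming problems is to compute the expected join result outside Flink and compare it with the job's output. The sketch below uses plain Scala collections only; `JoinBaseline`, `Rec` and `unmatched` are hypothetical names, timestamps are assumed to be epoch milliseconds (as `date.getTime` returns), and both bounds are treated as inclusive, which is assumed to match the default of `between`.

```scala
// Hypothetical offline baseline (plain Scala collections, no Flink).
object JoinBaseline {
  // A minimal record: join key plus event timestamp in epoch milliseconds.
  final case class Rec(key: String, ts: Long)

  // Returns the X records with no Y record of the same key whose timestamp lies in
  // [x.ts + lower, x.ts + upper]; both bounds are inclusive.
  def unmatched(xs: Seq[Rec], ys: Seq[Rec], lower: Long, upper: Long): Seq[Rec] = {
    // Index Y by key once so each X lookup only scans records with its own key.
    val ysByKey: Map[String, Seq[Rec]] = ys.groupBy(_.key)
    xs.filterNot { x =>
      ysByKey
        .getOrElse(x.key, Seq.empty)
        .exists(y => y.ts >= x.ts + lower && y.ts <= x.ts + upper)
    }
  }
}
```

With the bounds from the join above, the call would be `unmatched(xs, ys, -60L * 60 * 1000, 30L * 60 * 1000)`. If this returns (nearly) nothing while the Flink job still misses records, the loss happens inside the streaming job rather than in the data; the returned records are also a concrete list of candidates to trace through the job.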