Hi all,

I'm experiencing some unexpected behavior with an interval join in Flink. I'm dealing with two data sets, let's call them X and Y. They are finite (10k elements), but I read them as DataStreams. The data needs to be joined for enrichment purposes. I use event time, and I know (because I generated the data myself) that the timestamp of an element in Y always lies between 60 minutes before and 30 minutes after the timestamp of the element with the same key in X. Both datasets are ordered by timestamp and equal in size, they share a common key, and parallelism is set to 1 throughout the whole program.
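To make the snippet below self-contained: stripped down to the fields that matter for the join, the element types look roughly like this (the real case classes carry more fields, and the String key is just a simplification for this mail):

    case class X(commonKey: String, date: java.util.Date)
    case class Y(commonKey: String, date: java.util.Date)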
The code to join looks something like this:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
    import org.apache.flink.util.Collector

    xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey)
      .intervalJoin(
        yStream
          .assignAscendingTimestamps(_.date.getTime)
          .keyBy(_.commonKey))
      .between(Time.minutes(-60), Time.minutes(30))
      .process(new ProcessJoinFunction[X, Y, String] {
        override def processElement(
            left: X,
            right: Y,
            ctx: ProcessJoinFunction[X, Y, String]#Context,
            out: Collector[String]): Unit = {
          // Emit the enriched record (simplified here to a string concatenation).
          out.collect(left + ":" + right)
        }
      })

However, about 30 percent of the data is never joined. Is there a proper way to debug this? With windows, for example, you can side-output late data. Is there a possibility to side-output unjoinable data? (I put a rough workaround sketch in the P.S. below.)

Thx a lot,
Wouter
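P.S. To make "unjoinable data" concrete, here is a rough sketch of how I imagine detecting it myself: connect the two keyed streams and, for every element, register an event-time timer 60 minutes later (the widest bound of the join interval); when the timer fires, send keys that only ever saw one side to a side output. The class, state, and tag names here are made up for the sketch, the key is assumed to be a String, and it assumes a Flink version that has KeyedCoProcessFunction. I'd much rather use something built into the interval join if it exists.

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object MatchTracker {
      // Side output for keys that never received both an X and a Y.
      val unmatchedTag: OutputTag[String] = OutputTag[String]("unmatched-debug")
    }

    class MatchTracker extends KeyedCoProcessFunction[String, X, Y, String] {

      // Per-key flags recording which side(s) have been seen so far.
      private var seenX: ValueState[java.lang.Boolean] = _
      private var seenY: ValueState[java.lang.Boolean] = _

      override def open(parameters: Configuration): Unit = {
        seenX = getRuntimeContext.getState(
          new ValueStateDescriptor("seenX", classOf[java.lang.Boolean]))
        seenY = getRuntimeContext.getState(
          new ValueStateDescriptor("seenY", classOf[java.lang.Boolean]))
      }

      override def processElement1(
          x: X,
          ctx: KeyedCoProcessFunction[String, X, Y, String]#Context,
          out: Collector[String]): Unit = {
        seenX.update(true)
        // Check back once the widest join bound (60 minutes) has passed for this element.
        val ts: Long = ctx.timestamp()
        ctx.timerService().registerEventTimeTimer(ts + 60 * 60 * 1000L)
      }

      override def processElement2(
          y: Y,
          ctx: KeyedCoProcessFunction[String, X, Y, String]#Context,
          out: Collector[String]): Unit = {
        seenY.update(true)
        val ts: Long = ctx.timestamp()
        ctx.timerService().registerEventTimeTimer(ts + 60 * 60 * 1000L)
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedCoProcessFunction[String, X, Y, String]#OnTimerContext,
          out: Collector[String]): Unit = {
        val sawX = seenX.value() != null
        val sawY = seenY.value() != null
        // May report the same key more than once if several timers fire,
        // but that is good enough for debugging. Since the input is bounded,
        // the final watermark guarantees all timers fire eventually.
        if (!(sawX && sawY)) {
          ctx.output(MatchTracker.unmatchedTag,
            s"key=${ctx.getCurrentKey} seenX=$sawX seenY=$sawY")
        }
      }
    }

Wiring it up would then look something like:

    val tracked = xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey)
      .connect(
        yStream
          .assignAscendingTimestamps(_.date.getTime)
          .keyBy(_.commonKey))
      .process(new MatchTracker)

    // Print every key that never got both sides within the join interval.
    tracked.getSideOutput(MatchTracker.unmatchedTag).print()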