Hi all,

I'm seeing some unexpected behavior with an interval join in Flink.
I'm dealing with two data sets, let's call them X and Y. They are finite
(10k elements each) but I consume them as DataStreams. The data needs to be
joined for enrichment purposes. I use event time, and I know (because I
generated the data myself) that the timestamp of an element of Y is always
between -60 minutes and +30 minutes of the timestamp of the element with the
same key in X. Both data sets are in order with respect to their timestamps,
equal in size, share a common key, and parallelism is set to 1 throughout the whole program.
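
For reference, the element types are simple case classes; stripped down to
the fields that matter for the join, they look roughly like this (the key
type is just an example):

case class X(commonKey: String, date: java.util.Date /* plus payload fields */)
case class Y(commonKey: String, date: java.util.Date /* plus enrichment fields */)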

The code to join looks something like this:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey)
      .intervalJoin(
        yStream
          .assignAscendingTimestamps(_.date.getTime)
          .keyBy(_.commonKey))
      .between(Time.minutes(-60), Time.minutes(30))
      .process(new ProcessJoinFunction[X, Y, String] {
        override def processElement(
            left: X,
            right: Y,
            ctx: ProcessJoinFunction[X, Y, String]#Context,
            out: Collector[String]): Unit = {

          out.collect(left + ":" + right)
        }
      })

However, about 30% of the data is not joined. Is there a proper way
to debug this? For instance, with windows you can side-output late data. Is
there a way to side-output data that cannot be joined?
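
Right now the only way I can think of to find the records that did not join
is to compare keys offline after the job has finished, roughly like this
(just a sketch; xRecords and joinedPairs stand for locally collected copies
of the X input and of the joined output):

// All keys that occur in X vs. the keys that made it into the join output.
val xKeys      = xRecords.map(_.commonKey).toSet
val joinedKeys = joinedPairs.map { case (x, _) => x.commonKey }.toSet
val unjoined   = xKeys.diff(joinedKeys)
println(s"${unjoined.size} of ${xKeys.size} X elements never found a Y partner")

That works for counting, but it tells me nothing about why those elements
were dropped inside the join itself.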

Thx a lot,
Wouter
