Does anyone have any leads on this issue? I have been looking into the
IntervalJoinOperator code, but that didn't really help. My intuition is
that elements are rejected because of lateness; however, that still
confuses me, since I'm sure that both data streams have monotonically
increasing timestamps.
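
Since both data sets are finite, one way to narrow this down might be to compute the expected join result outside Flink and diff it against the job's output. Below is a minimal sketch in plain Scala, not Flink code. `Event`, `expectedPairs` and `isLate` are hypothetical names; `Event` only stands in for the real X and Y types, and timestamps are assumed to be epoch milliseconds. The lateness predicate is an assumption about how the check works (timestamp behind the current watermark), not a copy of the operator's implementation.

```scala
object IntervalJoinCheck {
  // Hypothetical stand-in for X and Y: only the join key and the event time (ms).
  final case class Event(key: String, ts: Long)

  val MinuteMs: Long = 60L * 1000L

  // between(lower, upper) is read here as:
  //   left.ts + lower <= right.ts <= left.ts + upper   (both bounds inclusive)
  def inInterval(left: Event, right: Event, lowerMs: Long, upperMs: Long): Boolean =
    left.key == right.key &&
      right.ts >= left.ts + lowerMs &&
      right.ts <= left.ts + upperMs

  // Reference join outside Flink; candidates are grouped by key first,
  // so this stays cheap for roughly 10k elements per side.
  def expectedPairs(
      xs: Seq[Event],
      ys: Seq[Event],
      lowerMs: Long,
      upperMs: Long): Seq[(Event, Event)] = {
    val ysByKey = ys.groupBy(_.key)
    for {
      x <- xs
      y <- ysByKey.getOrElse(x.key, Seq.empty[Event])
      if inInterval(x, y, lowerMs, upperMs)
    } yield (x, y)
  }

  // Assumption: an element counts as late when its timestamp is behind
  // the watermark, and nothing is late before any watermark has arrived.
  def isLate(ts: Long, watermark: Long): Boolean =
    watermark != Long.MinValue && ts < watermark

  def main(args: Array[String]): Unit = {
    // Toy data, not the real data sets.
    val xs = Seq(Event("a", 100 * MinuteMs), Event("b", 200 * MinuteMs))
    val ys = Seq(
      Event("a", 45 * MinuteMs),
      Event("b", 225 * MinuteMs),
      Event("b", 300 * MinuteMs))

    val pairs = expectedPairs(xs, ys, -60 * MinuteMs, 30 * MinuteMs)
    pairs.foreach { case (x, y) => println(s"${x.key}: x@${x.ts} joins y@${y.ts}") }
    println(s"expected pairs: ${pairs.size}")
  }
}
```

For the toy data in `main`, two of the three Y elements fall inside the interval. Running the same function over the real 10k elements would give the set of pairs the job should emit, so the missing 30% could be listed by key and timestamp and then checked against the lateness hypothesis.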

Thx, Wouter

On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <
w.d.zorgdra...@tudelft.nl> wrote:

> Hi all,
>
> I'm experiencing some unexpected behavior using an interval join in Flink.
> I'm dealing with two data sets, let's call them X and Y. They are finite
> (10k elements each), but I treat them as DataStreams. The data needs to be
> joined for enrichment purposes. I use event time and I know (because I
> generated the data myself) that the timestamp of an element in Y is always
> between 60 minutes before and 30 minutes after the timestamp of the element
> with the same key in X. Both data sets are in order (in terms of
> timestamps), equal in size, and share a common key, and parallelism is set
> to 1 throughout the whole program.
>
> The code to join looks something like this:
>
> xStream
>       .assignAscendingTimestamps(_.date.getTime)
>       .keyBy(_.commonKey)
>       .intervalJoin(
>         yStream
>           .assignAscendingTimestamps(_.date.getTime)
>           .keyBy(_.commonKey))
>       .between(Time.minutes(-60), Time.minutes(30))
>       .process(new ProcessJoinFunction[X, Y, String] {
>         override def processElement(
>             left: X,
>             right: Y,
>             ctx: ProcessJoinFunction[X, Y, String]#Context,
>             out: Collector[String]): Unit = {
>
>           out.collect(left + ":" + right)
>         }
>       })
>
> However, about 30% of the data is not joined. Is there a proper way to
> debug this? For instance, with windows you can side-output late data.
> Is it possible to side-output unjoinable data?
>
> Thx a lot,
> Wouter
>
>
>
