What is the best way to avoid or remove duplicates when joining a stream with itself? I'm performing a streaming temporal triangle computation, and the first step is to find triads: pairs of edges of the form vertexA->vertexB and vertexB->vertexC, with the temporal constraint that the first edge occurs before the second. To do that, I have the following code:

    DataStream<Triad> triads = edges.join(edges)
        .where(new DestKeySelector())
        .equalTo(new SourceKeySelector())
        .window(SlidingEventTimeWindows.of(
            Time.milliseconds(windowSizeMs),
            Time.milliseconds(slideSizeMs)))
        .apply(new EdgeJoiner(queryWindow));

However, when I look at the triads being built, there are two copies of each triad. For example, if I create ten edges (time, source, target):

    0.0, 4, 0
    0.01, 1, 5
    0.02, 3, 7
    0.03, 0, 8
    0.04, 0, 9
    0.05, 4, 8
    0.06, 4, 3
    0.07, 5, 9
    0.08, 7, 1
    0.09, 9, 6

It creates the following triads (time1, source1, target1, time2, source2, target2). Note there are two copies of each.

    0.0, 4, 0 0.03, 0, 8
    0.0, 4, 0 0.03, 0, 8
    0.0, 4, 0 0.04, 0, 9
    0.0, 4, 0 0.04, 0, 9
    0.01, 1, 5 0.07, 5, 9
    0.01, 1, 5 0.07, 5, 9
    0.02, 3, 7 0.08, 7, 1
    0.02, 3, 7 0.08, 7, 1
    0.04, 0, 9 0.09, 9, 6
    0.04, 0, 9 0.09, 9, 6
    0.07, 5, 9 0.09, 9, 6
    0.07, 5, 9 0.09, 9, 6

I'm assuming this behavior has something to do with the joining of "edges" with itself. I can provide more code if that would be helpful, but I believe I've captured the most salient portion.
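
For reference, the result I expect (one copy of each triad) can be reproduced outside Flink with a brute-force pass over the ten edges. This is only a minimal sketch of the matching rule described above: the `Edge` and `Triad` classes and the `findTriads` and `sampleEdges` helpers are hypothetical stand-ins, not the classes from my job, and no `queryWindow` bound is applied because that logic lives in `EdgeJoiner`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TriadSketch {
    // Hypothetical stand-in for the real edge type (time, source, target).
    static final class Edge {
        final double time;
        final int source;
        final int target;

        Edge(double time, int source, int target) {
            this.time = time;
            this.source = source;
            this.target = target;
        }
    }

    // Hypothetical stand-in for the real Triad type: two chained edges.
    static final class Triad {
        final Edge first;
        final Edge second;

        Triad(Edge first, Edge second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return first.time + ", " + first.source + ", " + first.target + " "
                 + second.time + ", " + second.source + ", " + second.target;
        }
    }

    // Pair every edge A->B with every strictly later edge B->C.
    // Assumption: no upper bound on the time gap (queryWindow is ignored here).
    static List<Triad> findTriads(List<Edge> edges) {
        List<Triad> triads = new ArrayList<>();
        for (Edge first : edges) {
            for (Edge second : edges) {
                if (first.target == second.source && first.time < second.time) {
                    triads.add(new Triad(first, second));
                }
            }
        }
        return triads;
    }

    // The ten example edges from the question.
    static List<Edge> sampleEdges() {
        return Arrays.asList(
            new Edge(0.0, 4, 0), new Edge(0.01, 1, 5), new Edge(0.02, 3, 7),
            new Edge(0.03, 0, 8), new Edge(0.04, 0, 9), new Edge(0.05, 4, 8),
            new Edge(0.06, 4, 3), new Edge(0.07, 5, 9), new Edge(0.08, 7, 1),
            new Edge(0.09, 9, 6));
    }

    public static void main(String[] args) {
        for (Triad triad : findTriads(sampleEdges())) {
            System.out.println(triad);
        }
    }
}
```

On the sample edges this produces six triads, one copy of each, which is what I would like the streaming join to emit.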