Did you check the new interval join that was added in Flink 1.6.0 [1]? It might be better suited because each record gets its own join boundaries, based on its timestamp and the join interval.
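For illustration, here is a minimal plain-Java sketch of interval-join semantics for the triad query. It uses the ten example edges from the original message quoted below. It does not use Flink. The `Edge` class, the `intervalJoin` method, and the query window of 0.1 (in the same units as the example timestamps) are hypothetical. Each left edge is compared only against right edges whose source equals its target and whose timestamp falls in (left.time, left.time + 0.1]. Every pair is therefore checked exactly once, and no duplicates appear.

In Flink 1.6 the streaming form would, as far as I know, look roughly like `edges.keyBy(new DestKeySelector()).intervalJoin(edges.keyBy(new SourceKeySelector())).between(...).lowerBoundExclusive().process(...)`, with a `ProcessJoinFunction` taking the place of `EdgeJoiner`. Please check [1] for the exact signatures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of interval-join semantics; no Flink involved.
// Edge, intervalJoin and the 0.1 query window are hypothetical names/values.
class IntervalJoinSketch {

    static final class Edge {
        final double time;
        final int source;
        final int target;

        Edge(double time, int source, int target) {
            this.time = time;
            this.source = source;
            this.target = target;
        }

        @Override
        public String toString() {
            return time + ", " + source + ", " + target;
        }
    }

    // Joins the edge list with itself: a left edge A->B matches a right edge
    // B->C when the right timestamp lies in (left.time + lower, left.time + upper].
    // Every (left, right) pair is checked exactly once, so nothing is emitted twice.
    static List<String> intervalJoin(List<Edge> edges, double lower, double upper) {
        // Index the right side by its join key (the edge's source vertex).
        Map<Integer, List<Edge>> bySource = new HashMap<>();
        for (Edge e : edges) {
            bySource.computeIfAbsent(e.source, k -> new ArrayList<>()).add(e);
        }

        List<String> triads = new ArrayList<>();
        for (Edge left : edges) {
            // Right-side candidates share the key: left.target == right.source.
            List<Edge> candidates =
                    bySource.getOrDefault(left.target, Collections.<Edge>emptyList());
            for (Edge right : candidates) {
                boolean inInterval = right.time > left.time + lower
                        && right.time <= left.time + upper;
                if (inInterval) {
                    triads.add(left + "  " + right);
                }
            }
        }
        return triads;
    }

    // The ten example edges (time, source, target) from the original message.
    static List<Edge> sampleEdges() {
        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(0.0, 4, 0));
        edges.add(new Edge(0.01, 1, 5));
        edges.add(new Edge(0.02, 3, 7));
        edges.add(new Edge(0.03, 0, 8));
        edges.add(new Edge(0.04, 0, 9));
        edges.add(new Edge(0.05, 4, 8));
        edges.add(new Edge(0.06, 4, 3));
        edges.add(new Edge(0.07, 5, 9));
        edges.add(new Edge(0.08, 7, 1));
        edges.add(new Edge(0.09, 9, 6));
        return edges;
    }

    public static void main(String[] args) {
        // Lower bound 0 (exclusive) enforces "first edge before second edge";
        // the upper bound 0.1 stands in for the unknown queryWindow.
        for (String triad : intervalJoin(sampleEdges(), 0.0, 0.1)) {
            System.out.println(triad);
        }
    }
}
```

Running `main` should print each of the six distinct triads from the original message exactly once. The first one is `0.0, 4, 0  0.03, 0, 8`. Pairs such as (0.06, 4, 3) followed by (0.02, 3, 7) are rejected by the exclusive lower bound.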
Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join

On Mon, Oct 8, 2018 at 4:44 PM Eric L Goodman <eric.good...@colorado.edu> wrote:

> If I change it to a tumbling window, some of the results will be lost, since
> the pattern I'm matching has a temporal extent: if the pattern starts in
> one tumbling window and ends in the next, it won't be reported. Based on
> the temporal length of the query, you can set the slide and window
> lengths to capture all the patterns, though, as you note, you will
> get duplicates.
>
> On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Eric,
>>
>> Can you change the sliding window to a tumbling window? The data of
>> different sliding windows is likely to overlap.
>>
>> Best, Hequn
>>
>> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński <wos...@gmail.com> wrote:
>>
>>> Hey,
>>> IMHO, the simplest way in your case would be to use the Evictor to evict
>>> duplicate values after the window is generated. Have a look at it here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>>
>>> Best Regards,
>>> Dominik.
>>>
>>> On Mon, Oct 8, 2018 at 8:00 AM Eric L Goodman <eric.good...@colorado.edu> wrote:
>>>
>>>> What is the best way to avoid or remove duplicates when joining a
>>>> stream with itself? I'm performing a streaming temporal triangle
>>>> computation, and the first part is to find triads of two edges of the form
>>>> vertexA->vertexB and vertexB->vertexC (with the temporal constraint
>>>> that the first edge occurs before the second edge).
>>>> To do that, I have the following code:
>>>>
>>>> DataStream<Triad> triads = edges.join(edges)
>>>>     .where(new DestKeySelector())
>>>>     .equalTo(new SourceKeySelector())
>>>>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>>>         Time.milliseconds(slideSizeMs)))
>>>>     .apply(new EdgeJoiner(queryWindow));
>>>>
>>>> However, when I look at the triads being built, there are two copies of
>>>> each triad.
>>>>
>>>> For example, if I create ten edges (time, source, target):
>>>>
>>>> 0.0, 4, 0
>>>> 0.01, 1, 5
>>>> 0.02, 3, 7
>>>> 0.03, 0, 8
>>>> 0.04, 0, 9
>>>> 0.05, 4, 8
>>>> 0.06, 4, 3
>>>> 0.07, 5, 9
>>>> 0.08, 7, 1
>>>> 0.09, 9, 6
>>>>
>>>> it creates the following triads (time1, source1, target1, time2,
>>>> source2, target2). Note that there are two copies of each.
>>>>
>>>> 0.0, 4, 0     0.03, 0, 8
>>>> 0.0, 4, 0     0.03, 0, 8
>>>> 0.0, 4, 0     0.04, 0, 9
>>>> 0.0, 4, 0     0.04, 0, 9
>>>> 0.01, 1, 5    0.07, 5, 9
>>>> 0.01, 1, 5    0.07, 5, 9
>>>> 0.02, 3, 7    0.08, 7, 1
>>>> 0.02, 3, 7    0.08, 7, 1
>>>> 0.04, 0, 9    0.09, 9, 6
>>>> 0.04, 0, 9    0.09, 9, 6
>>>> 0.07, 5, 9    0.09, 9, 6
>>>> 0.07, 5, 9    0.09, 9, 6
>>>>
>>>> I'm assuming this behavior has something to do with joining "edges"
>>>> with itself.
>>>>
>>>> I can provide more code if that would be helpful, but I believe I've
>>>> captured the most salient portion.
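The plain-Java sketch below shows where the two copies in the sliding-window join come from. It counts how many event-time windows contain both edges of a triad. That count is the number of times a windowed join emits the pair.

To my reading of the Flink source, the window-start alignment matches the one used by `SlidingEventTimeWindows` (via `TimeWindow.getWindowStartWithOffset`, offset 0). Several things here are hypothetical, since the thread does not give `windowSizeMs` or `slideSizeMs`:

- the class and method names
- the window sizes: 1000 ms size and 500 ms slide for the sliding window, 50 ms for the tumbling window
- reading the example timestamps as seconds

Each timestamp belongs to roughly windowSize / slideSize sliding windows, so two nearby edges can share several of them. A tumbling window (slide equal to size) never emits a pair twice, but it drops pairs that straddle a window boundary. That is the trade-off described in the reply above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: how many event-time windows hold both edges of a triad,
// i.e. how often a windowed join would emit that pair. Names and sizes are
// hypothetical; start alignment follows Flink's sliding assigner with offset 0.
class WindowOverlapSketch {

    // Latest window start at or before `timestamp` on a grid of step `slide`
    // (same formula as TimeWindow.getWindowStartWithOffset with offset 0).
    static long lastStart(long timestamp, long slide) {
        return timestamp - (timestamp + slide) % slide;
    }

    // Starts of every window [start, start + size) that contains `timestamp`.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastStart(timestamp, slide); start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Number of windows that contain both timestamps.
    static int sharedWindows(long t1, long t2, long size, long slide) {
        List<Long> first = windowStarts(t1, size, slide);
        int shared = 0;
        for (long start : windowStarts(t2, size, slide)) {
            if (first.contains(start)) {
                shared++;
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        // Example times read as seconds and converted to ms (an assumption):
        // triad (0.01, 1, 5) -> (0.07, 5, 9) becomes t1 = 10 ms, t2 = 70 ms.
        // Sliding window: 1000 ms size, 500 ms slide (hypothetical values).
        System.out.println("sliding:  " + sharedWindows(10, 70, 1000, 500)); // 2
        // Tumbling window of 50 ms (slide == size): the pair straddles a boundary.
        System.out.println("tumbling: " + sharedWindows(10, 70, 50, 50));    // 0
    }
}
```

With these hypothetical sizes, every example timestamp lands in the same two sliding windows. Each triad is therefore emitted twice, the same kind of doubling seen in the original message. The 50 ms tumbling window loses that particular triad entirely.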