Hi Eric,

Can you change the sliding window to a tumbling window? Consecutive sliding windows overlap, so the same data is likely to appear in more than one window.

Best,
Hequn

On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński <wos...@gmail.com> wrote:

> Hey,
> IMHO, the simplest way in your case would be to use the Evictor to evict
> duplicate values after the window is generated. Have a look at it here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>
> Best Regards,
> Dominik.
>
> On Mon, Oct 8, 2018 at 08:00, Eric L Goodman <eric.good...@colorado.edu> wrote:
>
>> What is the best way to avoid or remove duplicates when joining a stream
>> with itself? I'm performing a streaming temporal triangle computation and
>> the first part is to find triads of two edges of the form vertexA->vertexB
>> and vertexB->vertexC (and there are temporal constraints where the first
>> edge occurs before the second edge). To do that, I have the following code:
>>
>> DataStream<Triad> triads = edges.join(edges)
>>     .where(new DestKeySelector())
>>     .equalTo(new SourceKeySelector())
>>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>         Time.milliseconds(slideSizeMs)))
>>     .apply(new EdgeJoiner(queryWindow));
>>
>> However, when I look at the triads being built, there are two copies of each
>> triad.
>>
>> For example, if I create ten edges (time, source, target):
>>
>> 0.0, 4, 0
>> 0.01, 1, 5
>> 0.02, 3, 7
>> 0.03, 0, 8
>> 0.04, 0, 9
>> 0.05, 4, 8
>> 0.06, 4, 3
>> 0.07, 5, 9
>> 0.08, 7, 1
>> 0.09, 9, 6
>>
>> It creates the following triads (time1, source1, target1, time2, source2,
>> target2). Note there are two copies of each.
>>
>> 0.0, 4, 0    0.03, 0, 8
>> 0.0, 4, 0    0.03, 0, 8
>> 0.0, 4, 0    0.04, 0, 9
>> 0.0, 4, 0    0.04, 0, 9
>> 0.01, 1, 5   0.07, 5, 9
>> 0.01, 1, 5   0.07, 5, 9
>> 0.02, 3, 7   0.08, 7, 1
>> 0.02, 3, 7   0.08, 7, 1
>> 0.04, 0, 9   0.09, 9, 6
>> 0.04, 0, 9   0.09, 9, 6
>> 0.07, 5, 9   0.09, 9, 6
>> 0.07, 5, 9   0.09, 9, 6
>>
>> I'm assuming this behavior has something to do with the joining of "edges"
>> with itself.
>>
>> I can provide more code if that would be helpful, but I believe I've
>> captured the most salient portion.
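
The window overlap mentioned at the top of the thread can be checked without Flink. Below is a minimal plain-Java sketch, not the code from the thread: the class and method names are hypothetical, and the 100 ms window size and 50 ms slide are assumed values, since the thread does not state windowSizeMs or slideSizeMs. It assumes the usual sliding-window rule (windows start at multiples of the slide, with no offset) and counts how many windows contain both timestamps of a pair, which is how many times a windowed join would emit that pair.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: simulates sliding-window assignment
// to count how often a window join would emit the same pair of elements.
class SlidingWindowOverlap {

    // Start times of every window [start, start + size) that contains ts.
    // Assumes windows start at multiples of `slide` (no offset).
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Number of windows containing both timestamps. A windowed join
    // evaluates each window independently, so this is the number of
    // copies of the pair that reach the output.
    static int sharedWindows(long ts1, long ts2, long size, long slide) {
        List<Long> shared = windowStarts(ts1, size, slide);
        shared.retainAll(windowStarts(ts2, size, slide));
        return shared.size();
    }

    public static void main(String[] args) {
        // Assumed values: 100 ms windows sliding every 50 ms.
        System.out.println(sharedWindows(0, 30, 100, 50));    // 2: pair emitted twice
        // Tumbling window: slide equals size, so windows do not overlap.
        System.out.println(sharedWindows(0, 30, 100, 100));   // 1: pair emitted once
        // A pair that straddles a tumbling-window boundary.
        System.out.println(sharedWindows(90, 110, 100, 50));  // 1
        System.out.println(sharedWindows(90, 110, 100, 100)); // 0: pair is missed
    }
}
```

Under these assumptions, a slide of half the window size puts nearby elements into two common windows, which would match the two copies of each triad reported above. The last two calls show the trade-off of switching to tumbling windows: duplicates disappear, but a pair whose edges fall on opposite sides of a window boundary is no longer joined at all.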