I think there might be a bug in `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))` (unless I'm just not using it correctly) because I'm able to get output when I use the simpler window `.timeWindow(Time.seconds(5))` However, I don't get any output when I used the session-based window.
devinbost wrote > I added logging statements everywhere in my code, and I'm able to see my > message reach the `add` method in the AggregateFunction that I > implemented, > but the getResult method is never called. > > In the code below, I also never see the: > "Ran dataStream. Adding sink next" > line appear in my log, and the only log statements from the > JsonConcatenator > class come from the `add` method, as shown below. > > > DataStream > <String> > combinedEnvelopes = dataStream > .map(new MapFunction<String, Tuple2&lt;String, String>>() { > @Override > public Tuple2 map(String incomingMessage) throws Exception { > return mapToTuple(incomingMessage); > } > }) > .keyBy(0) > .window(EventTimeSessionWindows.withGap(Time.seconds(20))) > .aggregate(new JsonConcatenator()); > > Logger logger = LoggerFactory.getLogger(StreamJob.class); > logger.info("Ran dataStream. Adding sink next") > > ------------- > > private static class JsonConcatenator > implements AggregateFunction<Tuple2&lt;String, String>, > Tuple2<String, String>, String> { > Logger logger = LoggerFactory.getLogger(SplinklerJob.class); > @Override > public Tuple2<String, String> createAccumulator() { > return new Tuple2<String, String>("",""); > } > > @Override > public Tuple2<String, String> add(Tuple2<String, String> > value, > Tuple2<String, String> accumulator) { > logger.info("Running Add on value.f0: " + value.f0 + " and > value.f1: > " + value.f1); > return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1); > } > > @Override > public String getResult(Tuple2<String, String> accumulator) { > logger.info("Running getResult on accumulator.f1: " + > accumulator.f1); > return "[" + accumulator.f1 + "]"; > } > > @Override > public Tuple2<String, String> merge(Tuple2<String, String> > a, > Tuple2<String, String> b) { > logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + > a.f1 > + " and b.f1: " + b.f1); > return new Tuple2<>(a.f0, a.f1 + ", " + b.f1); > } > } > > > > > Any ideas? > > > Chris Miller-2 wrote >> I hit the same problem, as far as I can tell it should be fixed in >> Pulsar 2.4.2. The release of this has already passed voting so I hope it >> should be available in a day or two. >> >> https://github.com/apache/pulsar/pull/5068 > > > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/