I confirmed that I got no output after 20 seconds, and still none after sending more data with over a minute of waiting between batches.
My code looks like this:

    PulsarSourceBuilder<String> builder = PulsarSourceBuilder
            .builder(new SimpleStringSchema())
            .serviceUrl(SERVICE_URL)
            .topic(INPUT_TOPIC)
            .subscriptionName(SUBSCRIPTION_NAME);
    SourceFunction<String> src = builder.build();
    DataStream<String> dataStream = env.addSource(src);

    DataStream<String> combinedEnvelopes = dataStream
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String incomingMessage) throws Exception {
                    return mapToTuple(incomingMessage);
                }
            })
            .keyBy(0)
            //.timeWindow(Time.seconds(5))
            .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
            .aggregate(new JsonConcatenator());
    //dataStream.print();

    Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
    logger.info("Ran dataStream. Adding sink next");

    combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
            SERVICE_URL,
            OUTPUT_TOPIC,
            new AuthenticationDisabled(), // probably need to fix: AuthenticationTls()
            combinedData -> combinedData.toString().getBytes(UTF_8),
            combinedData -> "test"));
    logger.info("Added sink. Executing job.");

    // execute program
    env.execute("Flink Streaming Java API Skeleton");

Here is the JsonConcatenator class:

    private static class JsonConcatenator
            implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
        private static final Logger logger = LoggerFactory.getLogger(SplinklerJob.class);

        @Override
        public Tuple2<String, String> createAccumulator() {
            return new Tuple2<>("", "");
        }

        @Override
        public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
            logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
            // Every value is prepended with ", ", so f1 looks like ", v1, v2, ...".
            return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
        }

        @Override
        public String getResult(Tuple2<String, String> accumulator) {
            logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
            // Strip the leading ", " before wrapping the values in brackets.
            return accumulator.f1.isEmpty() ? "[]" : "[" + accumulator.f1.substring(2) + "]";
        }

        @Override
        public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
            // Called when two session windows of the same key are merged into one
            // (for example, when a new element bridges the gap between them).
            logger.info("Running merge on a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
            // Both accumulators already start with ", ", so no extra separator is needed.
            return new Tuple2<>(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
        }
    }

Devin G. Bost

On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise <ar...@ververica.com> wrote:

> getResult will only be called when the window is triggered. For a
> fixed-time window, it triggers at the end of the window.
>
> However, for EventTimeSessionWindows you need to have gaps in the data.
> Can you verify that there is actually a 20-second pause between data points
> for your keys?
> Additionally, it may also be an issue with extracting the event time from
> the sources. Could you post the relevant code as well?
>
> Best,
>
> Arvid
>
> On Mon, Dec 9, 2019 at 8:51 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi dev,
>>
>> The time passed to a window can have different semantics.
>> In a session window, it is only the gap; the size of the window is
>> driven by the activity of the events themselves.
>> In a tumbling or sliding window, it is the size of the window.
>>
>> For more details, please see the official documentation.[1]
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows
>>
>> devinbost <devin.b...@gmail.com> wrote on Fri, Dec 6, 2019 at 10:39 PM:
>>
>>> I think there might be a bug in
>>> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>>> (unless I'm just not using it correctly) because I'm able to get output
>>> when I use the simpler window
>>> `.timeWindow(Time.seconds(5))`.
>>> However, I don't get any output when I use the session-based window.
>>>
>>>
>>> devinbost wrote
>>> > I added logging statements everywhere in my code, and I'm able to see my
>>> > message reach the `add` method in the AggregateFunction that I
>>> > implemented, but the getResult method is never called.
>>> >
>>> > In the code below, I also never see the
>>> > "Ran dataStream. Adding sink next"
>>> > line appear in my log, and the only log statements from the
>>> > JsonConcatenator class come from the `add` method, as shown below.
>>> >
>>> >     DataStream<String> combinedEnvelopes = dataStream
>>> >             .map(new MapFunction<String, Tuple2<String, String>>() {
>>> >                 @Override
>>> >                 public Tuple2<String, String> map(String incomingMessage) throws Exception {
>>> >                     return mapToTuple(incomingMessage);
>>> >                 }
>>> >             })
>>> >             .keyBy(0)
>>> >             .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>> >             .aggregate(new JsonConcatenator());
>>> >
>>> >     Logger logger = LoggerFactory.getLogger(StreamJob.class);
>>> >     logger.info("Ran dataStream. Adding sink next");
>>> >
>>> > -------------
>>> >
>>> >     private static class JsonConcatenator
>>> >             implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>>> >         Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>>> >
>>> >         @Override
>>> >         public Tuple2<String, String> createAccumulator() {
>>> >             return new Tuple2<String, String>("", "");
>>> >         }
>>> >
>>> >         @Override
>>> >         public Tuple2<String, String> add(Tuple2<String, String> value,
>>> >                                           Tuple2<String, String> accumulator) {
>>> >             logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>>> >             return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>>> >         }
>>> >
>>> >         @Override
>>> >         public String getResult(Tuple2<String, String> accumulator) {
>>> >             logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>>> >             return "[" + accumulator.f1 + "]";
>>> >         }
>>> >
>>> >         @Override
>>> >         public Tuple2<String, String> merge(Tuple2<String, String> a,
>>> >                                             Tuple2<String, String> b) {
>>> >             logger.info("Running merge on a.f0: " + a.f0 + " and a.f1: " + a.f1
>>> >                     + " and b.f1: " + b.f1);
>>> >             return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>>> >         }
>>> >     }
>>> >
>>> > Any ideas?
>>> >
>>> >
>>> > Chris Miller-2 wrote
>>> >> I hit the same problem; as far as I can tell, it should be fixed in
>>> >> Pulsar 2.4.2.
>>> >> The release of this has already passed voting, so I hope it
>>> >> should be available in a day or two.
>>> >>
>>> >> https://github.com/apache/pulsar/pull/5068
>>> >
>>> > --
>>> > Sent from:
>>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
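Arvid's and Vino's points can be illustrated with a small model. This assumes (the thread does not confirm it) that the job runs with Flink 1.9's default processing-time characteristic, never calls `assignTimestampsAndWatermarks(...)`, and that the Pulsar source emits no watermarks of its own; the posted code shows neither a timestamp assigner nor `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`. Under that assumption, event-time session windows are still created and `add` runs, but no watermark ever reaches a window's end, so the event-time trigger never fires and `getResult` is never called. `.timeWindow(Time.seconds(5))` still produces output because it follows the environment's time characteristic and becomes a processing-time tumbling window. `SessionWindowSim` is a hypothetical plain-Java helper, not Flink code: it only models gap-based session merging and watermark-driven firing for a single key.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical, simplified single-key model of an event-time session window.
 * Not Flink's WindowOperator: it keeps only the rule relevant here, namely that a
 * session is emitted (where getResult would run) only once the watermark reaches it.
 */
final class SessionWindowSim {
    /** One open session [start, end), where end = last element timestamp + gap. */
    private static final class Session {
        final long start;
        final long end;
        final List<String> values;

        Session(long start, long end, List<String> values) {
            this.start = start;
            this.end = end;
            this.values = values;
        }
    }

    private final long gapMillis;
    // Open sessions ordered by start timestamp; they never overlap each other.
    private final TreeMap<Long, Session> sessions = new TreeMap<>();

    SessionWindowSim(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Assigns the element to [timestamp, timestamp + gap) and merges every session it touches. */
    void add(long timestamp, String value) {
        long start = timestamp;
        long end = timestamp + gapMillis;
        List<String> values = new ArrayList<>();
        Iterator<Session> it = sessions.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            // Inclusive overlap test, similar in spirit to Flink's TimeWindow.intersects.
            if (s.start <= timestamp + gapMillis && s.end >= timestamp) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                values.addAll(s.values); // earlier sessions first (TreeMap order)
                it.remove();
            }
        }
        values.add(value);
        sessions.put(start, new Session(start, end, values));
    }

    /**
     * Advances the watermark and returns the result of every session whose last
     * timestamp (end - 1) is at or below it, like an event-time trigger firing.
     */
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        Iterator<Session> it = sessions.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (watermark >= s.end - 1) {
                fired.add("[" + String.join(", ", s.values) + "]");
                it.remove();
            }
        }
        return fired;
    }

    /** Number of sessions still waiting for a watermark. */
    int openSessions() {
        return sessions.size();
    }
}
```

For example, with `new SessionWindowSim(5_000)`, adding "a" at 1000 ms and "b" at 3000 ms yields one session [1000, 8000). Calling `advanceWatermark(Long.MIN_VALUE)`, which stands in for a watermark that never advances, returns an empty list, matching the symptom in the thread, while `advanceWatermark(7_999)` returns a list containing `"[a, b]"`. In Flink itself, the corresponding options would be enabling event time and assigning timestamps and watermarks (for example with `BoundedOutOfOrdernessTimestampExtractor`), or switching to `ProcessingTimeSessionWindows.withGap(...)` if processing time is acceptable.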