Thanks for the help. I was able to make more progress (based on the documentation you provided), but now I'm getting this exception:
org.apache.pulsar.client.impl.DefaultBatcherBuilder@3b5fad2d is not serializable. The object probably contains or references non serializable fields. org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151) org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126) org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) Here's my code now: DataStream<String> combinedEnvelopes = dataStream .map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2 map(String incomingMessage) throws Exception { return mapToTuple(incomingMessage); } }) .keyBy(0) .window(EventTimeSessionWindows.withGap(Time.seconds(20))) .aggregate(new JsonConcatenator()) .returns(String.class); Here's the JsonConcatenator that I'm referencing above: private static class JsonConcatenator implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> { @Override public Tuple2<String, String> createAccumulator() { return new Tuple2<String, String>("",""); } @Override public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) { return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1); } @Override public String getResult(Tuple2<String, String> accumulator) { return "[" + accumulator.f1 + "]"; } @Override public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) { return new Tuple2<>(a.f0, a.f1 + ", " + b.f1); } } vino yang wrote > Hi devinbost, > > Sharing two example links with you : > > > - the example code of official documentation[1]; > - a StackOverflow answer of a similar question[2]; > > [1]: > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction > [2]: > https://stackoverflow.com/questions/47123785/flink-how-to-convert-the-deprecated-fold-to-aggregrate > > I hope these resources are helpful to you. > > Best, > Vino -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/