Thanks for the help.
I was able to make more progress (based on the documentation you provided),
but now I'm getting this exception:

org.apache.pulsar.client.impl.DefaultBatcherBuilder@3b5fad2d is not
serializable. The object probably contains or references non serializable
fields.
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)

Here's my code now:

DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2&lt;String, String>>() {
        @Override
        public Tuple2 map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .aggregate(new JsonConcatenator())
    .returns(String.class);

Here's the JsonConcatenator that I'm referencing above:

private static class JsonConcatenator 
    implements AggregateFunction<Tuple2&lt;String, String>, Tuple2<String,
String>, String> {
    @Override
    public Tuple2<String, String> createAccumulator() {
        return new Tuple2<String, String>("","");
    }

    @Override
    public Tuple2<String, String> add(Tuple2<String, String> value,
Tuple2<String, String> accumulator) {
        return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
    }

    @Override
    public String getResult(Tuple2<String, String> accumulator) {
        return "[" + accumulator.f1 + "]";
    }

    @Override
    public Tuple2<String, String> merge(Tuple2<String, String> a,
Tuple2<String, String> b) {
        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
    }
}



vino yang wrote
> Hi devinbost,
> 
> Sharing two example links with you :
> 
> 
>    - the example code of official documentation[1];
>    - a StackOverflow answer of a similar question[2];
> 
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction
> [2]:
> https://stackoverflow.com/questions/47123785/flink-how-to-convert-the-deprecated-fold-to-aggregrate
> 
> I hope these resources are helpful to you.
> 
> Best,
> Vino





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to