Hi, In my use case, I am attempting to create a keyedStream (on a string) and then window that stream (which represents keyed JSON objects) with EventTimeSessionWindows (so that I have a separate window for each set of JSON messages, according to the key), and then concatenate the JSON objects by their keys. (e.g. If message1, message2, and message3 all have the same key, they should be concatenated to a JSON array like: [message1,message2, message3].)
I think my code expresses my intent conceptually, but I learned that Fold is deprecated because it can't perform partial aggregations. Instead, I need to use the AggregateFunction, but I'm having trouble understanding the API documentation. How do I convert this code to an implementation that uses the AggregateFunction instead? DataStream<String> combinedEnvelopes = dataStream .map(new MapFunction<String, Tuple2<String, JSONObject>>() { @Override public Tuple2 map(String incomingMessage) throws Exception { return mapToTuple(incomingMessage); } }) .keyBy(0) .window(EventTimeSessionWindows.withGap(Time.seconds(20))) .fold("[", new FoldFunction<Tuple2<String, JSONObject>, String>() { @Override public String fold(String concatenatedJsonArray, Tuple2 incomingMessage) { return concatenatedJsonArray + ", " + incomingMessage.f1.toString(); } }) .map(new MapFunction<String, String>() { @Override public String map(String jsonPartialArray) throws Exception { return jsonPartialArray + "]"; } }) .returns(String.class); -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/