Hi devinbost,

Here are two examples that may help:

- the example code in the official documentation [1];
- a StackOverflow answer to a similar question [2].

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction
[2]: https://stackoverflow.com/questions/47123785/flink-how-to-convert-the-deprecated-fold-to-aggregrate

I hope these resources are helpful to you.

Best,
Vino

devinbost <devin.b...@gmail.com> wrote on Thu, Dec 5, 2019 at 9:38 AM:

> Hi,
>
> In my use case, I am attempting to create a keyedStream (on a string) and
> then window that stream (which represents keyed JSON objects) with
> EventTimeSessionWindows (so that I have a separate window for each set of
> JSON messages, according to the key), and then concatenate the JSON objects
> by their keys. (e.g. If message1, message2, and message3 all have the same
> key, they should be concatenated to a JSON array like:
> [message1,message2,message3].)
>
> I think my code expresses my intent conceptually, but I learned that Fold is
> deprecated because it can't perform partial aggregations. Instead, I need to
> use the AggregateFunction, but I'm having trouble understanding the API
> documentation. How do I convert this code to an implementation that uses the
> AggregateFunction instead?
>
> DataStream<String> combinedEnvelopes = dataStream
>     .map(new MapFunction<String, Tuple2<String, JSONObject>>() {
>         @Override
>         public Tuple2 map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>         }
>     })
>     .keyBy(0)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>     .fold("[", new FoldFunction<Tuple2<String, JSONObject>, String>() {
>         @Override
>         public String fold(String concatenatedJsonArray, Tuple2 incomingMessage) {
>             return concatenatedJsonArray + ", " + incomingMessage.f1.toString();
>         }
>     })
>     .map(new MapFunction<String, String>() {
>         @Override
>         public String map(String jsonPartialArray) throws Exception {
>             return jsonPartialArray + "]";
>         }
>     })
>     .returns(String.class);
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
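
For what it's worth, here is a minimal sketch of how the fold above might be expressed as an AggregateFunction. It is an illustration under a few assumptions, not a tested Flink job. To keep it runnable without Flink on the classpath, it declares a local interface with the same four methods as org.apache.flink.api.common.functions.AggregateFunction; in a real job you would import Flink's interface and delete the local one. The class name JsonArrayAggregate is made up for this example, and the input type is simplified to the JSON text of each message (with the Tuple2<String, JSONObject> input from the question, add() would append value.f1.toString() instead). The accumulator is a list of JSON strings, and the brackets and commas are only added in getResult(), which also avoids the leading ", " that the fold version would produce after "[".

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in that mirrors the method signatures of Flink's
// org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>.
// Assumption: in a real job, import Flink's interface and remove this one.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical name. IN = JSON text of one message, ACC = the messages
// collected so far, OUT = the final JSON array as a string.
class JsonArrayAggregate implements AggregateFunction<String, List<String>, String> {

    // Called once per window (and per merged window) to start empty.
    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    // Called for each incoming element; this replaces the body of fold().
    @Override
    public List<String> add(String jsonMessage, List<String> accumulator) {
        accumulator.add(jsonMessage);
        return accumulator;
    }

    // Called when the window fires; this replaces the trailing map()
    // that appended "]" in the original code.
    @Override
    public String getResult(List<String> accumulator) {
        return "[" + String.join(",", accumulator) + "]";
    }

    // Session windows are merging windows, so two partial accumulators
    // may need to be combined when their sessions are joined.
    @Override
    public List<String> merge(List<String> a, List<String> b) {
        a.addAll(b);
        return a;
    }
}
```

With Flink's own interface in place, the idea would be to call .aggregate(new JsonArrayAggregate()) on the windowed stream instead of .fold(...), and to drop the final .map(...) that closes the bracket. The merge() method matters here because session windows can be merged as late elements bridge the gap between two sessions.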