Hi devinbost,

Here are two example links that should help:


   - the example code in the official documentation [1];
   - a Stack Overflow answer to a similar question [2].

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#aggregatefunction
[2]:
https://stackoverflow.com/questions/47123785/flink-how-to-convert-the-deprecated-fold-to-aggregrate
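
As a rough sketch of the conversion those links describe (not a tested
drop-in for your job), the fold could become something like the class below.
Assumptions on my side: the class name JsonArrayAggregate is made up, each
message is treated as an already-serialized JSON string rather than a
Tuple2<String, JSONObject>, and the small interface is only a local stand-in
with the same four methods as Flink's
org.apache.flink.api.common.functions.AggregateFunction, so the snippet
compiles without Flink on the classpath. Note also that the original fold
starts from "[" and prepends ", " to every element, which would leave a
leading comma in the output; the sketch joins the elements instead.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the four methods of Flink's AggregateFunction.
// In a real job, import org.apache.flink.api.common.functions.AggregateFunction
// and delete this interface.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical aggregate: collects JSON strings per key and window, then
// renders them as one JSON array string.
class JsonArrayAggregate implements AggregateFunction<String, List<String>, String> {

    @Override
    public List<String> createAccumulator() {
        // One empty accumulator per key and window.
        return new ArrayList<>();
    }

    @Override
    public List<String> add(String value, List<String> accumulator) {
        // Called once per incoming element.
        accumulator.add(value);
        return accumulator;
    }

    @Override
    public String getResult(List<String> accumulator) {
        // Called when the window fires; builds "[m1,m2,m3]".
        return "[" + String.join(",", accumulator) + "]";
    }

    @Override
    public List<String> merge(List<String> a, List<String> b) {
        // Called when two windows are merged, as happens with session windows.
        a.addAll(b);
        return a;
    }
}
```

In the pipeline, .aggregate(new JsonArrayAggregate()) would take the place of
both the .fold(...) call and the trailing .map(...) that appends "]". With the
Tuple2 input from your code, add() would append value.f1.toString() instead of
the raw string.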

I hope these resources are helpful to you.

Best,
Vino

devinbost <devin.b...@gmail.com> wrote on Thu, Dec 5, 2019 at 9:38 AM:

> Hi,
>
> In my use case, I am attempting to create a keyedStream (on a string) and
> then window that stream (which represents keyed JSON objects) with
> EventTimeSessionWindows (so that I have a separate window for each set of
> JSON messages, according to the key), and then concatenate the JSON objects
> by their keys. (e.g. If message1, message2, and message3 all have the same
> key, they should be concatenated to a JSON array like: [message1,message2,
> message3].)
>
> I think my code expresses my intent conceptually, but I learned that Fold
> is deprecated because it can't perform partial aggregations. Instead, I
> need to use the AggregateFunction, but I'm having trouble understanding
> the API documentation. How do I convert this code to an implementation
> that uses the AggregateFunction instead?
>
> DataStream<String> combinedEnvelopes = dataStream
>     .map(new MapFunction<String, Tuple2<String, JSONObject>>() {
>         @Override
>         public Tuple2<String, JSONObject> map(String incomingMessage) throws Exception {
>             return mapToTuple(incomingMessage);
>         }
>     })
>     .keyBy(0)
>     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>     .fold("[", new FoldFunction<Tuple2<String, JSONObject>, String>() {
>         @Override
>         public String fold(String concatenatedJsonArray,
>                 Tuple2<String, JSONObject> incomingMessage) {
>             return concatenatedJsonArray + ", " + incomingMessage.f1.toString();
>         }
>     })
>     .map(new MapFunction<String, String>() {
>         @Override
>         public String map(String jsonPartialArray) throws Exception {
>             return jsonPartialArray + "]";
>         }
>     })
>     .returns(String.class);
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
