Hi,

In my use case, I am trying to create a KeyedStream (keyed on a string) of
JSON objects and window it with EventTimeSessionWindows, so that each key
gets its own window of JSON messages. Within each window I then want to
concatenate the JSON objects that share a key. (E.g., if message1, message2,
and message3 all have the same key, they should be concatenated into a JSON
array like: [message1,message2,message3].)
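
To make the intended result concrete, here is a small standalone sketch with no Flink and no windowing. The class name ExpectedGrouping and the String[]{key, json} pair representation are placeholders invented for illustration; they are not part of the actual job.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that only illustrates the desired output per key.
class ExpectedGrouping {
    // Each input element is a {key, rawJson} pair (an assumption for this sketch).
    static Map<String, String> concatenateByKey(List<String[]> keyedMessages) {
        // Collect the raw JSON strings per key, preserving arrival order.
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] pair : keyedMessages) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        // Render each group as a JSON array string.
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
            result.put(entry.getKey(), "[" + String.join(",", entry.getValue()) + "]");
        }
        return result;
    }
}
```

In the real job the grouping would come from keyBy plus the session window rather than from a map in memory.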

I think my code expresses my intent conceptually, but I learned that Fold is
deprecated because it can't perform partial aggregations. Instead, I need to
use the AggregateFunction, but I'm having trouble understanding the API
documentation. How do I convert this code to an implementation that uses the
AggregateFunction instead?

DataStream<String> combinedEnvelopes = dataStream
    .map(new MapFunction<String, Tuple2<String, JSONObject>>() {
        @Override
        public Tuple2<String, JSONObject> map(String incomingMessage) throws Exception {
            return mapToTuple(incomingMessage);
        }
    })
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    .fold("[", new FoldFunction<Tuple2&lt;String, JSONObject>, String>() {
        @Override
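        public String fold(String concatenatedJsonArray,
                Tuple2<String, JSONObject> incomingMessage) {
            // Only add a separator once the array already holds an element.
            String separator = concatenatedJsonArray.equals("[") ? "" : ", ";
            return concatenatedJsonArray + separator + incomingMessage.f1.toString();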
        }
    })
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String jsonPartialArray) throws Exception {
            return jsonPartialArray + "]";
        }
    })
    .returns(String.class);
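
For reference, a rough sketch of the shape the replacement would probably take. Flink's AggregateFunction<IN, ACC, OUT> interface declares four methods: createAccumulator, add, getResult, and merge. The class below is a plain-Java stand-in that mirrors those four methods without any Flink dependency, so it is not the real interface. The class name JsonArrayAggregateSketch and the choice of List<String> as the accumulator are assumptions made for illustration, with IN reduced to the raw JSON string.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring the four AggregateFunction methods, with
// IN = String (one JSON message), ACC = List<String>, OUT = String.
class JsonArrayAggregateSketch {
    // Start each window with an empty list instead of the "[" seed used by fold.
    List<String> createAccumulator() {
        return new ArrayList<>();
    }

    // Called once per incoming element; appends it to the partial result.
    List<String> add(String value, List<String> accumulator) {
        accumulator.add(value);
        return accumulator;
    }

    // Called when the window fires; brackets and separators are added only here.
    String getResult(List<String> accumulator) {
        return "[" + String.join(",", accumulator) + "]";
    }

    // Combines two partial results, as happens when session windows merge.
    List<String> merge(List<String> a, List<String> b) {
        a.addAll(b);
        return a;
    }
}
```

In an actual job the class would implement AggregateFunction<Tuple2<String, JSONObject>, List<String>, String>, read incomingMessage.f1.toString() inside add, and be passed to .aggregate(...) in place of .fold(...). Because getResult already closes the array, the trailing map that appends "]" should no longer be needed.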




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
