Hi all,

I have a word count using Flink streaming, and my reduce transformation
applies a WindowFunction. I would like this WindowFunction to sort the
output of the reduce. Is that possible? The goal is to sort the data set
inside the window by key.

Thanks for your ideas!

Here is my code:

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9000)
                .map(new UpperCaserMap())
                .flatMap(new Splitter())
                // select the first value as a key using the KeySelector class
                .keyBy(new SumWordSelect())
                // use this if Apache Flink server is up
                .timeWindow(Time.seconds(5))
                .reduce(new SumWordsReduce(), new FIlterWindowFunction())
                ;

    public static class ReduceWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            Integer sum = 0;
            for (Tuple2<String, Integer> input : inputs) {
                sum = sum + input.f1;
            }
            out.collect(new Tuple2<>(key, sum));
        }
    }

    public static class FIlterWindowFunction implements WindowFunction<
            Tuple2<String, Integer>, // input type
            Tuple2<String, Integer>, // output type
            String, // key type
            TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> inputs,
                          Collector<Tuple2<String, Integer>> out) {
            // Integer value = 0;
            for (Tuple2<String, Integer> input : inputs) {
                // if (input.f1 >= 3 && input.f1 > value) value = input.f1;
                out.collect(new Tuple2<>(key, input.f1));
            }
        }
    }
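
To make the goal concrete, here is the sorting step I have in mind, as a
plain-Java sketch without any Flink dependency. Map.Entry<String, Integer>
stands in for Tuple2<String, Integer>, and the class and method names
(WindowSortSketch, sortByKey) are made up for illustration. As far as I
understand, the keyed WindowFunction above only sees one key per call, so
I am assuming the sort would have to run where all keys of a window arrive
together, for example in an AllWindowFunction after windowAll. I have not
verified that this is the right place for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: sorts the (word, count) pairs
// of one window by word. Map.Entry stands in for Flink's Tuple2.
class WindowSortSketch {

    // Copies the window contents into a list and sorts it by key.
    static List<Map.Entry<String, Integer>> sortByKey(
            Iterable<Map.Entry<String, Integer>> inputs) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>();
        for (Map.Entry<String, Integer> input : inputs) {
            sorted.add(input);
        }
        // Natural String ordering on the key (the word).
        sorted.sort(Map.Entry.comparingByKey());
        return sorted;
    }

    public static void main(String[] args) {
        // Example window contents, as they might look after the reduce
        // (Map.entry and List.of need Java 9 or later).
        List<Map.Entry<String, Integer>> window = List.of(
                Map.entry("FLINK", 2),
                Map.entry("APACHE", 1),
                Map.entry("STREAM", 3));
        for (Map.Entry<String, Integer> e : sortByKey(window)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
        // prints APACHE 1, FLINK 2, STREAM 3 (one pair per line)
    }
}
```

Inside a window function, the loop in main would become one out.collect(...)
call per sorted element instead of a println.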



-- 

*---- Felipe Oliveira Gutierrez*

*-- skype: felipe.o.gutierrez*
*-- https://felipeogutierrez.blogspot.com*
