Hi, my stream data has the type Tuple2<Long, String>, which holds the
timestamp (in seconds) and the data, respectively. The source generates 120
samples every second. Using the following code, I want to collect the data
for each second and then apply the reduce function to it.

temp.keyBy(0)
    .timeWindow(Time.seconds(1))
    .reduce(new ReduceFunction<Tuple2<Long, String>>() {
        @Override
        public Tuple2<Long, String> reduce(Tuple2<Long, String> longStringTuple2,
                                           Tuple2<Long, String> t1) throws Exception {
            System.out.println("reduced");
            return new Tuple2<>(longStringTuple2.f0, longStringTuple2.f1 + "," + t1.f1);
        }
    })
    .print();

I expected it to print the reduced data for every second, according to the
reduce function, but it only prints the test line

System.out.println("reduced");

that I put in the reduce function to check whether it is entered at all.
I can confirm that data is arriving in the temp variable.
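
To make the expected result concrete, here is a minimal plain-Java sketch
of what I expect the reduce to produce per key. It does not use Flink at
all; the class ReduceSketch, the method reducePerSecond and the sample
values are hypothetical names made up for illustration, and it only mimics
the grouping by timestamp and the comma-joining done in my ReduceFunction.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, no Flink classes involved.
final class ReduceSketch {

    // Groups samples by their timestamp (in seconds) and joins the payloads
    // of each group with commas, like the ReduceFunction does for one key.
    static Map<Long, String> reducePerSecond(List<Map.Entry<Long, String>> samples) {
        Map<Long, String> result = new LinkedHashMap<>();
        for (Map.Entry<Long, String> sample : samples) {
            // First value for a key is stored as-is; later values are appended.
            result.merge(sample.getKey(), sample.getValue(),
                    (acc, next) -> acc + "," + next);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up samples: two for second 1, one for second 2.
        List<Map.Entry<Long, String>> samples = List.of(
                Map.entry(1L, "a"),
                Map.entry(1L, "b"),
                Map.entry(2L, "c"));
        System.out.println(reducePerSecond(samples));
    }
}
```

So for each second I expect one output record containing that second's
timestamp and all of its samples joined by commas.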

What is the problem? Should I implement a trigger function?
