Hi all,

What would be a natural way to implement a parallel version of the
following Flink program?

Suppose I have a stream of items of type DataItem with two concrete
implementations of DataItem: Value and Barrier. Let’s say that Value holds
an integer value, and Barrier acts as explicit punctuation.

public interface DataItem {}

public class Value implements DataItem {
    private final int val;

    public Value(int val) { this.val = val; }

    public int getVal() { return val; }
}

public class Barrier implements DataItem {}

The program should maintain a sum of values seen since the beginning of the
stream. On each Barrier, the program should output the sum seen so far.

An obvious way to implement this would be with a FlatMapFunction,
maintaining the sum as state and emitting it on each Barrier.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
    private int sum = 0;

    @Override
    public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
        if (dataItem instanceof Value) {
            sum += ((Value) dataItem).getVal();
        } else {
            collector.collect(sum);
        }
    }
}).setParallelism(1).print().setParallelism(1);

env.execute();

// We should see 1 followed by 3 as output

However, such an operator cannot be parallelized, since the order of Values
and Barriers matters. That’s why I need to set parallelism to 1 above. Is
there a way to rewrite this to exploit parallelism?
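
To make the ordering constraint concrete, here is a plain-Java simulation (no Flink APIs involved) of one direction that might work: deal the Values out round-robin to several partitions, send every Barrier to all partitions, let each partition emit its partial sum on each Barrier, and add up the k-th partial sums. The class BarrierSumSketch and the methods sequential and partitioned are hypothetical names made up for this sketch, and the round-robin split is only an assumption about how the Values might be distributed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation only; no Flink APIs are used and all names are hypothetical.
final class BarrierSumSketch {

    interface DataItem {}

    static final class Value implements DataItem {
        final int val;
        Value(int val) { this.val = val; }
    }

    static final class Barrier implements DataItem {}

    // Sequential reference: emit the running sum on every Barrier.
    static List<Integer> sequential(List<DataItem> stream) {
        List<Integer> out = new ArrayList<>();
        int sum = 0;
        for (DataItem item : stream) {
            if (item instanceof Value) {
                sum += ((Value) item).val;
            } else {
                out.add(sum);
            }
        }
        return out;
    }

    // Simulated parallel version: Values are dealt round-robin to `parallelism`
    // partitions, while every Barrier is copied to all partitions.
    static List<Integer> partitioned(List<DataItem> stream, int parallelism) {
        List<List<DataItem>> partitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.add(new ArrayList<>());
        }
        int next = 0;
        for (DataItem item : stream) {
            if (item instanceof Value) {
                partitions.get(next).add(item);
                next = (next + 1) % parallelism;
            } else {
                for (List<DataItem> partition : partitions) {
                    partition.add(item);
                }
            }
        }
        // Each partition runs the sequential logic on its own slice, so its k-th
        // output is the sum of the Values it received before the k-th Barrier.
        // Adding the k-th outputs of all partitions gives the global sum.
        List<Integer> merged = new ArrayList<>();
        for (List<DataItem> partition : partitions) {
            List<Integer> partials = sequential(partition);
            for (int k = 0; k < partials.size(); k++) {
                if (k == merged.size()) {
                    merged.add(0);
                }
                merged.set(k, merged.get(k) + partials.get(k));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<DataItem> stream = List.of(
                new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
        System.out.println(sequential(stream));     // prints [1, 3]
        System.out.println(partitioned(stream, 2)); // prints [1, 3]
    }
}
```

The simulation only shows that the result is preserved when Barriers reach every partition in order relative to that partition's Values. It says nothing about how to align the k-th partial sums across parallel subtasks in an actual Flink job, which is the part I am unsure about.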

(Another reason to set parallelism to 1 above is that I’m assuming there is
a single instance of the FlatMapFunction. A proper implementation would
take more care in using state. Feel free to comment on that as well.)


Best regards,


Filip Niksic
