In other words, you need a way to partition the stream such that a series of items followed by a barrier is never split up, i.e. it always lands in the same partition.

I'm wondering whether you could just apply DataStream#partitionCustom to your source:

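import org.apache.flink.api.common.functions.Partitioner;

// Keeps each run of items together with its closing Barrier in one partition.
// Note: each sending subtask holds its own copy of this (stateful) partitioner.
public static class BarrierPartitioner implements Partitioner<DataItem> {

   private int currentPartition = 0;

   @Override
   public int partition(DataItem key, int numPartitions) {
      if (key instanceof Barrier) {
         int partitionToReturn = currentPartition;
         currentPartition = (currentPartition + 1) % numPartitions;
         return partitionToReturn;
      } else {
         return currentPartition;
      }
   }
}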

DataStream<DataItem> stream = ...;
DataStream<DataItem> partitionedStream =
    stream.partitionCustom(new BarrierPartitioner(), item -> item);
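To see which partition each item ends up in, here is a plain-Java simulation with no Flink dependency. It applies the same partition logic to the example input from the original question with two partitions. The class name BarrierPartitionSketch and the assign helper are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink): mirrors BarrierPartitioner.partition(...)
// so the partition chosen for each item can be inspected.
final class BarrierPartitionSketch {

    interface DataItem {}

    static final class Value implements DataItem {
        final int val;
        Value(int val) { this.val = val; }
    }

    static final class Barrier implements DataItem {}

    // Same logic as BarrierPartitioner: a Barrier stays with the run of items
    // it closes, and the next run moves on to the next partition.
    static final class BarrierPartitioner {
        private int currentPartition = 0;

        int partition(DataItem key, int numPartitions) {
            if (key instanceof Barrier) {
                int partitionToReturn = currentPartition;
                currentPartition = (currentPartition + 1) % numPartitions;
                return partitionToReturn;
            }
            return currentPartition;
        }
    }

    // Hypothetical helper: returns the partition index of each item, in input order.
    static List<Integer> assign(List<DataItem> items, int numPartitions) {
        BarrierPartitioner partitioner = new BarrierPartitioner();
        List<Integer> result = new ArrayList<>();
        for (DataItem item : items) {
            result.add(partitioner.partition(item, numPartitions));
        }
        return result;
    }

    public static void main(String[] args) {
        List<DataItem> items = List.of(
            new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
        System.out.println(assign(items, 2)); // [0, 0, 1, 1, 1]
    }
}
```

With two partitions, Value(1) and the first Barrier go to partition 0, while Value(3), Value(-1) and the second Barrier go to partition 1. Each partition then only sees the sums of its own runs (1 and 2 here), so a downstream step would still have to combine them into the running totals 1, 3. Also, because the partitioner keeps its position in a field, the scheme assumes a single sending instance.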


On 08/10/2019 14:55, Filip Niksic wrote:
Hi Yun,

The behavior with increased parallelism should be the same as with parallelism 1. In other words, for the input from my previous email, the output should always be 1, 3, regardless of parallelism. Operationally, the partial sums maintained in each subtask would somehow have to be aggregated before they are output.
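One possible reading of "aggregated before they are output" is sketched below in plain Java. It does not use the Flink API. The sketch makes several assumptions, all for illustration only:
- Values are spread round-robin over the subtasks.
- Every Barrier is broadcast to all subtasks.
- On each Barrier, every subtask emits a hypothetical Partial(epoch, sum).
- A single aggregator adds an epoch to the running total once all subtasks have reported that epoch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink): round-robin Values, broadcast Barriers,
// and one aggregator that combines per-epoch partial sums into running totals.
final class PartialSumAggregation {

    interface DataItem {}
    static final class Value implements DataItem {
        final int val;
        Value(int val) { this.val = val; }
    }
    static final class Barrier implements DataItem {}

    // Hypothetical message: one subtask's sum for one epoch
    // (an epoch is the span between two consecutive Barriers).
    static final class Partial {
        final int epoch;
        final int sum;
        Partial(int epoch, int sum) { this.epoch = epoch; this.sum = sum; }
    }

    static List<Integer> run(List<DataItem> input, int parallelism) {
        // Step 1: each subtask sums its own Values and emits a Partial per Barrier.
        int[] localSum = new int[parallelism];
        int[] epochOf = new int[parallelism];
        List<Partial> partials = new ArrayList<>();
        int next = 0; // round-robin pointer for Values
        for (DataItem item : input) {
            if (item instanceof Value) {
                localSum[next] += ((Value) item).val;
                next = (next + 1) % parallelism;
            } else {
                for (int s = 0; s < parallelism; s++) { // Barrier is broadcast
                    partials.add(new Partial(epochOf[s]++, localSum[s]));
                    localSum[s] = 0;
                }
            }
        }

        // Step 2: emit an epoch only when every subtask has reported it,
        // and only in epoch order.
        Map<Integer, Integer> reported = new HashMap<>();
        Map<Integer, Integer> epochSum = new HashMap<>();
        List<Integer> output = new ArrayList<>();
        int total = 0;
        int nextEpoch = 0;
        for (Partial p : partials) {
            reported.merge(p.epoch, 1, Integer::sum);
            epochSum.merge(p.epoch, p.sum, Integer::sum);
            while (reported.getOrDefault(nextEpoch, 0) == parallelism) {
                total += epochSum.remove(nextEpoch);
                reported.remove(nextEpoch);
                output.add(total);
                nextEpoch++;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<DataItem> input = List.of(
            new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
        System.out.println(run(input, 3)); // [1, 3]
    }
}
```

For the example input, this produces [1, 3] for any parallelism. The aggregator releases epochs strictly in order. In a real deployment, that relies on the partials from any one subtask arriving in the order they were sent.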

To answer the second question, I know that watermarks provide the same functionality. Is there some way to convert the input with explicit punctuation into one with watermarks? I see there is an interface called AssignerWithPunctuatedWatermarks; maybe that's the solution, but I'm not sure how this assigner would be used. For example, it could maintain the number of previously seen Barriers and assign this number as a watermark to each Value, but then this number becomes state that needs to be shared between multiple substreams. Or perhaps the Barriers could somehow be duplicated and sent to each substream? Alternatively, is there some notion of event-based windows that would be triggered by specific user-defined elements in the stream? In such a mechanism, perhaps the watermarks would be used internally without being explicitly exposed to the user?
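The barrier-counting idea can be sketched in plain Java as follows. It does not use the Flink API, and Tagged, tag and cumulativeSums are hypothetical names. The epoch number plays roughly the role an event timestamp would, and the per-epoch sums correspond to windows of length 1. Only the tagging pass needs to see the stream in order, which is exactly the shared state mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch: tag each Value with the number of Barriers seen before it
// (its epoch), sum per epoch in any order, then emit cumulative totals.
final class EpochTagging {

    interface DataItem {}
    static final class Value implements DataItem {
        final int val;
        Value(int val) { this.val = val; }
    }
    static final class Barrier implements DataItem {}

    // Hypothetical tagged element: (epoch, value).
    static final class Tagged {
        final long epoch;
        final int val;
        Tagged(long epoch, int val) { this.epoch = epoch; this.val = val; }
    }

    // Sequential pass: the Barrier count is the only state that must be shared.
    static List<Tagged> tag(List<DataItem> input) {
        List<Tagged> out = new ArrayList<>();
        long barriersSeen = 0;
        for (DataItem item : input) {
            if (item instanceof Barrier) {
                barriersSeen++;
            } else {
                out.add(new Tagged(barriersSeen, ((Value) item).val));
            }
        }
        return out;
    }

    // Order-insensitive per-epoch sums, followed by a running total over the
    // closed epochs 0 .. numBarriers-1 (Values after the last Barrier are pending).
    static List<Integer> cumulativeSums(List<Tagged> tagged, long numBarriers) {
        Map<Long, Integer> perEpoch = new TreeMap<>();
        for (Tagged t : tagged) {
            perEpoch.merge(t.epoch, t.val, Integer::sum);
        }
        List<Integer> output = new ArrayList<>();
        int total = 0;
        for (long e = 0; e < numBarriers; e++) {
            total += perEpoch.getOrDefault(e, 0);
            output.add(total);
        }
        return output;
    }

    public static void main(String[] args) {
        List<DataItem> input = List.of(
            new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
        System.out.println(cumulativeSums(tag(input), 2)); // [1, 3]
    }
}
```

Because the per-epoch sums do not depend on arrival order, the tagged Values could be summed in parallel. The remaining sequential work is the cheap running total over epochs.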

Best regards,

Filip



On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:


    Hi Filip,

    I have one question about the problem: what is the expected
    behavior when the parallelism of the FlatMapFunction is increased
    to more than 1? Should each subtask maintain the partial sum of
    all the values it receives and, whenever it receives a barrier,
    just output the partial sum of those values?

    Another question: I think the watermark mechanism in Flink already
    provides functionality similar to punctuation, so would it be
    possible to implement the same logic directly with Flink windows?

    Best,
    Yun

        ------------------------------------------------------------------
        From: Filip Niksic <fnik...@seas.upenn.edu>
        Send Time: 2019 Oct. 8 (Tue.) 08:56
        To: user <user@flink.apache.org>
        Subject: [QUESTION] How to parallelize with explicit punctuation in Flink?

        Hi all,

        What would be a natural way to implement a parallel version of
        the following Flink program?

        Suppose I have a stream of items of type DataItem with two
        concrete implementations of DataItem: Value and Barrier. Let’s
        say that Value holds an integer value, and Barrier acts as
        explicit punctuation.

        public interface DataItem {}

        public class Value implements DataItem {
            private final int val;

            public Value(int val) { this.val = val; }

            public int getVal() { return val; }
        }

        public class Barrier implements DataItem {}

        The program should maintain a sum of values seen since the
        beginning of the stream. On each Barrier, the program should
        output the sum seen so far.

        An obvious way to implement this would be with a
        FlatMapFunction, maintaining the sum as state and emitting it
        on each Barrier.

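        import org.apache.flink.api.common.functions.FlatMapFunction;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.util.Collector;

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<DataItem> stream = env.fromElements(DataItem.class,
            new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());

        stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
            // Running sum of all Values seen so far, kept in a plain field
            // rather than in Flink-managed state
            private int sum = 0;

            @Override
            public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
                if (dataItem instanceof Value) {
                    sum += ((Value) dataItem).getVal();
                } else {
                    // A Barrier: emit the sum seen so far
                    collector.collect(sum);
                }
            }
        }).setParallelism(1).print().setParallelism(1);

        env.execute();

        // We should see 1 followed by 3 as output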

        However, such an operator cannot be parallelized, since the
        order of Values and Barriers matters. That’s why I need to set
        parallelism to 1 above. Is there a way to rewrite this to
        exploit parallelism?

        (Another reason to set parallelism to 1 above is that I’m
        assuming there is a single instance of the FlatMapFunction. A
        proper implementation would take more care in using state.
        Feel free to comment on that as well.)


        Best regards,

        Filip Niksic


