Hi Yun,

Thanks. Apropos the keyBy partitioner, I first tried it directly with
.keyBy(x -> x.getId()). It is true that the items get evenly distributed
among the available task slots, but since there is a single item per key,
the aggregations that should be done in parallel become trivial, and the
real summation happens in the reduce operator after windowAll. So at least
the way things are currently set up, it seems that "%PARALLELISM" is
necessary in order to effectively gain parallelism.

Filip



On Wed, Oct 9, 2019 at 11:10 AM Yun Gao <yungao...@aliyun.com> wrote:

>       Hi Filip,
>
>          As a whole, I also think to increase the parallelism of the
> reduce to more than 1, we should use a parallel window to compute the
> partial sum and then sum the partial sum with WindowAll.
>
>         For the assignTimestampAndWatermarks, From my side I think the
> current usage should be OK and it works the same to the other operators.
> Besides, for the keyBy Partitioner, I think "% PARALLELISM" is not
> necessary and Flink will take care of the parallelism. In other words, I
> think you can use .keyBy(x -> x.getId()) directly.
>
>     Best,
>     Yun
>
> ------------------------------------------------------------------
> From:Filip Niksic <fnik...@seas.upenn.edu>
> Send Time:2019 Oct. 9 (Wed.) 12:21
> To:user <user@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>; Chesnay Schepler <ches...@apache.org>
> Subject:Re: [QUESTION] How to parallelize with explicit punctuation in
> Flink?
>
> Here is the solution I currently have. It turned out to be more
> complicated than I expected. It would be great if a more experienced Flink
> user could comment and point out the shortcomings. And if you have other
> ideas for achieving the same thing, let me know!
>
> Let's start like in the original email, except now we set the time
> characteristic to EventTime and parallelism to a constant named PARALLELISM.
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> final int PARALLELISM = 2;
> env.setParallelism(PARALLELISM);
>
> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>         new Value(1), new Barrier(), new Value(3), new Value(-1), new 
> Barrier());
>
>
> The first step is to use a punctuation-based timestamp-and-watermark
> assigner as follows. We keep track of the number of barriers in the stream.
> We assign a timestamp n to the n-th barrier and all the values that
> immediately precede it, and we emit a watermark with timestamp n on the
> n-th barrier. This will allow us to define 1 millisecond tumbling windows
> that precisely capture the values between two barriers.
>
> DataStream<DataItem> timedStream =
>         stream.assignTimestampsAndWatermarks(new 
> AssignerWithPunctuatedWatermarks<DataItem>() {
>     private long barrierCount = 0;
>
>     @Override
>     public long extractTimestamp(DataItem item, long previousTimestamp) {
>         return barrierCount;
>     }
>
>     @Nullable
>     @Override
>     public Watermark checkAndGetNextWatermark(DataItem item, long 
> extractedTimestamp) {
>         if (item instanceof Barrier) {
>             barrierCount++;
>             return new Watermark(extractedTimestamp);
>         }
>         return null;
>     }
> });
>
>
> In the test input stream, the first value and barrier get a timestamp 0,
> and the next two values and the final barrier get a timestamp 1. Two
> watermarks with timestamps 0 and 1 are emitted.
>
> To achieve parallelization, we partition the values by artificially
> generated keys. A value's key is based on its position in the stream, so we
> first wrap the values into a type that contains this information.
>
> class ValueWithId {
>     private final int val;
>     private final long id;
>
>     public ValueWithId(int val, long id) {
>         this.val = val;
>         this.id = id;
>     }
>     public int getVal() { return val; }
>     public long getId() { return id; }
> }
>
>
> Here is the mapping. At the same time we can drop the barriers, since we
> no longer need them. Note that we need to explicitly set the mapping
> operator's parallelism to 1, since the operator is stateful.
>
> DataStream<ValueWithId> wrappedStream =
>         timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
>     private long count = 0L;
>
>     @Override
>     public void flatMap(DataItem item, Collector<ValueWithId> collector) 
> throws Exception {
>         if (item instanceof Value) {
>             int val = ((Value) item).getVal();
>             collector.collect(new ValueWithId(val, count++));
>         }
>     }
> }).setParallelism(1);
>
>
> Now we're ready to do the key-based partitioning. A value's key is its id
> as assigned above modulo PARALLELISM. We follow the partitioning by
> splitting the stream into 1 millisecond tumbling windows. Then we simply
> aggregate the partial sums, first for each key separately (and importantly,
> in parallel), and then for each window.
>
> DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % 
> PARALLELISM)
>         .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
>         .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
>             @Override
>             public Integer createAccumulator() { return 0; }
>
>             @Override
>             public Integer add(ValueWithId valueWithId, Integer acc) { return 
> acc + valueWithId.getVal(); }
>
>             @Override
>             public Integer getResult(Integer acc) { return acc; }
>
>             @Override
>             public Integer merge(Integer acc1, Integer acc2) { return acc1 + 
> acc2; }
>         })
>         .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
>         .reduce((x, y) -> x + y);
>
>
> Finally, in the original problem I asked for cumulative sums since the
> start of the stream, so we perform the last set of transformations to
> achieve that.
>
> DataStream<Integer> cumulativeSums = partialSums
>         .windowAll(GlobalWindows.create())
>         .trigger(CountTrigger.of(1))
>         .reduce((x, y) -> x + y);
> cumulativeSums.print().setParallelism(1);
> env.execute();
> // We should see 1 followed by 3 as output
>
>
> I am not completely sure if my usage of state in the
> timestamp-and-watermark assigner and the mapper is correct. Is it possible
> for Flink to duplicate the assigner, move it around and somehow mess up the
> timestamps? Likewise, is it possible for things to go wrong with the mapper?
>
> Another concern I have is that my key-based partitions depend on the
> constant PARALLELISM. Ideally, the program should be flexible about the
> parallelism that happens to be available during runtime.
>
> Finally, if anyone notices that I am in any part reinventing the wheel and
> that Flink already has a feature implementing some of the above, or that
> something can be done more elegantly, let me know!
>
> Best regards,
>
> Filip
>
>
> On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <fnik...@seas.upenn.edu>
> wrote:
> Hi Chesnay,
>
> Thanks for the reply. While your solution ultimately does use multiple
> partitions, from what I can tell the underlying processing is still
> sequential. Imagine a stream where barriers are quite rare, say a million
> values is followed by a barrier. Then these million values all end up at
> the same partition and are added up sequentially, and while they are being
> processed, the other partitions are waiting for their turn. A truly
> parallel solution would partition the million values, process each
> partition in parallel to get the partial sums, and on each barrier
> aggregate the partial sums into a total sum.
>
> Filip
>
>
> On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <ches...@apache.org>
> wrote:
> In other words, you need a way to partition the stream such that a series
> of items followed by a barrier are never interrupted.
>
> I'm wondering whether you could just apply DataStream#partitionCustom to
> your source:
>
> public static class BarrierPartitioner implements Partitioner<DataItem> {
>
>    private int currentPartition = 0;   @Override   public int 
> partition(DataItem key, int numPartitions) {
>       if (key instanceof Barrier) {
>          int partitionToReturn = currentPartition;         currentPartition = 
> (currentPartition + 1) % numPartitions;         return partitionToReturn;     
>  } else {
>          return currentPartition;      }
>    }
> }
>
> DataStream<DataItem> stream = ...;DataStream<DataItem> partitionedStream = 
> stream.partitionCustom(new BarrierPartitioner(), item -> item);
>
>
> On 08/10/2019 14:55, Filip Niksic wrote:
> Hi Yun,
>
> The behavior with increased parallelism should be the same as with no
> parallelism. In other words, for the input from the previous email, the
> output should always be 1, 3, regardless of parallelism. Operationally, the
> partial sums maintained in each subtask should somehow be aggregated before
> they are output.
>
> To answer the second question, I know that watermarks provide the same
> functionality. Is there some way to convert the input with explicit
> punctuation into one with watermarks? I see there is an interface called
> AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not
> sure how this assigner would be used. For example, it could maintain the
> number of previously seen Barriers and assign this number as a watermark to
> each Value, but then this number becomes the state that needs to be shared
> between multiple substreams. Or perhaps the Barriers can somehow be
> duplicated and sent to each substream? Alternatively, is there some notion
> of event-based windows that would be triggered by specific user-defined
> elements in the stream? In such mechanism perhaps the watermarks would be
> used internally, but they would not be explicitly exposed to the user?
>
> Best regards,
>
> Filip
>
>
> On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>       Hi Filip,
>
>            I have one question on the problem: what is the expected
> behavior when the  parallelism of the FlatMapFunction is increased to more
> than 1? Should each subtask maintains the partial sum of all values
> received, and whenever the barrier is received, then it just outputs the
> partial sum of the received value ?
>
>           Another question is that I think in Flink the watermark
> mechanism has provided the functionality similar to punctuation,  therefore
> is it possible to implement the same logic with the Flink Window directly?
>
>     Best,
>     Yun
>
> ------------------------------------------------------------------
> From:Filip Niksic <fnik...@seas.upenn.edu>
> Send Time:2019 Oct. 8 (Tue.) 08:56
> To:user <user@flink.apache.org>
> Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?
>
> Hi all,
>
> What would be a natural way to implement a parallel version of the
> following Flink program?
>
> Suppose I have a stream of items of type DataItem with two concrete
> implementations of DataItem: Value and Barrier. Let’s say that Value holds
> an integer value, and Barrier acts as explicit punctuation.
>
> public interface DataItem {}
>
> public class Value implements DataItem {
>
>    private final int val;
>
>    public Value(int val) { this.val = val; }
>
>    public int getVal() { return val; }
>
> }
>
> public class Barrier implements DataItem {}
>
> The program should maintain a sum of values seen since the beginning of
> the stream. On each Barrier, the program should output the sum seen so far.
>
> An obvious way to implement this would be with a FlatMapFunction,
> maintaining the sum as state and emitting it on each Barrier.
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>
>        new Value(1), new Barrier(), new Value(3), new Value(-1), new
> Barrier());
>
> stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
>
>    private int sum = 0;
>
>    @Override
>
>    public void flatMap(DataItem dataItem, Collector<Integer> collector) throws
> Exception {
>
>        if (dataItem instanceof Value) {
>
>            sum += ((Value) dataItem).getVal();
>
>        } else {
>
>            collector.collect(sum);
>
>        }
>
>    }
>
> }).setParallelism(1).print().setParallelism(1);
>
> env.execute();
>
> // We should see 1 followed by 3 as output
>
> However, such an operator cannot be parallelized, since the order of
> Values and Barriers matters. That’s why I need to set parallelism to 1
> above. Is there a way to rewrite this to exploit parallelism?
>
> (Another reason to set parallelism to 1 above is that I’m assuming there
> is a single instance of the FlatMapFunction. A proper implementation would
> take more care in using state. Feel free to comment on that as well.)
>
>
> Best regards,
>
>
> Filip Niksic
>
>
>
>

Reply via email to