Hi Theo,

Isn't the solution I proposed exactly the solution you talk about? Read the
stream sequentially, assign punctuated watermarks, keyBy to achieve
parallelism.
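
For reference, here is a minimal sketch of that data flow in plain Java, without Flink. It is only an illustration under my own assumptions: the class name PunctuatedSumSketch and the method cumulativeSums are hypothetical, a null list entry stands in for a Barrier, and the loop runs on a single thread, so it shows how values are routed and merged rather than actual parallel execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation, no Flink involved. All names are hypothetical.
final class PunctuatedSumSketch {

    // A null entry stands in for a Barrier; any other entry is a Value.
    static List<Integer> cumulativeSums(List<Integer> stream, int parallelism) {
        List<Integer> output = new ArrayList<>();
        int[] partial = new int[parallelism]; // one partial sum per key in the open window
        long id = 0;                          // position of the next value in the stream
        int total = 0;                        // cumulative sum over all closed windows

        for (Integer item : stream) {
            if (item != null) {
                // "keyBy" step: route the value to a key based on its position.
                partial[(int) (id++ % parallelism)] += item;
            } else {
                // The barrier acts as the watermark: close the window,
                // merge the per-key partial sums, and emit the running total.
                for (int k = 0; k < parallelism; k++) {
                    total += partial[k];
                    partial[k] = 0;
                }
                output.add(total);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(1, null, 3, -1, null);
        System.out.println(cumulativeSums(stream, 2)); // prints [1, 3]
    }
}
```

The result does not depend on the parallelism argument, which is the property I was after.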

Perhaps you're reading too much into my question. When I sent the first
email, I didn't even know about punctuated watermarks. Dealing with a
sequential stream that cannot be read sequentially was way beyond what I
had in mind. :)

Filip



On Thu, Oct 10, 2019 at 7:55 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Filip,
>
> My point was not about the computation of the "maximum". My point was: you
> could hopefully read the stream sequentially and just assign punctuated
> watermarks to it. Once you have assigned the watermarks properly (and
> before you do your expensive computation, in this case parsing the
> entire event and building the sum), you can tell Flink to repartition /
> key the data and shuffle it over the network to the worker tasks, so that the
> downstream operations are performed in parallel. As far as I know, Flink will
> then take care of the watermarks internally, and everything is fine.
> I think it is a rare use case to have a sequential stream that cannot
> simply be read sequentially. If the stream is so large that a single host
> cannot do "read, extract special events, shuffle over the network
> to other tasks", you probably have a larger issue and need to rethink things
> at the source level already, e.g. change the serialization method to something
> that allows really lightweight parsing for finding the special events.
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Filip Niksic" <fnik...@seas.upenn.edu>
> *To: *"Theo Diefenthal" <theo.diefent...@scoop-software.de>
> *CC: *"user" <user@flink.apache.org>
> *Sent: *Thursday, October 10, 2019 00:08:38
> *Subject: *Re: [QUESTION] How to parallelize with explicit punctuation in
> Flink?
>
> Hi Theo,
>
> It is a single sequential stream.
>
> If I read your response correctly, you are arguing that summing a bunch of
> numbers is not much more computationally intensive than assigning
> timestamps to those numbers, so if the latter has to be done sequentially
> anyway, then why should the former be done in parallel? To that I can only
> say that the example I gave is intentionally simple in order to make the
> problem conceptually clean. By understanding the conceptually clean version
> of the problem, we also gain insight into messier realistic versions where
> the operations we want to parallelize may be much more computationally
> intensive.
>
> Filip
>
>
>
> On Wed, Oct 9, 2019 at 1:28 PM theo.diefent...@scoop-software.de <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Filip, I don't really understand your problem here.
>> Do you have a source with a single sequential stream where, from time to
>> time, there is a barrier element? Or do you have a source like Kafka with
>> multiple partitions?
>> If you have case 2 with multiple partitions, what exactly do you mean by
>> "order matters"? Will each partition have its own barrier? Or do you have
>> just one barrier for all partitions? In that case, you will naturally have
>> an ordering problem if your events themselves contain no time data.
>> If you have a "sequential source", why do you need parallelism? Wouldn't it
>> work to read that partition's data in one task (skipping
>> deserialization as much as possible and only recognizing barrier events) and
>> then add a downstream task with higher parallelism that does the full
>> deserialization and other work?
>> Best regards
>> Theo
>> -------- Original Message --------
>> Subject: Re: [QUESTION] How to parallelize with explicit punctuation in
>> Flink?
>> From: Yun Gao
>> To: Filip Niksic, user
>> Cc: Chesnay Schepler
>>
>>
>>       Hi Filip,
>>
>>          Overall, I also think that to increase the parallelism of the
>> reduce beyond 1, we should use a parallel window to compute the
>> partial sums and then add up the partial sums with WindowAll.
>>
>>         As for assignTimestampsAndWatermarks, I think the
>> current usage should be OK; it works the same way as the other operators.
>> Besides, for the keyBy partitioner, I think "% PARALLELISM" is not
>> necessary, since Flink will take care of the parallelism. In other words, I
>> think you can use .keyBy(x -> x.getId()) directly.
>>
>>     Best,
>>     Yun
>>
>>
>> ------------------------------------------------------------------
>> From:Filip Niksic <fnik...@seas.upenn.edu>
>> Send Time:2019 Oct. 9 (Wed.) 12:21
>> To:user <user@flink.apache.org>
>> Cc:Yun Gao <yungao...@aliyun.com>; Chesnay Schepler <ches...@apache.org>
>> Subject:Re: [QUESTION] How to parallelize with explicit punctuation in
>> Flink?
>>
>> Here is the solution I currently have. It turned out to be more
>> complicated than I expected. It would be great if a more experienced Flink
>> user could comment and point out the shortcomings. And if you have other
>> ideas for achieving the same thing, let me know!
>>
>> Let's start like in the original email, except now we set the time
>> characteristic to EventTime and parallelism to a constant named PARALLELISM.
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> final int PARALLELISM = 2;
>> env.setParallelism(PARALLELISM);
>>
>> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>>         new Value(1), new Barrier(), new Value(3), new Value(-1),
>>         new Barrier());
>>
>> The first step is to use a punctuation-based timestamp-and-watermark
>> assigner as follows. We keep track of the number of barriers in the stream.
>> We assign a timestamp n to the n-th barrier and all the values that
>> immediately precede it, and we emit a watermark with timestamp n on the
>> n-th barrier. This will allow us to define 1 millisecond tumbling windows
>> that precisely capture the values between two barriers.
>>
>> DataStream<DataItem> timedStream = stream.assignTimestampsAndWatermarks(
>>         new AssignerWithPunctuatedWatermarks<DataItem>() {
>>     private long barrierCount = 0;
>>
>>     @Override
>>     public long extractTimestamp(DataItem item, long previousTimestamp) {
>>         return barrierCount;
>>     }
>>
>>     @Nullable
>>     @Override
>>     public Watermark checkAndGetNextWatermark(DataItem item,
>>             long extractedTimestamp) {
>>         if (item instanceof Barrier) {
>>             barrierCount++;
>>             return new Watermark(extractedTimestamp);
>>         }
>>         return null;
>>     }
>> });
>>
>> In the test input stream, the first value and barrier get a timestamp 0,
>> and the next two values and the final barrier get a timestamp 1. Two
>> watermarks with timestamps 0 and 1 are emitted.
>>
>> To achieve parallelization, we partition the values by artificially
>> generated keys. A value's key is based on its position in the stream, so we
>> first wrap the values into a type that contains this information.
>>
>> class ValueWithId {
>>     private final int val;
>>     private final long id;
>>
>>     public ValueWithId(int val, long id) {
>>         this.val = val;
>>         this.id = id;
>>     }
>>     public int getVal() { return val; }
>>     public long getId() { return id; }
>> }
>>
>> Here is the mapping. At the same time we can drop the barriers, since we
>> no longer need them. Note that we need to explicitly set the mapping
>> operator's parallelism to 1, since the operator is stateful.
>>
>> DataStream<ValueWithId> wrappedStream =
>>         timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
>>     private long count = 0L;
>>
>>     @Override
>>     public void flatMap(DataItem item, Collector<ValueWithId> collector)
>>             throws Exception {
>>         if (item instanceof Value) {
>>             int val = ((Value) item).getVal();
>>             collector.collect(new ValueWithId(val, count++));
>>         }
>>     }
>> }).setParallelism(1);
>>
>> Now we're ready to do the key-based partitioning. A value's key is its id
>> as assigned above modulo PARALLELISM. We follow the partitioning by
>> splitting the stream into 1 millisecond tumbling windows. Then we simply
>> aggregate the partial sums, first for each key separately (and importantly,
>> in parallel), and then for each window.
>>
>> DataStream<Integer> partialSums = wrappedStream
>>         .keyBy(x -> x.getId() % PARALLELISM)
>>         .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
>>         .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
>>             @Override
>>             public Integer createAccumulator() { return 0; }
>>
>>             @Override
>>             public Integer add(ValueWithId valueWithId, Integer acc) {
>>                 return acc + valueWithId.getVal();
>>             }
>>
>>             @Override
>>             public Integer getResult(Integer acc) { return acc; }
>>
>>             @Override
>>             public Integer merge(Integer acc1, Integer acc2) {
>>                 return acc1 + acc2;
>>             }
>>         })
>>         .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
>>         .reduce((x, y) -> x + y);
>>
>> Finally, in the original problem I asked for cumulative sums since the
>> start of the stream, so we perform the last set of transformations to
>> achieve that.
>>
>> DataStream<Integer> cumulativeSums = partialSums
>>         .windowAll(GlobalWindows.create())
>>         .trigger(CountTrigger.of(1))
>>         .reduce((x, y) -> x + y);
>> cumulativeSums.print().setParallelism(1);
>> env.execute();
>> // We should see 1 followed by 3 as output
>>
>> I am not completely sure if my usage of state in the
>> timestamp-and-watermark assigner and the mapper is correct. Is it possible
>> for Flink to duplicate the assigner, move it around and somehow mess up the
>> timestamps? Likewise, is it possible for things to go wrong with the mapper?
>>
>> Another concern I have is that my key-based partitions depend on the
>> constant PARALLELISM. Ideally, the program should be flexible about the
>> parallelism that happens to be available during runtime.
>>
>> Finally, if anyone notices that I am in any part reinventing the wheel
>> and that Flink already has a feature implementing some of the above, or
>> that something can be done more elegantly, let me know!
>>
>> Best regards,
>>
>> Filip
>>
>>
>> On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <fnik...@seas.upenn.edu>
>> wrote:
>>
>> Hi Chesnay,
>>
>> Thanks for the reply. While your solution ultimately does use multiple
>> partitions, from what I can tell the underlying processing is still
>> sequential. Imagine a stream where barriers are quite rare, say a million
>> values followed by a barrier. Then these million values all end up in
>> the same partition and are added up sequentially, and while they are being
>> processed, the other partitions are waiting for their turn. A truly
>> parallel solution would partition the million values, process each
>> partition in parallel to get the partial sums, and on each barrier
>> aggregate the partial sums into a total sum.
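
A minimal sketch of that idea in plain Java, outside Flink, for a single batch of values between two barriers. The class name ParallelBatchSum and the method sumBatch are hypothetical, and a parallel stream over slice indices is just one assumed way to run the slices concurrently; it is not how Flink would schedule the work.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Plain-Java illustration, no Flink involved. All names are hypothetical.
final class ParallelBatchSum {

    // Splits one barrier-delimited batch into `parts` slices, sums each slice
    // independently, and combines the partial sums into a total.
    static long sumBatch(int[] batch, int parts) {
        int chunk = (batch.length + parts - 1) / parts; // ceiling division
        return IntStream.range(0, parts)
                .parallel() // slices may be summed on different threads
                .mapToLong(p -> {
                    int from = Math.min(batch.length, p * chunk);
                    int to = Math.min(batch.length, from + chunk);
                    long partialSum = 0;
                    for (int i = from; i < to; i++) {
                        partialSum += batch[i];
                    }
                    return partialSum;
                })
                .sum(); // the combine step, performed once per barrier
    }

    public static void main(String[] args) {
        int[] batch = new int[1_000_000];
        Arrays.fill(batch, 1);
        System.out.println(sumBatch(batch, 4)); // prints 1000000
    }
}
```

Only the final combine step has to wait for the barrier; the slices themselves are independent of one another.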
>>
>> Filip
>>
>>
>> On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>> In other words, you need a way to partition the stream such that a series
>> of items followed by a barrier are never interrupted.
>>
>> I'm wondering whether you could just apply DataStream#partitionCustom to
>> your source:
>> public static class BarrierPartitioner implements Partitioner<DataItem> {
>>
>>    private int currentPartition = 0;
>>    @Override
>>    public int partition(DataItem key, int numPartitions) {
>>       if (key instanceof Barrier) {
>>          int partitionToReturn = currentPartition;
>>          currentPartition = (currentPartition + 1) % numPartitions;
>>          return partitionToReturn;
>>       } else {
>>          return currentPartition;
>>       }
>>    }
>> }
>>
>> DataStream<DataItem> stream = ...;
>> DataStream<DataItem> partitionedStream = stream.partitionCustom(new
>> BarrierPartitioner(), item -> item);
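
To see where items end up under this scheme, here is a small plain-Java simulation of the same routing rule, without Flink. The class name BarrierRoutingSketch and the method route are hypothetical, and a null entry stands in for a Barrier; the sketch only mirrors the partition() logic above and makes no claim about how Flink instantiates the partitioner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration, no Flink involved. All names are hypothetical.
final class BarrierRoutingSketch {

    // Returns the target partition of every item in the stream.
    // A null entry stands in for a Barrier; any other entry is a Value.
    static List<Integer> route(List<Integer> stream, int numPartitions) {
        List<Integer> targets = new ArrayList<>();
        int currentPartition = 0;
        for (Integer item : stream) {
            // Every item, barrier included, goes to the current partition.
            targets.add(currentPartition);
            if (item == null) {
                // Only after a barrier do we advance to the next partition.
                currentPartition = (currentPartition + 1) % numPartitions;
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(1, null, 3, -1, null);
        System.out.println(route(stream, 2)); // prints [0, 0, 1, 1, 1]
    }
}
```

Each run of values and its closing barrier stay together on one partition, so the series is never interrupted.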
>>
>> On 08/10/2019 14:55, Filip Niksic wrote:
>> Hi Yun,
>>
>> The behavior with increased parallelism should be the same as with no
>> parallelism. In other words, for the input from the previous email, the
>> output should always be 1, 3, regardless of parallelism. Operationally, the
>> partial sums maintained in each subtask should somehow be aggregated before
>> they are output.
>>
>> To answer the second question, I know that watermarks provide the same
>> functionality. Is there some way to convert the input with explicit
>> punctuation into one with watermarks? I see there is an interface called
>> AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not
>> sure how this assigner would be used. For example, it could maintain the
>> number of previously seen Barriers and assign this number as a watermark to
>> each Value, but then this number becomes the state that needs to be shared
>> between multiple substreams. Or perhaps the Barriers can somehow be
>> duplicated and sent to each substream? Alternatively, is there some notion
>> of event-based windows that would be triggered by specific user-defined
>> elements in the stream? In such a mechanism, perhaps the watermarks would be
>> used internally, but they would not be explicitly exposed to the user?
>>
>> Best regards,
>>
>> Filip
>>
>>
>> On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>        Hi Filip,
>>            I have one question about the problem: what is the expected
>> behavior when the parallelism of the FlatMapFunction is increased to more
>> than 1? Should each subtask maintain the partial sum of all values
>> received and, whenever a barrier is received, just output the
>> partial sum of the values it has received?
>>
>>           My other question: I think the watermark mechanism in Flink
>> provides functionality similar to punctuation, so
>> is it possible to implement the same logic with Flink windows directly?
>>     Best,
>>     Yun
>>
>> ------------------------------------------------------------------
>> From:Filip Niksic <fnik...@seas.upenn.edu>
>> Send Time:2019 Oct. 8 (Tue.) 08:56
>> To:user <user@flink.apache.org>
>> Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?
>>
>> Hi all,
>> What would be a natural way to implement a parallel version of the
>> following Flink program?
>> Suppose I have a stream of items of type DataItem with two concrete
>> implementations of DataItem: Value and Barrier. Let’s say that Value holds
>> an integer value, and Barrier acts as explicit punctuation.
>> public interface DataItem {}
>> public class Value implements DataItem {
>>     private final int val;
>>     public Value(int val) { this.val = val; }
>>     public int getVal() { return val; }
>> }
>> public class Barrier implements DataItem {}
>> The program should maintain a sum of values seen since the beginning of
>> the stream. On each Barrier, the program should output the sum seen so far.
>> An obvious way to implement this would be with a FlatMapFunction,
>> maintaining the sum as state and emitting it on each Barrier.
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>>         new Value(1), new Barrier(), new Value(3), new Value(-1),
>>         new Barrier());
>> stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
>>     private int sum = 0;
>>
>>     @Override
>>     public void flatMap(DataItem dataItem, Collector<Integer> collector)
>>             throws Exception {
>>         if (dataItem instanceof Value) {
>>             sum += ((Value) dataItem).getVal();
>>         } else {
>>             collector.collect(sum);
>>         }
>>     }
>> }).setParallelism(1).print().setParallelism(1);
>> env.execute();
>> // We should see 1 followed by 3 as output
>> However, such an operator cannot be parallelized, since the order of
>> Values and Barriers matters. That’s why I need to set parallelism to 1
>> above. Is there a way to rewrite this to exploit parallelism?
>> (Another reason to set parallelism to 1 above is that I’m assuming there
>> is a single instance of the FlatMapFunction. A proper implementation would
>> take more care in using state. Feel free to comment on that as well.)
>>
>> Best regards,
>>
>> Filip Niksic
>>
>>
>>
>>
