Hi Theo,

Isn't the solution I proposed exactly the solution you're talking about? Read the stream sequentially, assign punctuated watermarks, and keyBy to achieve parallelism.
Perhaps you're reading too much into my question. When I sent the first email, I didn't even know about punctuated watermarks. Dealing with a sequential stream that cannot be read sequentially was way beyond what I had in mind. :)

Filip

On Thu, Oct 10, 2019 at 7:55 AM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:

> Hi Filip,
>
> My point was not about the computation of the "maximum". My point was: you could hopefully read the stream sequentially and just assign punctuated watermarks to it. Once you have assigned the watermarks properly (and before you do your expensive computation, like in this case parsing the entire event and building the sum), you could tell Flink to repartition / key the data and shuffle it to the worker tasks over the network, so that the downstream operations are performed in parallel. Flink will AFAIK then take care of dealing with the watermark internally, and everything is fine.
>
> I think it is a rare use case to have a sequential stream which cannot simply be read sequentially. If the stream is so large that you can't do "read, extract special event, shuffle over the network to other tasks" on a single host, you probably have a larger issue and need to rethink at the source level already, e.g. change the serialization method to something with really lightweight parsing for finding the special events, or such.
>
> Best regards
> Theo
>
> ------------------------------
> From: "Filip Niksic" <fnik...@seas.upenn.edu>
> To: "Theo Diefenthal" <theo.diefent...@scoop-software.de>
> CC: "user" <user@flink.apache.org>
> Sent: Thursday, October 10, 2019 00:08:38
> Subject: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
>
> Hi Theo,
>
> It is a single sequential stream.
>
> If I read your response correctly, you are arguing that summing a bunch of numbers is not much more computationally intensive than assigning timestamps to those numbers, so if the latter has to be done sequentially anyway, then why should the former be done in parallel? To that I can only say that the example I gave is intentionally simple in order to make the problem conceptually clean. By understanding the conceptually clean version of the problem, we also gain insight into messier realistic versions where the operations we want to parallelize may be much more computationally intensive.
>
> Filip
>
> On Wed, Oct 9, 2019 at 1:28 PM theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de> wrote:
>
>> Hi Filip,
>>
>> I don't really understand your problem here. Do you have a source with a single sequential stream, where from time to time there is a barrier element? Or do you have a source like Kafka with multiple partitions?
>>
>> If you have case 2 with multiple partitions, what exactly do you mean by "order matters"? Will each partition have its own barrier? Or do you have just one barrier for all partitions? In that case, you will naturally have an ordering problem if your events themselves contain no time data.
>>
>> If you have a "sequential source", why do you need parallelism? Won't it work out to read that partition's data in one task (possibly skipping deserialization as much as possible, only recognizing barrier events) and then add a downstream task with higher parallelism doing the full deserialization and other work?
>>
>> Best regards
>> Theo
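For concreteness, here is a minimal sketch of the topology Theo describes, reusing the DataItem, Value and Barrier types from the original question at the bottom of this thread. SequentialSource, BarrierAssigner, keyOf and PartialSum are placeholder names and not anything from the thread; a fully worked-out version appears in Filip's solution further down.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.addSource(new SequentialSource())                          // placeholder source, read sequentially
        .setParallelism(1)
        // Cheap work stays at parallelism 1: only spot barriers and turn
        // them into punctuated watermarks.
        .assignTimestampsAndWatermarks(new BarrierAssigner())  // placeholder punctuated-watermark assigner
        .setParallelism(1)
        // Shuffle to the network; everything downstream runs in parallel,
        // and Flink forwards the watermarks through the repartitioning.
        .keyBy(item -> keyOf(item))                            // placeholder key selector
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))        // one window per barrier interval
        .aggregate(new PartialSum())                           // the expensive part, now parallel (skips Barrier elements)
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))     // combine the per-key partial sums
        .reduce((a, b) -> a + b)
        .print();

env.execute();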
>> -------- Original Message --------
>> Subject: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
>> From: Yun Gao
>> To: Filip Niksic, user
>> Cc: Chesnay Schepler
>>
>> Hi Filip,
>>
>> As a whole, I also think that to increase the parallelism of the reduce to more than 1, we should use a parallel window to compute the partial sums and then combine them with WindowAll.
>>
>> As for assignTimestampsAndWatermarks, from my side I think the current usage should be OK, and it works the same as for the other operators. Besides, for the keyBy partitioner, I think "% PARALLELISM" is not necessary and Flink will take care of the parallelism. In other words, I think you can use .keyBy(x -> x.getId()) directly.
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> From: Filip Niksic <fnik...@seas.upenn.edu>
>> Send Time: 2019 Oct. 9 (Wed.) 12:21
>> To: user <user@flink.apache.org>
>> Cc: Yun Gao <yungao...@aliyun.com>; Chesnay Schepler <ches...@apache.org>
>> Subject: Re: [QUESTION] How to parallelize with explicit punctuation in Flink?
>>
>> Here is the solution I currently have. It turned out to be more complicated than I expected. It would be great if a more experienced Flink user could comment and point out the shortcomings. And if you have other ideas for achieving the same thing, let me know!
>>
>> Let's start like in the original email, except now we set the time characteristic to EventTime and the parallelism to a constant named PARALLELISM.
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> final int PARALLELISM = 2;
>> env.setParallelism(PARALLELISM);
>>
>> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>>         new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
>>
>> The first step is to use a punctuation-based timestamp-and-watermark assigner as follows. We keep track of the number of barriers in the stream. We assign a timestamp n to the n-th barrier and all the values that immediately precede it, and we emit a watermark with timestamp n on the n-th barrier. This will allow us to define 1 millisecond tumbling windows that precisely capture the values between two barriers.
>>
>> DataStream<DataItem> timedStream =
>>         stream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<DataItem>() {
>>     private long barrierCount = 0;
>>
>>     @Override
>>     public long extractTimestamp(DataItem item, long previousTimestamp) {
>>         return barrierCount;
>>     }
>>
>>     @Nullable
>>     @Override
>>     public Watermark checkAndGetNextWatermark(DataItem item, long extractedTimestamp) {
>>         if (item instanceof Barrier) {
>>             barrierCount++;
>>             return new Watermark(extractedTimestamp);
>>         }
>>         return null;
>>     }
>> });
>>
>> In the test input stream, the first value and barrier get a timestamp 0, and the next two values and the final barrier get a timestamp 1. Two watermarks with timestamps 0 and 1 are emitted.
>>
>> To achieve parallelization, we partition the values by artificially generated keys. A value's key is based on its position in the stream, so we first wrap the values into a type that contains this information.
>> class ValueWithId {
>>     private final int val;
>>     private final long id;
>>
>>     public ValueWithId(int val, long id) {
>>         this.val = val;
>>         this.id = id;
>>     }
>>     public int getVal() { return val; }
>>     public long getId() { return id; }
>> }
>>
>> Here is the mapping. At the same time we can drop the barriers, since we no longer need them. Note that we need to explicitly set the mapping operator's parallelism to 1, since the operator is stateful.
>>
>> DataStream<ValueWithId> wrappedStream =
>>         timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
>>     private long count = 0L;
>>
>>     @Override
>>     public void flatMap(DataItem item, Collector<ValueWithId> collector) throws Exception {
>>         if (item instanceof Value) {
>>             int val = ((Value) item).getVal();
>>             collector.collect(new ValueWithId(val, count++));
>>         }
>>     }
>> }).setParallelism(1);
>>
>> Now we're ready to do the key-based partitioning. A value's key is its id as assigned above modulo PARALLELISM. We follow the partitioning by splitting the stream into 1 millisecond tumbling windows. Then we simply aggregate the partial sums, first for each key separately (and importantly, in parallel), and then for each window.
>>
>> DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() % PARALLELISM)
>>         .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
>>         .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
>>             @Override
>>             public Integer createAccumulator() { return 0; }
>>
>>             @Override
>>             public Integer add(ValueWithId valueWithId, Integer acc) { return acc + valueWithId.getVal(); }
>>
>>             @Override
>>             public Integer getResult(Integer acc) { return acc; }
>>
>>             @Override
>>             public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; }
>>         })
>>         .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
>>         .reduce((x, y) -> x + y);
>>
>> Finally, in the original problem I asked for cumulative sums since the start of the stream, so we perform the last set of transformations to achieve that.
>>
>> DataStream<Integer> cumulativeSums = partialSums
>>         .windowAll(GlobalWindows.create())
>>         .trigger(CountTrigger.of(1))
>>         .reduce((x, y) -> x + y);
>> cumulativeSums.print().setParallelism(1);
>> env.execute();
>> // We should see 1 followed by 3 as output
>>
>> I am not completely sure if my usage of state in the timestamp-and-watermark assigner and the mapper is correct. Is it possible for Flink to duplicate the assigner, move it around and somehow mess up the timestamps? Likewise, is it possible for things to go wrong with the mapper?
>>
>> Another concern I have is that my key-based partitions depend on the constant PARALLELISM. Ideally, the program should be flexible about the parallelism that happens to be available at runtime.
>>
>> Finally, if anyone notices that I am in any part reinventing the wheel and that Flink already has a feature implementing some of the above, or that something can be done more elegantly, let me know!
>>
>> Best regards,
>>
>> Filip
>>
>> On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <fnik...@seas.upenn.edu> wrote:
>>
>> Hi Chesnay,
>>
>> Thanks for the reply. While your solution ultimately does use multiple partitions, from what I can tell the underlying processing is still sequential. Imagine a stream where barriers are quite rare, say a million values followed by a barrier.
>> Then these million values all end up at the same partition and are added up sequentially, and while they are being processed, the other partitions are waiting for their turn. A truly parallel solution would partition the million values, process each partition in parallel to get the partial sums, and on each barrier aggregate the partial sums into a total sum.
>>
>> Filip
>>
>> On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <ches...@apache.org> wrote:
>>
>> In other words, you need a way to partition the stream such that a series of items followed by a barrier is never interrupted.
>>
>> I'm wondering whether you could just apply DataStream#partitionCustom to your source:
>>
>> public static class BarrierPartitioner implements Partitioner<DataItem> {
>>
>>     private int currentPartition = 0;
>>
>>     @Override
>>     public int partition(DataItem key, int numPartitions) {
>>         if (key instanceof Barrier) {
>>             int partitionToReturn = currentPartition;
>>             currentPartition = (currentPartition + 1) % numPartitions;
>>             return partitionToReturn;
>>         } else {
>>             return currentPartition;
>>         }
>>     }
>> }
>>
>> DataStream<DataItem> stream = ...;
>> DataStream<DataItem> partitionedStream = stream.partitionCustom(new BarrierPartitioner(), item -> item);
>>
>> On 08/10/2019 14:55, Filip Niksic wrote:
>>
>> Hi Yun,
>>
>> The behavior with increased parallelism should be the same as with no parallelism. In other words, for the input from the previous email, the output should always be 1, 3, regardless of parallelism. Operationally, the partial sums maintained in each subtask should somehow be aggregated before they are output.
>>
>> To answer the second question, I know that watermarks provide the same functionality. Is there some way to convert the input with explicit punctuation into one with watermarks? I see there is an interface called AssignerWithPunctuatedWatermarks; maybe that's the solution. But I'm not sure how this assigner would be used. For example, it could maintain the number of previously seen Barriers and assign this number as a watermark to each Value, but then this number becomes state that needs to be shared between multiple substreams. Or perhaps the Barriers can somehow be duplicated and sent to each substream? Alternatively, is there some notion of event-based windows that would be triggered by specific user-defined elements in the stream? In such a mechanism, perhaps the watermarks would be used internally, but they would not be explicitly exposed to the user?
>>
>> Best regards,
>>
>> Filip
>>
>> On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>> Hi Filip,
>>
>> I have one question on the problem: what is the expected behavior when the parallelism of the FlatMapFunction is increased to more than 1? Should each subtask maintain the partial sum of all values received, and whenever a barrier is received, just output the partial sum of the values received so far?
>>
>> Another question is that I think the watermark mechanism in Flink provides functionality similar to punctuation; therefore, is it possible to implement the same logic with the Flink Window directly?
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> From: Filip Niksic <fnik...@seas.upenn.edu>
>> Send Time: 2019 Oct. 8 (Tue.) 08:56
>> To: user <user@flink.apache.org>
>> Subject: [QUESTION] How to parallelize with explicit punctuation in Flink?
>> Hi all,
>>
>> What would be a natural way to implement a parallel version of the following Flink program?
>>
>> Suppose I have a stream of items of type DataItem with two concrete implementations of DataItem: Value and Barrier. Let’s say that Value holds an integer value, and Barrier acts as explicit punctuation.
>>
>> public interface DataItem {}
>>
>> public class Value implements DataItem {
>>     private final int val;
>>     public Value(int val) { this.val = val; }
>>     public int getVal() { return val; }
>> }
>>
>> public class Barrier implements DataItem {}
>>
>> The program should maintain a sum of values seen since the beginning of the stream. On each Barrier, the program should output the sum seen so far. An obvious way to implement this would be with a FlatMapFunction, maintaining the sum as state and emitting it on each Barrier.
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>>         new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
>> stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
>>     private int sum = 0;
>>
>>     @Override
>>     public void flatMap(DataItem dataItem, Collector<Integer> collector) throws Exception {
>>         if (dataItem instanceof Value) {
>>             sum += ((Value) dataItem).getVal();
>>         } else {
>>             collector.collect(sum);
>>         }
>>     }
>> }).setParallelism(1).print().setParallelism(1);
>> env.execute();
>> // We should see 1 followed by 3 as output
>>
>> However, such an operator cannot be parallelized, since the order of Values and Barriers matters. That’s why I need to set parallelism to 1 above. Is there a way to rewrite this to exploit parallelism?
>>
>> (Another reason to set parallelism to 1 above is that I’m assuming there is a single instance of the FlatMapFunction. A proper implementation would take more care in using state. Feel free to comment on that as well.)
>>
>> Best regards,
>>
>> Filip Niksic
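On the state caveat in the last parenthetical: a minimal sketch, not from the thread, of how the single-parallelism FlatMapFunction could keep its running sum in Flink's managed operator state via the CheckpointedFunction interface, so the sum survives failures and restarts. The class name SummingFlatMap and the state name "running-sum" are made up for illustration; DataItem, Value and Barrier are the types defined above.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class SummingFlatMap implements FlatMapFunction<DataItem, Integer>, CheckpointedFunction {

    // Working copy of the sum; restored from checkpointed state on recovery.
    private int sum;
    // Operator state holding the single sum value across checkpoints.
    private transient ListState<Integer> checkpointedSum;

    @Override
    public void flatMap(DataItem item, Collector<Integer> out) throws Exception {
        if (item instanceof Value) {
            sum += ((Value) item).getVal();
        } else {
            out.collect(sum);   // a Barrier: emit the sum seen so far
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedSum.clear();
        checkpointedSum.add(sum);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedSum = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("running-sum", Integer.class));
        sum = 0;
        for (Integer s : checkpointedSum.get()) {
            sum += s;
        }
    }
}

It would be used in place of the anonymous class above, still with parallelism 1: stream.flatMap(new SummingFlatMap()).setParallelism(1).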