Hi Nikos,

yes, the hash function is used not only for partitioning but also to organize the key-partitioned state. My intuition is that the AbstractStreamOperator approach would be easier to realize, because you don't need to worry about the side effects of changing Flink internals.
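A small plain-Java model can make this point concrete. It assumes Flink's key-group scheme (each key is hashed into one of maxParallelism key groups, and each parallel subtask owns a contiguous range of key groups), but it replaces Flink's murmur hash with a plain modulo, so it shows the idea rather than Flink's exact numbers. The class and method names are hypothetical.

```java
import java.util.Objects;

/**
 * Simplified model (not Flink code) of how keyed state is tied to the key hash.
 * Assumption: a key maps to one of maxParallelism key groups, and each subtask owns a
 * contiguous range of key groups. Flink additionally applies a murmur hash to
 * key.hashCode(); this sketch uses a plain non-negative modulo instead.
 */
final class KeyGroupModel {

    // Key -> key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    // Key group -> index of the subtask owning it (contiguous ranges of key groups).
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The subtask that holds the state for this key; keyBy() must route the key there.
    static int stateOwnerFor(Object key, int maxParallelism, int parallelism) {
        return ownerOf(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    // A custom partitioner is only safe for keyed state if it agrees with stateOwnerFor().
    static boolean agreesWithState(Object key, int routedSubtask, int maxParallelism, int parallelism) {
        return routedSubtask == stateOwnerFor(key, maxParallelism, parallelism);
    }
}
```

For example, with maxParallelism 128 and parallelism 4, this model assigns key 100 to subtask 3; a round-robin partitioner that sends that record to subtask 0 hands it to a subtask that does not own the key's state.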
Best,
Fabian

2017-01-25 18:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <kat...@cs.pitt.edu>:

Hello Fabian,

Thank you for your response, and there is no need to apologize. As I mentioned in my previous email, my wording was confusing, so it was only to be expected that you had an incomplete picture of my goal. Again, thank you for your help and your time.

As for my plan from here on, I understand that I might have to implement some custom components myself (I prefer conducting my research on an actual system over regressing to an awful simulation). To that end, I thought of writing my own KeyedStream<T> implementation that provides the option of using a StreamPartitioner<T> other than the HashPartitioner<T>. This CustomKeyedStream<T> would be created by a call to a custom method on DataStream<T>, say customKeyBy(int... fields, CustomPartitioner<T>), and it would work exactly like DataStream<T>.keyBy(int... fields), with the only difference that it receives a custom partitioner instead of using the default hash partitioner. Do you think this plan is feasible? I am not completely sure whether the windowed keyed state would be affected by this design in any way.

In addition, I will consider your suggestion of extending AbstractStreamOperator and implementing OneInputStreamOperator. It looks like an easier approach than the one I described above, and I will try to dive into its implementation details.

Again, thank you very much for your help and your constructive comments.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, January 25, 2017 12:28 PM
To: user@flink.apache.org
Subject: Re: Custom Partitioning and windowing questions/concerns

Hi Nikos,

you are of course right. I forgot that ProcessFunction requires a KeyedStream. Sorry for this advice.

The problem is that you need to implement some kind of time-based function that emits partial counts every 10 seconds. AFAIK, the DataStream API does not offer a built-in operator that gives you this, except for windows and ProcessFunction.

You could try to implement your own operator by extending AbstractStreamOperator and implementing the OneInputStreamOperator interface. This is a fairly low-level interface, but it gives you access to record timestamps and watermarks. Actually, the DataStream operators are built on this interface as well.

A custom operator is applied by calling dataStream.transform().

Best,
Fabian

2017-01-24 17:18 GMT+01:00 Katsipoulakis, Nikolaos Romanos <kat...@cs.pitt.edu>:

Hello Fabian,

First, I would like to thank you for your suggestion and the additional information on determinism and partitioning policies. As I mentioned in my initial email, I am new to Flink, and every additional piece of advice makes my learning curve less steep. I am also aware that you (and everyone else who follows this thread) might wonder why I am following this unconventional path of studying partitioning for performance, but I should mention that the goal of my use case is academic.
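The custom-operator route Fabian suggests above boils down to buffering partial counts per window and flushing them once the watermark passes the window end, without any keyBy(). The plain-Java sketch below models only that logic; it is not written against AbstractStreamOperator, and its method names merely echo the operator callbacks (processElement, processWatermark) as an assumption. Records are simplified to (timestamp, key).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model (not Flink's API) of a pre-aggregating operator: it counts keys per
 * 10-second tumbling window, independent of how its input was partitioned, and emits
 * the partial counts once the watermark shows that a window is complete.
 */
final class PartialCountOperatorModel {
    static final long WINDOW_SIZE_MS = 10_000L;

    // windowStart -> (key -> partial count); TreeMaps keep the output ordered.
    private final TreeMap<Long, TreeMap<Integer, Integer>> openWindows = new TreeMap<>();

    // Per-record callback: add 1 to the key's count in the record's window.
    void processElement(long timestamp, int key) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        openWindows.computeIfAbsent(windowStart, w -> new TreeMap<>()).merge(key, 1, Integer::sum);
    }

    // Watermark callback: window [start, start + size) is complete once the watermark reaches
    // its last timestamp, start + size - 1. Completed windows are emitted as
    // "windowStart,key,count" strings and removed from the buffer.
    List<String> processWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + WINDOW_SIZE_MS - 1 <= watermark) {
            Map.Entry<Long, TreeMap<Integer, Integer>> window = openWindows.pollFirstEntry();
            for (Map.Entry<Integer, Integer> count : window.getValue().entrySet()) {
                emitted.add(window.getKey() + "," + count.getKey() + "," + count.getValue());
            }
        }
        return emitted;
    }
}
```

In a real operator, the buffered counts would also have to live in checkpointed state to survive failures; this model keeps them in an in-memory map only and does not handle late records.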
Turning to your suggestion, I took some time to go over the 1.2-SNAPSHOT code, and I read the online documentation on the Process Function API, which I found at: https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html. From my understanding, the process() transformation can be applied only on a KeyedStream<T> and not on a DataStream<T>. Therefore, if I wanted to use a custom partitioning algorithm, I would have to first call partitionCustom() (DataStream<T> -> DataStream<T>), followed by keyBy(…) (DataStream<T> -> KeyedStream<T>), and finally apply my first pre-aggregation step (i.e., the call to process()). Concretely, my code would turn into something like the following:

    // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
    DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
        .partitionCustom(new CustomPartitioner(...), 1) // or .rebalance() or .shuffle()
        .keyBy(1)                                       // re-partitions by the hash of field 1
        // the ProcessFunction itself would emit the partial sums;
        // sum() is only available on keyed/windowed streams, not on process() output
        .process(new CustomProcessFunction(..., Time.seconds(10), ...))
        .setParallelism(N);

As you can see, the above would be problematic for two reasons. First, a call to keyBy() defeats the purpose of a custom partitioner, because the stream will ultimately be partitioned based on the keys and not on my CustomPartitioner.selectChannels() method. Second, using process() does not solve my problem, because the issue with my use case is to avoid calling keyBy(). If I could call keyBy(), I might as well call window() and not use the Process Function API in the first place.
To be more precise, if I could use a KeyedStream<T>, then I could do the following:

    // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
    DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
        .partitionCustom(new CustomPartitioner(...), 1)
        .keyBy(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sum(2).setParallelism(N);

Therefore, I don't think that using a Process Function would solve my problem. Am I understanding your suggestion correctly? If yes, I would be grateful if you could explain it in more detail. On top of that, after rereading my initial email, I believe that the intentions of my use case were not quite clear. Please do not hesitate to ask me for any clarifications.

Again, thank you very much for your interest and your time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, January 24, 2017 5:15 AM
To: user@flink.apache.org
Subject: Re: Custom Partitioning and windowing questions/concerns

Hi Nikos,

Flink's windows require a KeyedStream because they use the keys to manage their internal state (each in-progress window has some state that needs to be persisted and checkpointed).

Moreover, Flink's event-time window operators return a deterministic result. In your use case, the result of the pre-aggregation (phase 1) would not be deterministic, because it would depend on the partitioning of the input.

I would suggest implementing the pre-aggregation not with a window but with a ProcessFunction (available in Flink 1.2-SNAPSHOT, which will be released soon). ProcessFunction allows you to register timers, which can be used to emit results every 10 seconds.
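The determinism point above can be checked with a few lines of plain Java: the same input is pre-aggregated under two simplified partitioners (round-robin by arrival index, and hash by key; both are stand-ins, not Flink's implementations). The per-partition partial counts differ between the two, while the merged totals agree. The Router interface and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitioningDeterminismDemo {

    // Chooses a partition for the index-th record carrying the given key (hypothetical helper).
    interface Router {
        int route(int index, int key, int numPartitions);
    }

    // Round-robin stand-in: the i-th record goes to partition i % n, whatever its key.
    static final Router ROUND_ROBIN = (index, key, n) -> index % n;

    // Hash stand-in: all records with the same key go to the same partition.
    static final Router HASH = (index, key, n) -> Math.floorMod(Integer.hashCode(key), n);

    // Phase 1: count keys separately on each of the n partitions.
    static List<Map<Integer, Integer>> partialCounts(int[] keys, int n, Router router) {
        List<Map<Integer, Integer>> partials = new ArrayList<>();
        for (int p = 0; p < n; p++) {
            partials.add(new TreeMap<>());
        }
        for (int i = 0; i < keys.length; i++) {
            partials.get(router.route(i, keys[i], n)).merge(keys[i], 1, Integer::sum);
        }
        return partials;
    }

    // Phase 2: merge all partial counts into one total per key.
    static Map<Integer, Integer> merge(List<Map<Integer, Integer>> partials) {
        Map<Integer, Integer> totals = new TreeMap<>();
        for (Map<Integer, Integer> partial : partials) {
            partial.forEach((key, count) -> totals.merge(key, count, Integer::sum));
        }
        return totals;
    }
}
```

With input keys {1, 2, 1, 3, 1, 2} and two partitions, round-robin yields partials {1=3} and {2=2, 3=1}, hash yields {2=2} and {1=3, 3=1}, and both merge to {1=3, 2=2, 3=1}. With round-robin, the partials also depend on arrival order.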
Hope this helps,
Fabian

2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <kat...@cs.pitt.edu>:

Hello all,

I am currently examining the effects of stream partitioning on performance for simple stateful scenarios.

My toy application for the rest of my question is the following: given a stream of non-negative integers, each annotated with a timestamp, the goal is to get the 10 most frequent integers on tumbling windows of 10 seconds. In other words, my input is a stream of tuples with two fields, Tuple2<Long, Integer>(timestamp, key), where key is the non-negative integer value and timestamp is used to assign each event to a window. The execution plan I am considering has a *first phase (Phase 1)*, where the stream is partitioned and the partial aggregations are computed in parallel (parallelism N > 1). Afterwards, the *second phase (Phase 2)* gathers all partial aggregations on a single node (parallelism 1), calculates the full aggregation for each key, orders the keys by windowed frequency, and outputs the top-10 keys for each window.

As I mentioned earlier, my goal is to compare the performance of different partitioning policies on this toy application. Initially, I want to compare shuffle grouping (round-robin) and hash grouping, and then move on to other partitioning policies by using Flink's custom partitioning API (partitionCustom()). After reading Flink's documentation, I managed to develop the toy application using hash partitioning.
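The two-phase plan described above can be sketched outside Flink as follows. The sketch assumes each phase-1 partial result is (timestamp, key, count), like the Tuple3<Long, Integer, Integer> records in the email's code, with the timestamp falling inside the partial's window, and it breaks frequency ties by the smaller key, which the email does not specify. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TopKPerWindow {
    static final long WINDOW_SIZE_MS = 10_000L;

    // A phase-1 result: (timestamp, key, partial count).
    static final class Partial {
        final long timestamp;
        final int key;
        final int count;

        Partial(long timestamp, int key, int count) {
            this.timestamp = timestamp;
            this.key = key;
            this.count = count;
        }
    }

    // Start of the 10-second tumbling window containing a non-negative timestamp.
    static long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
    }

    // Phase 2 for one window: sum partial counts per key, rank by total count (descending,
    // ties by smaller key), and return the k most frequent keys joined by commas.
    static String topK(List<Partial> partials, long window, int k) {
        Map<Integer, Integer> totals = new HashMap<>();
        for (Partial p : partials) {
            if (windowStart(p.timestamp) == window) {
                totals.merge(p.key, p.count, Integer::sum);
            }
        }
        return totals.entrySet().stream()
                .sorted(Map.Entry.<Integer, Integer>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<Integer, Integer>comparingByKey()))
                .limit(k)
                .map(e -> Integer.toString(e.getKey()))
                .collect(Collectors.joining(","));
    }
}
```

Because phase 2 sums all partials of a key before ranking, the final top-10 does not depend on how phase 1 split the keys; only the intermediate partial results do.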
Below, I present the different parts of my code:

    // Phase 0: input setup
    DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> event) {
                return event.f0; // field 0 holds the event timestamp
            }
        })
        // An anonymous MapFunction (instead of a lambda) lets Flink extract the generic
        // Tuple3 output type, which it often cannot recover from a lambda.
        .map(new MapFunction<Tuple2<Long, Integer>, Tuple3<Long, Integer, Integer>>() {
            @Override
            public Tuple3<Long, Integer, Integer> map(Tuple2<Long, Integer> e) {
                return new Tuple3<>(e.f0, e.f1, 1); // (timestamp, key, count = 1)
            }
        });

In Phase 0, I create the input stream from an in-memory list, define the event timestamp that will be used for windowing, and extend each event with a value of 1 for counting the appearances of each number in every window. Afterwards, for the parallel Phase 1, I use hash partitioning by first applying .keyBy() on the key of each tuple (i.e., field 1), followed by a .window() operation to assign each tuple to a window, and ending with a .sum(). My code for the (parallel) Phase 1 is the following:

    // Phase 1: parallel partial sum, with a parallelism of N (N > 1)
    DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
        .keyBy(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .sum(2).setParallelism(N);

Moving on to Phase 2, to aggregate all partial results of a single window in one operator, producing the full aggregation, ordering by frequency, and returning the top-10 keys, I have the following:

    // Phase 2: serial full aggregation and ordering, with a parallelism of 1
    DataStream<String> phaseTwo = phaseOne
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Tuple3<Long, Integer, Integer>> values,
                              Collector<String> out) throws Exception {
                ...
                List<Integer> topTenValues = ...;
                StringBuilder strBuilder = new StringBuilder();
                for (Integer t : topTenValues) {
                    strBuilder.append(t).append(',');
                }
                out.collect(strBuilder.toString());
            }
        });

The previous code uses hash partitioning for its parallel phase. From what I understand, Flink allows the .window() operation only on a KeyedStream. Furthermore, the .partitionCustom() method transforms a DataStream into a DataStream, and the same is true for .shuffle() (random partitioning) and .rebalance() (round-robin partitioning). Therefore, *I am confused about how I can use a shuffle policy with windows*. One idea that came to me is to pass an irrelevant field to the .keyBy() method, or to define my own KeySelector<IN, KEY> that simulates shuffle grouping through key generation. Unfortunately, I have two concerns about these alternatives. For the keyBy() approach, I need to control the internal hashing mechanism, which entails cherry-picking fields for different workloads and exhaustively searching the behavior of different random fields (not practical). For the KeySelector<IN, KEY> approach, I need to maintain state across different calls of getKey(), which (as far as I know) is not offered by the KeySelector<IN, KEY> interface, and I do not want to rely on external state that would add overhead. Therefore, *my first question is: how can I effectively use round-robin grouping with windows in my toy application?*

The bigger point I am trying to address revolves around custom partitioning policies and windows in general. My understanding is that the benefit of a custom partitioning policy is the ability to control the partitioning process based on a predefined set of resources (e.g., partitions, task slots, etc.).
Hence, *I am confused about how I would be able to use partitionCustom() followed by .window() in the (parallel) Phase 1 to test the performance of different execution plans (i.e., partitioning policies).*

I apologize for the long question, but I believe I had to provide enough details for the points/questions I currently have (highlighted in bold). Thank you very much for your time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh
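For the first question (round-robin grouping with windows), one workaround sometimes used is to move the state out of the KeySelector: a map step before keyBy() attaches a salt that cycles through 0..numSalts-1 on each upstream subtask, and the KeySelector only reads that field, so the key stays a deterministic function of the record. The following is a plain-Java sketch of that idea under these assumptions, not a Flink API; all names are hypothetical.

```java
/**
 * Plain-Java sketch of "salting" for round-robin grouping. The per-subtask counter
 * lives in a mapper that runs before keyBy(), so the key selector itself is
 * stateless and deterministic.
 */
final class RoundRobinSalting {

    // Record after salting: the original key plus the salt used as the grouping key.
    static final class Salted {
        final int key;
        final int salt;

        Salted(int key, int salt) {
            this.key = key;
            this.salt = salt;
        }
    }

    // Stands in for a stateful map function; one instance per upstream subtask.
    static final class SaltingMapper {
        private final int numSalts;
        private int next = 0; // counter kept in the mapper, not in getKey()

        SaltingMapper(int numSalts) {
            this.numSalts = numSalts;
        }

        Salted map(int key) {
            Salted out = new Salted(key, next);
            next = (next + 1) % numSalts;
            return out;
        }
    }

    // Stands in for the KeySelector: it only reads a field of the record.
    static int getKey(Salted record) {
        return record.salt;
    }

    // How many records each salt value receives when one mapper salts the given keys.
    static int[] countPerSalt(int[] keys, int numSalts) {
        SaltingMapper mapper = new SaltingMapper(numSalts);
        int[] counts = new int[numSalts];
        for (int key : keys) {
            counts[getKey(mapper.map(key))]++;
        }
        return counts;
    }
}
```

Two caveats apply if this is carried over to Flink. First, keyBy() still hashes the salt into key groups, so numSalts salt values are not guaranteed to land on numSalts distinct subtasks; round-robin at the salt level is not round-robin at the subtask level. Second, each salted window then contains several original keys, so Phase 1 would have to count per original key inside the window function rather than rely on sum(2) over the salt key alone.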