Hello Fabian,

Thank you for your response; there is no need for apologies ☺. As I
mentioned in my previous email, my wording was confusing, so it was only to be
expected that you had an incomplete picture of my goal. Again, thank you for
your help and your time.

Moving on to my plan from this point on, I understand that I might have to
implement some custom components myself (I prefer conducting my research on an
actual system over falling back on an awful simulation). To that end, I
thought of writing my own KeyedStream<T> implementation that offers the option
of using a StreamPartitioner<T> other than the HashPartitioner<T>. This
CustomKeyedStream<T> would be created by a call to a custom method on
DataStream<T>, say customKeyBy(int... fields, CustomPartitioner<T>). It would
work exactly like DataStream<T>.keyBy(int... fields), with the only difference
that it would receive a custom partitioner instead of using the default hash
partitioner. Do you think that this plan is feasible? I am not completely sure
whether the windowed keyed state would be affected by this design in any way.
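
To make the question concrete, the difference between hash-style and
round-robin channel selection can be sketched in plain Java, without any Flink
dependency. Every name below (ChannelSelector, PartitionerSketch, hashSelector,
roundRobinSelector, channelsPerKey) is hypothetical and invented for
illustration; none of it is Flink API, and Flink's actual partitioners are more
involved.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical stand-in for a pluggable partitioner; not a Flink interface. */
interface ChannelSelector {
    int selectChannel(int key, int numChannels);
}

class PartitionerSketch {
    /** Hash-style selection: a given key always maps to the same channel. */
    static ChannelSelector hashSelector() {
        return (key, numChannels) -> Math.floorMod(Integer.hashCode(key), numChannels);
    }

    /** Round-robin selection: ignores the key and cycles through the channels. */
    static ChannelSelector roundRobinSelector() {
        int[] next = {0}; // mutable counter captured by the lambda
        return (key, numChannels) -> {
            int channel = next[0];
            next[0] = (next[0] + 1) % numChannels;
            return channel;
        };
    }

    /** For each key, collects the set of channels that received it. */
    static Map<Integer, Set<Integer>> channelsPerKey(
            List<Integer> keys, ChannelSelector selector, int numChannels) {
        Map<Integer, Set<Integer>> result = new TreeMap<>();
        for (int key : keys) {
            result.computeIfAbsent(key, k -> new TreeSet<>())
                  .add(selector.selectChannel(key, numChannels));
        }
        return result;
    }
}
```

In this sketch, round-robin selection sends the same key to several channels,
so any state kept per key on one channel would hold only a partial view of that
key. That is the situation behind my question about windowed keyed state.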


In addition, I will consider your suggestion on extending the 
AbstractStreamOperator and implementing the OneInputStreamOperator. It looks 
like an easier way compared to the one I described above and I will try to dive 
into its implementation details.

Again, thank you very much for your help and your constructive comments.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, January 25, 2017 12:28 PM
To: user@flink.apache.org
Subject: Re: Custom Partitioning and windowing questions/concerns

Hi Nikos,
you are of course right. I forgot that ProcessFunction requires a KeyedStream. 
Sorry for this advice.
The problem is that you need to implement some kind of time-based function
that emits partial counts every 10 seconds.
AFAIK, the DataStream API does not offer a built-in operator that gives you
this, except for windows and ProcessFunction.
You could try to implement your own operator by extending 
AbstractStreamOperator and implementing the OneInputStreamOperator interface.
This is a fairly low-level interface but gives you access to record timestamps 
and watermarks. Actually, the DataStream operators are built on this interface 
as well.
A custom operator is applied by calling dataStream.transform().
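
As a rough illustration of what such an operator would have to do, here is a
stand-alone Java sketch with no Flink dependency. The class PartialCountSketch
and its methods are hypothetical: they only mirror the shape of per-element and
per-watermark callbacks. The firing rule (emit once the window end is at or
before the watermark) is a simplification, and checkpointing of the buffered
counts is ignored entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: per-window partial counts, flushed when a watermark passes. */
class PartialCountSketch {
    private final long windowSizeMs;
    // window start -> (key -> partial count); TreeMap keeps windows ordered by start.
    private final TreeMap<Long, Map<Integer, Integer>> windows = new TreeMap<>();

    PartialCountSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Called per record: adds 1 to the key's count in the record's tumbling window. */
    void processElement(long timestampMs, int key) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        windows.computeIfAbsent(windowStart, w -> new HashMap<>())
               .merge(key, 1, Integer::sum);
    }

    /**
     * Called per watermark: emits and drops every window whose end is at or
     * before the watermark. Each emitted entry is {windowStart, key, partialCount}.
     */
    List<long[]> processWatermark(long watermarkMs) {
        List<long[]> emitted = new ArrayList<>();
        while (!windows.isEmpty() && windows.firstKey() + windowSizeMs <= watermarkMs) {
            Map.Entry<Long, Map<Integer, Integer>> window = windows.pollFirstEntry();
            for (Map.Entry<Integer, Integer> e : window.getValue().entrySet()) {
                emitted.add(new long[] {window.getKey(), e.getKey(), e.getValue()});
            }
        }
        return emitted;
    }
}
```

No keyed stream is involved here: each parallel instance simply counts whatever
records it receives, which is why the partial counts depend on the partitioning.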
Best,
Fabian



2017-01-24 17:18 GMT+01:00 Katsipoulakis, Nikolaos Romanos
<kat...@cs.pitt.edu>:
Hello Fabian,

First, I would like to thank you for your suggestion and the additional
information on determinism and partition policies. As I mentioned in my initial
email, I am new to Flink and every additional piece of advice makes my
“learning curve” less steep. In addition, I am aware that you (and everyone
else who follows this thread) might wonder why I am following this
unconventional path of performance partitioning, but I should point out that
my use case’s goal is of an academic nature.

Turning to your suggestion, I took some time to go over the code of version
1.2-SNAPSHOT, and I read the online documentation on the Process Function API,
which I found at:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html
From my understanding, the process() transformation can be applied only to a
KeyedStream<T> and not to a DataStream<T>. Therefore, if I wanted to use a
custom partition algorithm, I would first have to make a call to
partitionCustom() (DataStream<T> -> DataStream<T>), followed by a keyBy(…)
(DataStream<T> -> KeyedStream<T>), and finally apply my first pre-aggregation
step (i.e., the call to process()). Concretely, my code would turn into
something like the following:
// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
                .partitionCustom(new CustomPartitioner(...)) // or .rebalance() or .shuffle()
                .keyBy(1)
                .process(new CustomProcessFunction(..., Time.seconds(10),...))
                .sum(2).setParallelism(N);

Unfortunately, the above would be problematic for two reasons. First, a call
to keyBy() defeats the purpose of a custom partitioner, because the stream will
(ultimately) be partitioned based on the keys and not by my
CustomPartitioner.selectChannels() method. Second, using process() does not
solve my problem, because the whole point of my use case is to avoid calling
keyBy(). If I could call it, then I might as well call window() and not use the
process API in the first place. To be more precise, if I could use a
KeyedStream<T>, then I could do the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
                .partitionCustom(new CustomPartitioner(...))
                .keyBy(1)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(2).setParallelism(N);

Therefore, I don’t think using a Process Function would solve my problem. Am I
understanding your suggestion correctly? If yes, I would be grateful if you
could explain it to me in more detail. On top of that, after reading my initial
email again, I believe that the intentions behind my use case were not quite
clear. Please do not hesitate to ask me for any clarifications.

Again, thank you very much for your interest and your time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, January 24, 2017 5:15 AM
To: user@flink.apache.org
Subject: Re: Custom Partitioning and windowing questions/concerns

Hi Nikos,
Flink's windows require a KeyedStream because they use the keys to manage their 
internal state (each in-progress window has some state that needs to be 
persisted and checkpointed).
Moreover, Flink's event-time window operators return a deterministic result. In
your use-case, the result of the pre-aggregation (phase 1) would not be
deterministic because it would depend on the partitioning of the input.
I would suggest implementing the pre-aggregation not with a window but with a
ProcessFunction (available in Flink 1.2-SNAPSHOT, which will be released soon).
ProcessFunction allows you to register timers, which can be used to emit
results every 10 seconds.
Hope this helps,
Fabian


2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos
<kat...@cs.pitt.edu>:
Hello all,

I am currently examining the effects of stream partitioning on performance for
simple stateful scenarios.

My toy application for the rest of my question will be the following: A stream 
of non-negative integers, each one annotated with a timestamp, and the goal is 
to get the top-10 most frequent non-negative integers on tumbling windows of 10 
seconds. In other words, my input is a stream of tuples with two fields, 
Tuple2<Long, Integer>(timestamp, key), where key is the non-negative integer 
value, and timestamp is used to assign each event to a window. The execution
plan I am considering has a first phase (Phase 1), where the stream is
partitioned and the partial aggregations are processed in parallel (parallelism
set to N > 1). Afterwards, the second phase (Phase 2) gathers all partial
aggregations on a single node (parallelism set to 1), calculates the full
aggregation for each key, orders the keys based on windowed frequency, and
outputs the top-10 keys for each window.
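
For the events of a single window, the two-phase plan can be sketched in plain
Java, independent of Flink. The names below (TwoPhaseSketch, phaseOne,
phaseTwo) are hypothetical, and the partitioning policy is passed in as a
function of the event's position in the stream and its key, which is an
assumption made only to keep the sketch small.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntBinaryOperator;

/** Hypothetical sketch of the two-phase count for the events of one window. */
class TwoPhaseSketch {
    /**
     * Phase 1: route each key to a partition and count per partition.
     * assign receives (position in stream, key) and returns a partition index.
     */
    static List<Map<Integer, Integer>> phaseOne(
            List<Integer> keys, int numPartitions, IntBinaryOperator assign) {
        List<Map<Integer, Integer>> partials = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partials.add(new HashMap<>());
        }
        for (int i = 0; i < keys.size(); i++) {
            int key = keys.get(i);
            int partition = assign.applyAsInt(i, key);
            partials.get(partition).merge(key, 1, Integer::sum);
        }
        return partials;
    }

    /** Phase 2: merge all partial counts into one full count per key. */
    static Map<Integer, Integer> phaseTwo(List<Map<Integer, Integer>> partials) {
        Map<Integer, Integer> full = new HashMap<>();
        for (Map<Integer, Integer> partial : partials) {
            partial.forEach((key, count) -> full.merge(key, count, Integer::sum));
        }
        return full;
    }
}
```

A hash-style policy would be (i, k) -> Math.floorMod(k, n) and a round-robin
policy (i, k) -> i % n. In this sketch both yield the same full counts after
Phase 2; what differs is how many partial records each policy produces in
Phase 1.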

As I mentioned earlier, my goal is to compare the performance of different 
partitioning policies on this toy application. Initially, I want to compare 
shuffle-grouping (round-robin) and hash-grouping and then move on to different 
partitioning policies by using Flink’s CustomPartitioner API. After reading 
Flink’s documentation, I managed to develop the toy application using 
hash-partitioning. Below, I present the different parts of my code:

// Phase 0: input setup
DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
                .assignTimestampsAndWatermarks(
                    new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
                        @Override
                        public long extractAscendingTimestamp(Tuple2<Long, Integer> event) {
                            return event.f0;
                        }
                    })
                .map((Tuple2<Long, Integer> e) ->
                    new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));

In Phase 0, I collect the input stream from an in-memory list, define the
event timestamp which will be used for windowing, and extend each event with a
value of 1 for counting the appearances of each number in every window.
Afterwards, for the parallel Phase 1, I use hash partitioning by first applying
the .keyBy() operation on the key of each tuple (i.e., field 1), followed by a
.window() operation to assign each tuple to a window, and ending with a
.sum(). My code for (parallel) Phase 1 is the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
                .keyBy(1)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(2).setParallelism(N);

Moving on to Phase 2, which aggregates all partial results of a single window
in one operator to produce the full aggregation, orders the keys by frequency,
and returns the top-10 keys, I have the following:

// Phase 2: serial full aggregation and ordering, with a parallelism of 1
DataStream<String> phaseTwo = phaseOne
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window,
                                      Iterable<Tuple3<Long, Integer, Integer>> values,
                                      Collector<String> out) throws Exception {
                        ...
                        List<Integer> topTenValues = ...;
                        StringBuilder strBuilder = new StringBuilder();
                        for (Integer t : topTenValues)
                            strBuilder.append(t).append(",");
                        out.collect(strBuilder.toString());
                    }
                });
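
One possible way to compute the top-10 list that is elided above is sketched
below in plain Java. This is an assumption about what that step might look
like, not the code actually used: the names TopKSketch, topK and format are
hypothetical, ties are broken in favor of the smaller key (the description does
not say how ties are handled), and unlike the snippet above the formatted
string has no trailing comma.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper for the ordering step of Phase 2. */
class TopKSketch {
    /** Returns up to k keys ordered by descending count; ties go to the smaller key. */
    static List<Integer> topK(Map<Integer, Integer> counts, int k) {
        List<Map.Entry<Integer, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : Integer.compare(a.getKey(), b.getKey());
        });
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }

    /** Joins the keys with commas, without a trailing separator. */
    static String format(List<Integer> keys) {
        return keys.stream().map(String::valueOf).collect(Collectors.joining(","));
    }
}
```

Inside apply(), the full counts per key would first be summed from the
Iterable of partial tuples and then handed to a helper like topK with k = 10.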

The previous code makes use of hash-partitioning for its parallel phase. From
what I understand, Flink allows the .window() operation only on a KeyedStream.
Furthermore, the .partitionCustom() method transforms a DataStream to a
DataStream (and the same is true for .shuffle(), which distributes events
uniformly at random, and .rebalance(), which round-robins them). Therefore, I
am confused about how I can use a shuffle policy with windows. One idea that
came to me is to provide an irrelevant field to the .keyBy() method, or to
define my own KeySelector<IN, KEY> that will simulate shuffle grouping through
key generation. Unfortunately, I have two concerns regarding these
alternatives. For the keyBy() approach, I need to control the internal hashing
mechanisms, which entails cherry-picking fields on different workloads and
performing an exhaustive search on the behavior of different random fields
(not practical). For the KeySelector<IN, KEY> approach, I need to maintain
state among different calls of getKey(), which (as far as I know) is not
offered by the KeySelector<IN, KEY> interface, and I do not want to rely on
external state that will lead to additional overhead. Therefore, my first
question is: how can I effectively use round-robin grouping with windows in my
toy application?

The bigger point I am trying to address revolves around custom partitioning 
policies and windows in general. My understanding is that the benefit of a 
custom partitioning policy is to have the ability to control the partitioning 
process based on a pre-defined set of resources (e.g., partitions, task slots 
etc.). Hence, I am confused about how I would be able to use partitionCustom()
followed by .window() in the (parallel) Phase 1, to test the performance of
different execution plans (i.e., partitioning policies).

I apologize for the long question, but I believe that I had to provide enough 
details for the points/questions I currently have (highlighted with bold). 
Thank you very much for your time.

Kind Regards,

Nikos R. Katsipoulakis,
Department of Computer Science
University of Pittsburgh


