I've implemented a combiner [1] in Flink by extending
OneInputStreamOperator, and I call my operator using "transform".
It works well, and I guess it would be useful to add this operator to
DataStream.java. I just need to check whether that requires touching other
parts of the source code.
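
To make the combiner idea concrete, here is a minimal sketch in plain Java
with no Flink dependencies. It is not the operator itself: the class name
LocalCombiner, the count-based flush threshold, and the (word, count) record
shape are assumptions for illustration only. Inside a OneInputStreamOperator
the same logic would sit in processElement, with the flushed partial sums
sent downstream to keyBy.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Count-based local pre-aggregation (combiner) sketch; hypothetical class, plain Java. */
final class LocalCombiner {
    private final int maxBufferedRecords;
    // Partial sums per word, held only until the next flush.
    private final Map<String, Integer> partialCounts = new TreeMap<>();
    private int buffered = 0;

    LocalCombiner(int maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    /** Buffers one record; returns the partial sums once the buffer is full, else an empty map. */
    Map<String, Integer> add(String word, int count) {
        partialCounts.merge(word, count, Integer::sum);
        buffered++;
        return buffered >= maxBufferedRecords
                ? flush()
                : Collections.<String, Integer>emptyMap();
    }

    /** Returns a copy of all partial sums and clears the buffer. */
    Map<String, Integer> flush() {
        Map<String, Integer> out = new TreeMap<>(partialCounts);
        partialCounts.clear();
        buffered = 0;
        return out;
    }

    public static void main(String[] args) {
        LocalCombiner combiner = new LocalCombiner(4);
        for (String word : new String[] {"the", "the", "flink", "the"}) {
            Map<String, Integer> emitted = combiner.add(word, 1);
            if (!emitted.isEmpty()) {
                System.out.println(emitted); // prints {flink=1, the=3}
            }
        }
    }
}
```

With a threshold of 4, the four input records leave the combiner as two
partial sums, so the keyed operator downstream sees fewer records for the
hot key.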

But now I want to tackle data skew by altering the way Flink partitions keys
using KeyedStream.
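
For context, this is roughly how keyBy routes a record, as far as I
understand it: the key is hashed to a key-group, and each parallel subtask
owns a contiguous range of key-groups. The sketch below is plain Java with
assumed names (KeyGroupRouting, keyGroupFor, subtaskFor), and it uses a
simple modulo of hashCode() as a stand-in for the murmur hash that Flink
applies, so the concrete indices will differ from a real job. It only shows
why all records of a hot key land on the same subtask.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of keyBy-style routing: key -> key-group -> parallel subtask (hypothetical class). */
final class KeyGroupRouting {

    /** Stand-in hash (assumption): Flink itself applies a murmur hash to key.hashCode(). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key-group to a subtask so that each subtask owns a contiguous key-group range. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Counts how many records each subtask would receive for the given keys. */
    static int[] recordsPerSubtask(List<String> keys, int maxParallelism, int parallelism) {
        int[] load = new int[parallelism];
        for (String key : keys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            load[subtaskFor(keyGroup, maxParallelism, parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // Skewed input: "the" makes up 6 of 8 records.
        List<String> skewed = Arrays.asList(
                "the", "the", "the", "the", "the", "the", "flink", "skew");
        // One entry per subtask; the subtask that owns "the" gets at least 6 records.
        System.out.println(Arrays.toString(recordsPerSubtask(skewed, 128, 4)));
    }
}
```

Because the mapping depends only on the key and the two parallelism values,
it stays predictable, and that is the property any alternative partitioning
would have to keep.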


On Mon, Sep 23, 2019 at 2:37 PM Biao Liu <mmyy1...@gmail.com> wrote:

> Hi Felipe,
> If I understand correctly, you want to solve data skew caused by
> imbalanced keys?
> There is a common strategy for this kind of problem: pre-aggregation,
> like the combiner in MapReduce.
> But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
> afraid you have to implement it yourself.
> For example, introduce a function that caches some data (time or count
> based). This function should come before "keyBy", on a non-keyed
> stream. It does the same pre-aggregation as the aggregation after
> "keyBy" does. In this way, the skewed keyed data would be reduced a lot.
> I also found a suggestion [1] from Fabian, although it is from a long time ago.
> Hope it helps.
> 1.
> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
> Thanks,
> Biao /'bɪ.aʊ/
> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>> Thanks Biao,
>> I see. To achieve what I want, I need to work with KeyedStream. I
>> downloaded the Flink source code to learn and alter KeyedStream to my
>> needs. I am not sure, but it looks like a lot of work because, as far as I
>> understood, the key-groups have to be predictable [1], and altering this
>> touches a lot of other parts of the source code.
>> However, if I guarantee that they (key-groups) are predictable, I will be
>> able to rebalance, rescale, etc. the keys to other worker nodes.
>> [1]
>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>> Thanks,
>> Felipe
>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>> Hi Felipe,
>>> The Flink job graph is DAG based. It seems that you set an "edge property"
>>> (partitioner) several times.
>>> Flink does not support multiple partitioners on one edge. The later one
>>> overrides the earlier ones. That means the "keyBy" overrides the
>>> "rebalance" and "partitionByPartial".
>>> You could insert some nodes between these partitioners to satisfy your
>>> requirement. For example,
>>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>> I am executing a data stream application which uses rebalance.
>>>> Basically I am counting words using "src -> split ->
>>>> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3
>>>> examples, one without physical partition strategy, one with rebalance
>>>> strategy [1], and one with partial partition strategy from [2].
>>>> I know that the keyBy operator actually kills what rebalance is doing
>>>> because it splits the stream by key, and if I have a stream with skewed
>>>> keys, one parallel instance of the operator after the keyBy will be
>>>> overloaded. However, I was expecting that *before the keyBy* I would have
>>>> a balanced stream, which is not happening.
>>>> Basically, I want to see the difference in records/sec between
>>>> operators when I use rebalance or any other physical partition strategy.
>>>> However, I found no difference in the records/sec metrics of any
>>>> operator across the 3 different physical partition strategies.
>>>> Screenshots of Prometheus+Grafana are attached.
>>>> Maybe I am measuring the wrong operator, or maybe I am not using
>>>> rebalance in the right way, or my use case is not a good test of the
>>>> rebalance transformation.
>>>> I am also testing a different physical partitioning strategy so that I
>>>> can later try to implement the issue "FLINK-1725 New Partitioner for
>>>> better load balancing for skewed data" [2]. I am not sure, but I guess
>>>> that all physical partition strategies have to be implemented on a
>>>> KeyedStream.
>>>> DataStream<String> text = env.addSource(new WordSource());
>>>> // split lines into words
>>>> DataStream<Tuple2<String, Integer>> tokenizer =
>>>>     text.flatMap(new Tokenizer());
>>>> // choose ONE partitioning strategy (the three variants run separately)
>>>> // 1. no physical partitioning:
>>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;
>>>> // 2. rebalance [1]:
>>>> // DataStream<Tuple2<String, Integer>> partitionedStream =
>>>> //     tokenizer.rebalance();
>>>> // 3. partial partitioning from [2]:
>>>> // DataStream<Tuple2<String, Integer>> partitionedStream =
>>>> //     tokenizer.partitionByPartial(0);
>>>> // count
>>>> partitionedStream.keyBy(0).sum(1).print();
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>> thanks,
>>>> Felipe
