Hi Felipe, Flink job graph is DAG based. It seems that you set an "edge property" (partitioner) several times. Flink does not support multiple partitioners on one edge. The later one overrides the priors. That means the "keyBy" overrides the "rebalance" and "partitionByPartial".
You could insert some nodes between these partitioners to satisfy your requirement. For example, `sourceDataStream.rebalance().map(...).keyby(0).sum(1).print();` Thanks, Biao /'bɪ.aʊ/ On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote: > I am executing a data stream application which uses rebalance. Basically I > am counting words using "src -> split -> physicalPartitionStrategy -> keyBy > -> sum -> print". I am running 3 examples, one without physical > partition strategy, one with rebalance strategy [1], and one with > partial partition strategy from [2]. > I know that the keyBy operator actually kills what rebalance is doing > because it splits the stream by key and if I have a stream with skewed key, > one parallel instance of the operator after the keyBy will be overloaded. > However, I was expecting that *before the keyBy* I would have a balanced > stream, which is not happening. > > Basically, I want to see the difference in records/sec between operators > when I use rebalance or any other physical partition strategy. However, > when I found no difference in the records/sec metrics of all operators when > I am running 3 different physical partition strategies. Screenshots of > Prometheus+Grafana are attached. > > Maybe I am measuring the wrong operator, or maybe I am not using the > rebalance in the right way, or I am not doing a good use case to test the > rebalance transformation. > I am also testing a different physical partition to later try to implement > the issue "FLINK-1725 New Partitioner for better load balancing for skewed > data" [2]. I am not sure, but I guess that all physical partition > strategies have to be implemented on a KeyedStream. > > DataStream<String> text = env.addSource(new WordSource()); > // split lines in strings > DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new > Tokenizer()); > // choose a partitioning strategy > DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer); > DataStream<Tuple2<String, Integer>> partitionedStream = > tokenizer.rebalance(); > DataStream<Tuple2<String, Integer>> partitionedStream = > tokenizer.partitionByPartial(0); > // count > partitionedStream.keyBy(0).sum(1).print(); > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning > [2] https://issues.apache.org/jira/browse/FLINK-1725 > > thanks, > Felipe > > *--* > *-- Felipe Gutierrez* > > *-- skype: felipe.o.gutierrez* > *--* *https://felipeogutierrez.blogspot.com > <https://felipeogutierrez.blogspot.com>* >