Thank you @vino yang <yanghua1...@gmail.com> for the reply. I suspect keyBy will be beneficial in cases where my subsequent operators are computationally intensive, i.e., where their computation time exceeds the network-reshuffling cost.
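To make the operator-chaining suggestion concrete, here is a minimal sketch of the no-keyBy variant (it reuses the jsonToPoint and findDistancefromPOI classes from my script quoted below). With no shuffle between them, Flink should chain the source, map, and filter into a single task, so records pass between operators by method call instead of over the network:

--------------
// Same Kafka source as in the original script quoted below.
DataStream<ObjectNode> JSONStream = env.addSource(
    new FlinkKafkaConsumer<>("data",
        new JSONKeyValueDeserializationSchema(false),
        properties).setStartFromEarliest());

// With keyBy("oID") dropped, map and filter run at the same
// parallelism as the source and can be chained: no serialization,
// no network hop between them.
JSONStream.map(new jsonToPoint())
          .filter(new findDistancefromPOI());
--------------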
Regards,
Komal

On Mon, 9 Dec 2019 at 15:23, vino yang <yanghua1...@gmail.com> wrote:

> Hi Komal,
>
> keyBy (a hash, i.e. logical, partitioning) and rebalance (a physical
> partitioning) are both partitioning strategies supported by Flink. [1]
>
> Generally speaking, partitioning causes network communication (network
> shuffles), which can add to the overall time cost. The example you
> provided may benefit from operator chaining [2] if you remove the keyBy
> operation.
>
> Best,
> Vino
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>
> On Mon, 9 Dec 2019 at 09:11, Komal Mariam <komal.mar...@gmail.com> wrote:
>
>> Anyone?
>>
>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I want to get some insight into the keyBy (and rebalance) operations.
>>> As I understand them, they partition our tasks over the defined
>>> parallelism and thus should make our pipeline faster.
>>>
>>> I am reading a topic that contains 170,000,000 pre-stored records,
>>> with 11 Kafka partitions and a replication factor of 1, so I use
>>> .setStartFromEarliest() to read the stream.
>>> My Flink setup is a 4-node cluster: 3 task managers with 10 cores
>>> each, and 1 job manager with 6 cores. (With 10 task slots per TM,
>>> I set the environment parallelism to 30.)
>>>
>>> There are about 10,000 object IDs, hence 10,000 keys. Right now I am
>>> keeping the number of records fixed to get a handle on how fast they
>>> are processed.
>>>
>>> When I remove keyBy, the job finishes in 39 s, as opposed to 52 s
>>> with keyBy. In fact, even when I lower the parallelism to 10 or
>>> below, I still get the same extra overhead of 9 to 13 s. My data is
>>> mostly uniformly distributed on its key, so I can rule out skew.
>>> Rebalance likewise has the same latency as keyBy.
>>>
>>> What I want to know is: what may be causing this overhead? And is
>>> there any way to decrease it?
>>>
>>> Here's the script I'm running for testing purposes:
>>> --------------
>>> DataStream<ObjectNode> JSONStream = env.addSource(
>>>     new FlinkKafkaConsumer<>("data",
>>>         new JSONKeyValueDeserializationSchema(false),
>>>         properties).setStartFromEarliest());
>>>
>>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>>
>>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>>
>>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>>     @Override
>>>     public boolean filter(Point input) throws Exception {
>>>         double distance = computeEuclideanDist(
>>>             16.4199, 89.974, input.X(), input.Y());
>>>         return distance > 0;
>>>     }
>>> }
>>> --------------
>>>
>>> Best Regards,
>>> Komal
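P.S. One detail the thread leaves out: computeEuclideanDist is never shown. A plausible reconstruction, assuming it computes the standard 2-D Euclidean distance, would be:

--------------
// Hypothetical helper: the thread never shows this method, so this is
// an assumed implementation of the standard 2-D Euclidean distance.
public static double computeEuclideanDist(double x1, double y1,
                                          double x2, double y2) {
    double dx = x2 - x1;
    double dy = y2 - y1;
    return Math.sqrt(dx * dx + dy * dy);
}
--------------

With this definition, distance > 0 accepts every point except one lying exactly at the POI, so the filter does almost no data reduction, which keeps the benchmark dominated by deserialization and partitioning cost rather than by the filter itself.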