Hi Komal,

keyBy (hash partitioning, a logical partitioning) and rebalance (a physical partitioning) are both partitioning transformations supported by Flink. [1]
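To make the distinction concrete, here is a minimal sketch (reusing the DataStream<Point> named myPoints from your snippet below; the imports are from the Flink streaming API):

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // keyBy: hash-partitions by key, so every record with the same oID
    // is routed to the same parallel subtask (a logical partitioning
    // that triggers a network shuffle).
    KeyedStream<Point, Tuple> keyed = myPoints.keyBy("oID");

    // rebalance: redistributes records round-robin across all subtasks
    // (a physical partitioning that also triggers a network shuffle).
    DataStream<Point> rebalanced = myPoints.rebalance();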
Generally speaking, partitioning causes network communication (network shuffles), which adds to the overall processing time. The example you provided may benefit from operator chaining [2] if you remove the keyBy operation (a rough sketch is below the quoted message).

Best,
Vino

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains

Komal Mariam <komal.mar...@gmail.com> wrote on Mon, 9 Dec 2019 at 9:11 AM:
> Anyone?
>
> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com> wrote:
>> Hello everyone,
>>
>> I want to get some insights on the keyBy (and rebalance) operations, as according to my understanding they partition our tasks over the defined parallelism and should therefore make our pipeline faster.
>>
>> I am reading a topic that contains 170,000,000 pre-stored records, with 11 Kafka partitions and a replication factor of 1. Hence I use .setStartFromEarliest() to read the stream.
>> My Flink setup is a 4-node cluster with 3 task managers, each having 10 cores, and 1 job manager with 6 cores. (10 task slots per TM, hence I set the environment parallelism to 30.)
>>
>> There are about 10,000 object IDs, hence 10,000 keys. Right now I'm keeping the number of records fixed to get a handle on how fast they're being processed.
>>
>> When I remove keyBy, I get the same results in 39 secs as opposed to 52 secs with keyBy. In fact, even when I vary the parallelism down to 10 or below, I still get the same extra overhead of 9 to 13 secs. My data is mostly uniformly distributed on its key, so I can rule out skew. Rebalance likewise has the same latency as keyBy.
>>
>> What I want to know is: what may be causing this overhead? And is there any way to decrease it?
>>
>> Here's the script I'm running for testing purposes:
>> --------------
>> DataStream<ObjectNode> JSONStream = env.addSource(new FlinkKafkaConsumer<>("data",
>>     new JSONKeyValueDeserializationSchema(false), properties).setStartFromEarliest());
>>
>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>
>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>
>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>     public boolean filter(Point input) throws Exception {
>>         double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
>>         return distance > 0;
>>     }
>> }
>>
>> Best Regards,
>> Komal
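P.S. A rough sketch of the chaining point above, reusing the names from your quoted snippet (jsonToPoint and findDistancefromPOI are your classes):

    // Without keyBy, Flink can chain the map and the filter into one
    // task: records pass between them as local method calls, with no
    // serialization or network transfer in between.
    DataStream<Point> chained = JSONStream
            .map(new jsonToPoint())
            .filter(new findDistancefromPOI());

    // With keyBy in between, the chain is broken by a hash partition,
    // so every record is serialized and shipped across the network
    // before the filter runs.
    DataStream<Point> shuffled = JSONStream
            .map(new jsonToPoint())
            .keyBy("oID")
            .filter(new findDistancefromPOI());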