Hi Komal,

as a general rule of thumb, you want to avoid network shuffles as much as possible. As Vino pointed out, you need to reshuffle if you need to group by key. Another frequent use case is rebalancing the data in case of heavy skew. Since neither applies to you, removing the keyBy is the best option.
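If you want to double-check the "no skew" assumption before dropping the keyBy, a rough plain-Java sketch like the one below can help. It only models hash partitioning in a simplified way: the `KeyDistribution` class, the `mix` function and the modulo assignment are my own stand-ins for illustration and are not Flink's actual key-group logic. The 10,000 keys and parallelism of 30 are taken from your mail; the assumption that object IDs are the integers 0 to 9,999 is mine.

```java
import java.util.Arrays;

public class KeyDistribution {

    /** Bit-mixing step (the MurmurHash3 32-bit finalizer) so that sequential IDs spread out. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Maps a key to one of `parallelism` subtasks (simplified stand-in, not Flink's real assignment). */
    static int subtaskFor(int key, int parallelism) {
        return Math.floorMod(mix(key), parallelism);
    }

    /** Counts how many of the keys 0..numKeys-1 land on each subtask. */
    static int[] countPerSubtask(int numKeys, int parallelism) {
        int[] counts = new int[parallelism];
        for (int key = 0; key < numKeys; key++) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int numKeys = 10_000;   // number of object IDs from the original mail
        int parallelism = 30;   // environment parallelism from the original mail
        int[] counts = countPerSubtask(numKeys, parallelism);
        int min = Arrays.stream(counts).min().getAsInt();
        int max = Arrays.stream(counts).max().getAsInt();
        System.out.printf("keys per subtask: min=%d max=%d mean=%.1f%n",
                min, max, numKeys / (double) parallelism);
    }
}
```

If min and max stay close to the mean, the keys themselves are spread evenly. Note that this only looks at the number of keys per subtask; a few very "hot" keys with many records each could still cause skew, so weighting the counts by records per key would be the next step.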
If you want to retain it because you may experience skew in the future, there are only a couple of things you can do. You may tinker with the networking settings to use smaller or larger network buffers (smaller = lower latency, larger = higher throughput) [1]. Of course, you get better results with a faster network (when running in the cloud, you can play around with different adapters). You could also test whether fewer or more machines are actually faster (fewer machines = less network traffic, more machines = more compute power).

In any case, your data volume is so low that I would probably not optimize too much. We are talking about seconds, and the times may vary widely from run to run because of the low data volume. If you want to test the throughput as a POC for a larger volume, I'd either generate a larger sample or replicate the data to get more reliable numbers. Either way, keep your final use case in mind when deciding on an option.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-the-network-buffers

On Mon, Dec 9, 2019 at 10:25 AM vino yang <yanghua1...@gmail.com> wrote:

> Hi Komal,
>
> Actually, the main factor in choosing the type of partitioning is your
> business logic. If you want to do some aggregation based on a group, you
> must choose keyBy to guarantee correct semantics.
>
> Best,
> Vino
>
> Komal Mariam <komal.mar...@gmail.com> wrote on Mon, Dec 9, 2019 at 5:07 PM:
>
>> Thank you @vino yang <yanghua1...@gmail.com> for the reply. I suspect
>> keyBy will be beneficial in those cases where my subsequent operators are
>> computationally intensive, i.e. their computation time is greater than
>> the network reshuffling cost.
>>
>> Regards,
>> Komal
>>
>> On Mon, 9 Dec 2019 at 15:23, vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Komal,
>>>
>>> keyBy (hash partitioning, a logical partitioning) and rebalance (a
>>> physical partitioning) are both partitioning strategies supported by
>>> Flink. [1]
>>>
>>> Generally speaking, partitioning causes network communication (network
>>> shuffles), which costs additional time. The example you provided may
>>> benefit from operator chaining [2] if you remove the keyBy operation.
>>>
>>> Best,
>>> Vino
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>>
>>> Komal Mariam <komal.mar...@gmail.com> wrote on Mon, Dec 9, 2019 at 9:11 AM:
>>>
>>>> Anyone?
>>>>
>>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> I want to get some insights on the keyBy (and rebalance) operations.
>>>>> According to my understanding, they partition our tasks over the
>>>>> defined parallelism and thus should make our pipeline faster.
>>>>>
>>>>> I am reading a topic which contains 170,000,000 pre-stored records
>>>>> with 11 Kafka partitions and a replication factor of 1. Hence I use
>>>>> .setStartFromEarliest() to read the stream.
>>>>> My Flink setup is a 4-node cluster with 3 taskmanagers, each having
>>>>> 10 cores, and 1 job manager with 6 cores (10 task slots per TM, hence
>>>>> I set the environment parallelism to 30).
>>>>>
>>>>> There are about 10,000 object IDs, hence 10,000 keys. Right now I'm
>>>>> keeping the number of records fixed to get a handle on how fast
>>>>> they're being processed.
>>>>>
>>>>> When I remove keyBy, I get the same results in 39 secs as opposed to
>>>>> 52 secs with keyBy. In fact, even when I vary the parallelism down to
>>>>> 10 or below, I still get the same extra overhead of 9 to 13 secs.
>>>>> My data is mostly uniformly distributed on its key, so I can rule
>>>>> out skew. Rebalance likewise has the same latency as keyBy.
>>>>>
>>>>> What I want to know is what may be causing this overhead? And is
>>>>> there any way to decrease it?
>>>>>
>>>>> Here's the script I'm running for testing purposes:
>>>>> --------------
>>>>> // Read the pre-stored records from the "data" topic, starting at the earliest offset.
>>>>> DataStream<ObjectNode> JSONStream = env.addSource(
>>>>>     new FlinkKafkaConsumer<>("data",
>>>>>         new JSONKeyValueDeserializationSchema(false), properties)
>>>>>         .setStartFromEarliest());
>>>>>
>>>>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>>>>
>>>>> // Partition by object ID, then filter on the distance to the point of interest.
>>>>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>>>>
>>>>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>>>>     @Override
>>>>>     public boolean filter(Point input) throws Exception {
>>>>>         Double distance = computeEuclideanDist(
>>>>>             16.4199, 89.974, input.X(), input.Y());
>>>>>         return distance > 0;
>>>>>     }
>>>>> }
>>>>>
>>>>> Best Regards,
>>>>> Komal
>>>>>