Hi Komal,

keyBy (hash partitioning, which partitions the stream logically) and
rebalance (which partitions it physically) are both partitioning
strategies supported by Flink. [1]
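
For illustration, a minimal sketch of the two (the event type and its
getId() accessor are hypothetical here):

// keyBy: hash-partitions the stream so that all records with the same
// key go to the same downstream subtask (logical partitioning)
stream.keyBy(event -> event.getId());

// rebalance: redistributes records round-robin across all downstream
// subtasks (physical partitioning)
stream.rebalance();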

Generally speaking, partitioning causes network communication (network
shuffles), which adds to the total processing time. The example you
provided may benefit from operator chaining [2] if you remove the keyBy
operation.
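
For instance, a rough sketch of your pipeline with keyBy removed
(reusing your jsonToPoint and findDistancefromPOI classes), so that the
map and filter operators can be chained into one task and pass records
without a network shuffle:

DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
myPoints.filter(new findDistancefromPOI());

This avoids the serialization and network transfer between the two
operators, which is consistent with the 39 secs vs 52 secs difference
you observed.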

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains

On Mon, Dec 9, 2019 at 9:11 AM Komal Mariam <komal.mar...@gmail.com> wrote:

> Anyone?
>
> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com> wrote:
>
>> Hello everyone,
>>
>> I want to get some insights into the keyBy (and rebalance) operations
>> as, according to my understanding, they partition our tasks over the
>> defined parallelism and thus should make our pipeline faster.
>>
>> I am reading a topic which contains 170,000,000 pre-stored records with
>> 11 Kafka partitions and a replication factor of 1. Hence I use
>> .setStartFromEarliest() to read the stream.
>> My Flink setup is a 4-node cluster with 3 task managers, each having 10
>> cores, and 1 job manager with 6 cores. (10 task slots per TM, hence I
>> set the environment parallelism to 30.)
>>
>> There are about 10,000 object IDs, hence 10,000 keys. Right now I'm
>> keeping the number of records fixed to get a handle on how fast they're
>> being processed.
>>
>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>> secs with keyBy. In fact, even when I vary the parallelism down to 10 or
>> below I still get the same extra overhead of 9 to 13 secs. My data is
>> mostly uniformly distributed on its key, so I can rule out skew.
>> Rebalance likewise has the same latency as keyBy.
>>
>> What I want to know is: what may be causing this overhead? And is there
>> any way to decrease it?
>>
>> Here's the script I'm running for testing purposes:
>> --------------
>> DataStream<ObjectNode> JSONStream = env.addSource(
>>     new FlinkKafkaConsumer<>("data",
>>         new JSONKeyValueDeserializationSchema(false),
>>         properties).setStartFromEarliest());
>>
>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>
>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>
>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>     @Override
>>     public boolean filter(Point input) throws Exception {
>>         // computeEuclideanDist is a helper defined elsewhere
>>         double distance = computeEuclideanDist(
>>             16.4199, 89.974, input.X(), input.Y());
>>         return distance > 0;
>>     }
>> }
>>
>> Best Regards,
>> Komal
>>
>
