Hi,

If you noticed that some key groups are hot and in high load, you could try to 
increase the total key groups number (by increase the max parallelism), but pay 
attention that it would cause previous checkpoint cannot be restored . With the 
help of this, we might let the hot key groups share some pressure to others.

If you noticed just some specific keys are really hot, you could try blink 
branch's local agg feature[1] in SQL by setting 
`sql.optimizer.agg.phase.enforcer` as `TWO_PHASE`. This feature will try to 
first aggregate keys locally and then send to next global aggregate node just 
like Hadoop's combine and reduce in some way.  Jark (in CC) might provide more 
information.

Best
Yun Tang

[1] 
https://github.com/apache/flink/blob/2be5f47fb62126fa3a35e44459e660c39e9e0a39/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/TableConfigOptions.java#L39


________________________________
From: Fabian Hueske <fhue...@gmail.com>
Sent: Thursday, February 28, 2019 18:28
To: Aggarwal, Ajay
Cc: user@flink.apache.org
Subject: Re: KeyBy distribution across taskslots

Hi,

The answer is in fact no.
Flink hash-partitions keys into Key Groups [1] which are uniformly assigned to 
tasks, i.e., a task can process more than one key group.
AFAIK, there are no plans to change this behavior.
Stefan (in CC) might be able to give more details on this.

Something that might be possible in the future is to be more clever about the 
key group - task assignment, e.g., taking state size or number of records into 
account.

Best,
Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state

Am Mi., 27. Feb. 2019 um 17:23 Uhr schrieb Aggarwal, Ajay 
<ajay.aggar...@netapp.com<mailto:ajay.aggar...@netapp.com>>:

I couldn’t find reference to it anywhere in the docs, so I thought I will ask 
here.



When I use KeyBy operator, say KeyBy (“customerId”) and some keys (i.e. 
customers) are way too noisy than others, is there a way to ensure that too 
many noisy customers do not land on the same taskslot? In general does flink 
attempts to keep the load balanced across different taskslots assigned to a 
KeyBy operator ?



I wouldn’t be surprised if the answer is “currently no”. Would like to know if 
something related is planned for future. Also would love to hear from others 
who ran into similar situation and how they addressed it.



Thanks.


Reply via email to