Jinkun Liu created FLINK-37257:
----------------------------------

             Summary: Add an API on keyedStream to allow users to customize the 
hash method from key to parallelism.
                 Key: FLINK-37257
                 URL: https://issues.apache.org/jira/browse/FLINK-37257
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream
    Affects Versions: 2.0-preview, 1.20.0
            Reporter: Jinkun Liu


h1. Add an API on keyedStream to allow users to customize the hash method from 
key to parallelism.

When the number of distinct keys is small, the existing key-to-subtask hashing 
can cause severe state skew. For example, if the only keys are 0, 1, and 2 and 
the Flink job has a parallelism of 3, the current Flink code might route all 
three keys to the same subtask (for example, the subtask with index 2), leaving 
the other subtasks idle.
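
The skew can be explored outside Flink with a small standalone sketch. It is 
not Flink code: it re-implements, as an assumption, the two-step mapping that 
KeyGroupRangeAssignment performs (key hash to key group, key group to subtask 
index), and the mixing function is written from memory of MathUtils.murmurHash, 
so it should be checked against the Flink version in use. The class name 
KeySkewDemo and the max parallelism of 128 are illustrative choices.

```java
// Standalone sketch, NOT Flink code: an assumed re-implementation of the
// key -> key group -> subtask mapping, runnable without Flink on the classpath.
final class KeySkewDemo {

    // Assumed to mirror Flink's MathUtils.murmurHash(int); verify before relying on it.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final bit mixing.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Fold the result into the non-negative range.
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Step 1: key group = murmurHash(key.hashCode()) % maxParallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: subtask index = keyGroup * parallelism / maxParallelism.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for a job with small parallelism
        int parallelism = 3;
        for (int key = 0; key < 3; key++) {
            System.out.println(
                    "key " + key + " -> subtask " + subtaskFor(key, maxParallelism, parallelism));
        }
    }
}
```

Running main prints the subtask index that each of the keys 0, 1, and 2 would 
land on under these assumptions. Nothing in the mapping guarantees that three 
keys spread over three subtasks.
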
h3. Problem Background

The job collects data from the stream and sends a network I/O request every 
five minutes.

Requirements
 # {*}Reduce Request Volume{*}: Avoid sending a network I/O request for each 
key, as the request volume would be too large.
 # {*}Avoid Centralization{*}: Do not use windowAll to concentrate data on a 
single subtask, because the data volume is too large.
 # {*}Group by Key{*}: Ensure that data with the same key is included in the 
same I/O request.

I have tried the following code:
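{code:java}
import java.time.Duration;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
// Read the parallelism up front so the lambda does not capture env.
final int parallelism = env.getParallelism();
stream
  // floorMod keeps the bucket non-negative even when hashCode() is negative.
  .keyBy(value -> Math.floorMod(value.hashCode(), parallelism))
  // Assumed: 5-minute tumbling processing-time window (timeWindow() had no arguments).
  .window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(5)))
  .process(processFunction);{code}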
 

I want each parallel subtask to process exactly one key. However, Flink still 
runs each key's hash code through MurmurHash to pick a key group, so when the 
number of keys is small, some subtasks may not be assigned any keys while 
others are assigned several.
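
Until such an API exists, one possible workaround is to precompute, for every 
subtask index, an integer key that the current hashing happens to route to that 
subtask. The sketch below is a standalone illustration, not Flink code and not 
the API proposed in this issue: the class BalancedKeyTable and its methods are 
hypothetical, and the hashing is an assumed re-implementation of what 
KeyGroupRangeAssignment and MathUtils.murmurHash do.

```java
import java.util.Arrays;

// Standalone sketch, NOT Flink code: builds a lookup table of "surrogate" keys,
// one per subtask, under an assumed re-implementation of Flink's key hashing.
final class BalancedKeyTable {

    // Assumed to mirror Flink's MathUtils.murmurHash(int); verify before relying on it.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Assumed mapping: key hash -> key group -> subtask index.
    static int subtaskFor(int keyHash, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(keyHash) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // For each subtask index i, find the smallest non-negative int that maps to i.
    static int[] buildKeyTable(int maxParallelism, int parallelism) {
        int[] table = new int[parallelism];
        Arrays.fill(table, -1); // -1 marks "no surrogate key found yet"
        int remaining = parallelism;
        for (int candidate = 0; remaining > 0 && candidate < Integer.MAX_VALUE; candidate++) {
            int subtask = subtaskFor(candidate, maxParallelism, parallelism);
            if (table[subtask] == -1) {
                table[subtask] = candidate;
                remaining--;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // 128 is an assumed max parallelism; use the job's actual value.
        System.out.println(Arrays.toString(buildKeyTable(128, 3)));
    }
}
```

With such a table, the key selector could return 
table[Math.floorMod(value.hashCode(), parallelism)] instead of the raw bucket, 
so that bucket i lands on subtask i. The table is only valid for the 
parallelism and max parallelism it was built with, so it would have to be 
rebuilt whenever the job is rescaled, which is part of why a first-class API 
would be preferable.
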

 

If possible, please assign this issue to me. I'd like to be a contributor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
