[ 
https://issues.apache.org/jira/browse/FLINK-37257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinkun Liu updated FLINK-37257:
-------------------------------
    Description: 
When the number of keys is small, the existing hashing method can cause severe 
state skew. For example, if the keys are only 0, 1, and 2, and the Flink job 
has a parallelism of 3, the current Flink code might hash all keys to the 
container with parallelism 2.
h3. Problem Background

In the data stream, a network I/O request is sent every five minutes.

Requirements
 # {*}Reduce Request Volume{*}: Avoid sending a network I/O request for each 
key, as the request volume would be too large.
 # {*}Avoid Centralization{*}: Do not use WindowAll to concentrate data on a 
single container due to the large data volume.
 # {*}Group by Key{*}: Ensure that data with the same key is included in the 
same I/O request.

I have tried the following code:
{code:java}
stream
  .keyBy(value -> value.hashCode() % env.getParallelism())
  .timeWindow()
  .process(processFunction);{code}
 

I want each parallelism to process one key, but when the number of keys is 
small, due to the presence of MurmurHash, some parallelisms may not be assigned 
any keys, while others may be assigned multiple keys.

 

If possible please assign to me . I'd like to be a contributor.

  was:
h1. Add an API on keyedStream to allow users to customize the hash method from 
key to parallelism.

When the number of keys is small, the existing hashing method can cause severe 
state skew. For example, if the keys are only 0, 1, and 2, and the Flink job 
has a parallelism of 3, the current Flink code might hash all keys to the 
container with parallelism 2.
h3. Problem Background

In the data stream, a network I/O request is sent every five minutes.

Requirements
 # {*}Reduce Request Volume{*}: Avoid sending a network I/O request for each 
key, as the request volume would be too large.
 # {*}Avoid Centralization{*}: Do not use WindowAll to concentrate data on a 
single container due to the large data volume.
 # {*}Group by Key{*}: Ensure that data with the same key is included in the 
same I/O request.

I have tried the following code:
{code:java}
stream
  .keyBy(value -> value.hashCode() % env.getParallelism())
  .timeWindow()
  .process(processFunction);{code}
 

I want each parallelism to process one key, but when the number of keys is 
small, due to the presence of MurmurHash, some parallelisms may not be assigned 
any keys, while others may be assigned multiple keys.

 

If possible please assign to me . I'd like to be a contributor.


> Add an API on keyedStream to allow users to customize the hash method from 
> key to parallelism.
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37257
>                 URL: https://issues.apache.org/jira/browse/FLINK-37257
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.20.0, 2.0-preview
>            Reporter: Jinkun Liu
>            Priority: Minor
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> When the number of keys is small, the existing hashing method can cause 
> severe state skew. For example, if the keys are only 0, 1, and 2, and the 
> Flink job has a parallelism of 3, the current Flink code might hash all keys 
> to the container with parallelism 2.
> h3. Problem Background
> In the data stream, a network I/O request is sent every five minutes.
> Requirements
>  # {*}Reduce Request Volume{*}: Avoid sending a network I/O request for each 
> key, as the request volume would be too large.
>  # {*}Avoid Centralization{*}: Do not use WindowAll to concentrate data on a 
> single container due to the large data volume.
>  # {*}Group by Key{*}: Ensure that data with the same key is included in the 
> same I/O request.
> I have tried the following code:
> {code:java}
> stream
>   .keyBy(value -> value.hashCode() % env.getParallelism())
>   .timeWindow()
>   .process(processFunction);{code}
>  
> I want each parallelism to process one key, but when the number of keys is 
> small, due to the presence of MurmurHash, some parallelisms may not be 
> assigned any keys, while others may be assigned multiple keys.
>  
> If possible please assign to me . I'd like to be a contributor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to