[ https://issues.apache.org/jira/browse/FLINK-37833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin updated FLINK-37833:
---------------------------------
    Component/s: Runtime / Network

> Code generated for binary key in BatchExecExchange causes incorrect shuffle
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-37833
>                 URL: https://issues.apache.org/jira/browse/FLINK-37833
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.1.0
>
>
> Context:
> * [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319]
>  generates the hash function code for
> [BinaryHashPartitioner|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java].
>  It evaluates
> [BYTE_ARRAY_BASE_OFFSET|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java#L31]
>  on the JobManager (JM) side when the code is generated, but the generated
> code is executed on the TaskManager (TM) to compute the hash code.
>  * [MurmurHashUtil.hashUnsafeBytes|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/MurmurHashUtil.java#L52]
>  uses Java’s _Unsafe_ API to read the byte[] (16 bytes) as 4-byte ints, mixes
> them and computes the hash code. Note: Spark does the same for efficient hash
> computation.
>  * The JVM lays out objects differently depending on heap size, JVM flags
> (e.g. -XX:-UseCompressedOops), processor architecture (64-bit vs. 32-bit),
> etc. This includes the header of a byte[] and therefore its base offset:
> ||JVM Config / Scenario||Likely ARRAY_BYTE_BASE_OFFSET (bytes)||
> |64-bit JVM with compressed OOPs (default)|16|
> |64-bit JVM without compressed OOPs|24|
> |32-bit JVM|Typically 12 or less|
> |Custom JVM / non-HotSpot|Depends|
>  * In our environment, the JM heap size was > 32G while the TM heap size was
> < 32G. Since the JVM disables compressed OOPs for heaps above roughly 32G, the
> ARRAY_BYTE_BASE_OFFSET of byte[] was 24 on the JM but 16 on the TM.
>  * Due to this HashPartitioner codegen bug, the TM read the key starting at
> offset 24 (the ARRAY_BYTE_BASE_OFFSET computed on the JM and shipped to the
> TM), although the base offset on the TM, whose heap is < 32G, is 16. The issue
> here is {_}offset misalignment relative to the base{_}.
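>  * As a result, the hash was computed over the wrong bytes: it skipped the
> first 8 bytes of the key and read 8 bytes of whatever memory follows the
> array. Equal keys therefore did not reliably hash to the same value, and the
> entire shuffle of the data by the byte[] key went wrong.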
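To make the misalignment concrete, here is a small simulation. It is an illustration, not Flink code: raw memory is modelled as a ByteBuffer instead of being read through _Unsafe_, and a Murmur3-style mix stands in for MurmurHashUtil.hashUnsafeBytes. The class and method names, the simulated layout, and the seed 42 are assumptions chosen to mirror the scenario above. Two copies of the same 16-byte key, whose surrounding memory differs, hash identically when read from the true base offset (16) but not when read from the JM's offset (24).

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Simulation only: models raw memory as a ByteBuffer instead of using Unsafe.
final class BaseOffsetMismatch {

    static final int TM_BASE_OFFSET = 16; // where byte[] data really starts on the TM
    static final int JM_BASE_OFFSET = 24; // offset baked in by the JM (heap > 32G)

    // Murmur3-style 32-bit mixing over 4-byte little-endian words.
    // Illustrative only; not Flink's exact MurmurHashUtil code. Assumes len % 4 == 0.
    static int hashWords(ByteBuffer mem, int offset, int len, int seed) {
        int h1 = seed;
        for (int i = 0; i < len; i += 4) {
            int k1 = mem.getInt(offset + i);
            k1 *= 0xcc9e2d51;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= 0x1b873593;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Final avalanche ("fmix") step.
        h1 ^= len;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Models memory around one byte[]: [16-byte header][key][8 bytes belonging to
    // whatever follows the array]. Only the first 4 trailing bytes are set; the rest stay 0.
    static ByteBuffer memoryAround(byte[] key, byte neighbour) {
        ByteBuffer mem = ByteBuffer.allocate(TM_BASE_OFFSET + key.length + 8)
                .order(ByteOrder.LITTLE_ENDIAN);
        mem.position(TM_BASE_OFFSET);
        mem.put(key);
        for (int i = 0; i < 4; i++) {
            mem.put(neighbour);
        }
        return mem;
    }

    // Same 16-byte key stored twice with different neighbouring memory:
    // do both copies hash to the same value when read from readOffset?
    static boolean sameHashForEqualKeys(int readOffset) {
        byte[] key = new byte[16];
        for (int i = 0; i < key.length; i++) {
            key[i] = (byte) i;
        }
        int hashA = hashWords(memoryAround(key, (byte) 0x11), readOffset, key.length, 42);
        int hashB = hashWords(memoryAround(key, (byte) 0x22), readOffset, key.length, 42);
        return hashA == hashB;
    }

    public static void main(String[] args) {
        // Reading from the TM's real base offset only touches the key bytes.
        System.out.println("read at 16: " + sameHashForEqualKeys(TM_BASE_OFFSET)); // true
        // Reading from the JM's offset pulls in neighbouring memory.
        System.out.println("read at 24: " + sameHashForEqualKeys(JM_BASE_OFFSET)); // false
    }
}
```

Every step of the mix is a bijection, so once the misaligned read pulls in a differing neighbour word, the two hashes are guaranteed to differ; in a real shuffle this means equal keys can be routed to different partitions.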
>
> The fix is to stop evaluating BYTE_ARRAY_BASE_OFFSET on the JM in
> [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319]
>  and to evaluate it only on the TM, where the generated code runs.
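A minimal sketch of the codegen difference, assuming the generated code calls MurmurHashUtil.hashUnsafeBytes(base, offset, length); this is not the actual patch. The buggy variant splices the numeric offset, as seen by the planner's JVM, into the generated source. The fixed variant emits a reference to BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET, so the value is looked up on the TM. HashCodeGenSketch, PLANNER_BASE_OFFSET and the helper methods are hypothetical.

```java
// Sketch of the codegen pitfall; the emitted call shape is an assumption.
final class HashCodeGenSketch {

    // Hypothetical: the byte[] base offset as seen by the planner's JVM (JM, heap > 32G).
    static final int PLANNER_BASE_OFFSET = 24;

    static final String HASH_FN =
            "org.apache.flink.table.runtime.util.MurmurHashUtil.hashUnsafeBytes";
    static final String OFFSET_REF =
            "org.apache.flink.table.data.binary.BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET";

    // Buggy pattern: the offset's *value* is baked into the generated source,
    // so every TM runs with the JM's number.
    static String buggyHashExpr(String term) {
        return HASH_FN + "(" + term + ", " + PLANNER_BASE_OFFSET + ", " + term + ".length)";
    }

    // Fixed pattern: the generated source names the field instead, so it is
    // resolved when the generated class is loaded and run on the TM.
    static String fixedHashExpr(String term) {
        return HASH_FN + "(" + term + ", " + OFFSET_REF + ", " + term + ".length)";
    }

    public static void main(String[] args) {
        System.out.println(buggyHashExpr("key"));
        System.out.println(fixedHashExpr("key"));
    }
}
```

Assuming BYTE_ARRAY_BASE_OFFSET is initialized from a runtime call such as Unsafe.arrayBaseOffset(byte[].class), it is not a compile-time constant in Java terms, so the compiler of the generated class cannot fold it into a literal; the value is whatever the TM's own JVM reports.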



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
