[ https://issues.apache.org/jira/browse/FLINK-37833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin resolved FLINK-37833.
----------------------------------
Resolution: Fixed
Patch Merged.
master: 0fcde771ba23eae7f1756d8961c385c2d7a16b88
release-1.20: de4a5b71dafad1e9ca0954746d43436263c46228
release-1.19: aa277c283795953b6e8717f0e0f4f30341c05d46
> Code generated for binary key in BatchExecExchange causes incorrect shuffle
> ---------------------------------------------------------------------------
>
> Key: FLINK-37833
> URL: https://issues.apache.org/jira/browse/FLINK-37833
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 2.0.0, 1.19.2, 1.20.1
> Reporter: Venkata krishnan Sowrirajan
> Assignee: Venkata krishnan Sowrirajan
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.1.0
>
>
> Context:
> * [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319],
> which generates the hash function code for
> [BinaryHashPartitioner|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/partitioner/BinaryHashPartitioner.java],
> evaluates
> [BYTE_ARRAY_BASE_OFFSET|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java#L31]
> on the JobManager (JM) side, while the generated code executes on the
> TaskManager (TM) to compute the hash code.
> * [MurmurHashUtil.hashUnsafeBytes|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/MurmurHashUtil.java#L52]
> uses Java’s _Unsafe_ API to read the byte[] (16 bytes) as 4-byte ints, mixes
> them and computes the hash code. Note: Spark does this too for efficient hash
> computation.
> * The JVM chooses the object layout based on the heap size, JVM flags
> (e.g. -XX:-UseCompressedOops), processor architecture (64-bit vs. 32-bit),
> etc. For a byte[], this layout determines the base offset of the first
> element, roughly as follows:
> ||JVM Config / Scenario||Likely ARRAY_BYTE_BASE_OFFSET (bytes)||
> |64-bit JVM with compressed OOPs (default)|16|
> |64-bit JVM without compressed OOPs|24|
> |32-bit JVM|Typically 12 or less|
> |Custom JVM / non-HotSpot|Depends|
> * In our environment, the JM heap size was > 32G while the TM heap size was <
> 32G, so compressed OOPs were disabled on the JM but enabled on the TM. As a
> result, ARRAY_BYTE_BASE_OFFSET for byte[] was 24 on the JM but 16 on the TM.
> * Due to this HashPartitioner codegen bug, the generated code read from offset
> 24 (the ARRAY_BYTE_BASE_OFFSET computed on the JM and shipped to the TM),
> whereas the correct base offset on the TM was 16 because its heap was < 32G.
> The issue here is {_}offset misalignment relative to the base{_}: each read
> skipped the first 8 bytes of the key and ran 8 bytes past its end.
> * This made the entire shuffle of the data by the byte[] key incorrect.
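A minimal, self-contained Java sketch of the misalignment described above, under stated assumptions. It does not use Unsafe; instead it models memory as a plain byte[] in which a 16-byte key's payload starts at offset 16 (the TM's base offset) and is followed by unrelated bytes. The class name, the helper methods and the simple 31-based word mixing (a stand-in for MurmurHash, not MurmurHashUtil's actual algorithm) are hypothetical. Reading the key at the correct base yields the same hash regardless of what follows it in memory; reading at the JM-computed base of 24 does not.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

class OffsetMisalignmentDemo {

    // Hypothetical values matching the report: byte[] data starts 16 bytes into the
    // object on the TM (compressed OOPs on), but the JM (compressed OOPs off) saw 24.
    static final int TM_BASE = 16;
    static final int JM_BASE = 24;

    /** Builds a fake memory region: 16 header bytes, the key payload, then unrelated trailing bytes. */
    static byte[] memoryWith(byte[] key, byte[] trailing) {
        byte[] mem = new byte[TM_BASE + key.length + trailing.length];
        System.arraycopy(key, 0, mem, TM_BASE, key.length);
        System.arraycopy(trailing, 0, mem, TM_BASE + key.length, trailing.length);
        return mem;
    }

    /**
     * Reads len bytes starting at baseOffset as 4-byte little-endian ints and mixes them.
     * Simplified stand-in for a word-wise hash; len is assumed to be a multiple of 4.
     */
    static int hashWords(byte[] mem, int baseOffset, int len) {
        ByteBuffer buf = ByteBuffer.wrap(mem).order(ByteOrder.LITTLE_ENDIAN);
        int h = 42;
        for (int i = 0; i < len; i += 4) {
            h = 31 * h + buf.getInt(baseOffset + i);
        }
        return h;
    }

    public static void main(String[] args) {
        byte[] key = new byte[16];
        Arrays.fill(key, (byte) 7);
        // Same key, different neighbouring memory.
        byte[] memA = memoryWith(key, new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
        byte[] memB = memoryWith(key, new byte[] {9, 9, 9, 9, 9, 9, 9, 9});

        // Correct base offset: only key bytes are read, so the hashes match.
        System.out.println(hashWords(memA, TM_BASE, key.length) == hashWords(memB, TM_BASE, key.length)); // true
        // JM's base offset on the TM: skips 8 key bytes and reads 8 trailing bytes.
        System.out.println(hashWords(memA, JM_BASE, key.length) == hashWords(memB, JM_BASE, key.length)); // false
    }
}
```

With the misaligned offset, bytes that do not belong to the key leak into the hash, so two records carrying the same key need not hash to the same value.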
>
> The fix is to stop evaluating BYTE_ARRAY_BASE_OFFSET on the JM in
> [CodeGenUtils.hashCodeForType|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala#L319]
> and instead evaluate it only on the TM.
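A hedged sketch of the difference between evaluating the offset on the JM and on the TM. The two methods below are hypothetical string builders, not Flink's actual CodeGenUtils code, and the emitted expressions are only illustrative: they assume the generated hash code calls MurmurHashUtil.hashUnsafeBytes(Object, long, int) with a base offset. Inlining the value freezes whatever the planner JVM (the JM) computed; referencing BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET by name leaves its evaluation to the JVM that loads the generated class (the TM), assuming the constant is initialised at class-load time rather than being a compile-time literal.

```java
class HashCodegenSketch {

    /**
     * JM-evaluated pattern (hypothetical): the planner formats the base offset it sees
     * as a numeric literal, so the TM later runs code containing the JM's value.
     */
    static String inlinedHashExpr(String bytesTerm, long plannerBaseOffset) {
        return "MurmurHashUtil.hashUnsafeBytes(" + bytesTerm + ", "
                + plannerBaseOffset + "L, " + bytesTerm + ".length)";
    }

    /**
     * TM-evaluated pattern (hypothetical): the generated code names the constant,
     * so it is resolved in the JVM that compiles and runs the generated class.
     */
    static String symbolicHashExpr(String bytesTerm) {
        return "MurmurHashUtil.hashUnsafeBytes(" + bytesTerm
                + ", BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET, " + bytesTerm + ".length)";
    }

    public static void main(String[] args) {
        long jmOffset = 24L; // what the JM sees with compressed OOPs disabled (heap > 32G)
        System.out.println(inlinedHashExpr("keyBytes", jmOffset));
        // MurmurHashUtil.hashUnsafeBytes(keyBytes, 24L, keyBytes.length)
        System.out.println(symbolicHashExpr("keyBytes"));
        // MurmurHashUtil.hashUnsafeBytes(keyBytes, BinaryRowDataUtil.BYTE_ARRAY_BASE_OFFSET, keyBytes.length)
    }
}
```

The second form produces the same source text on every JM, but its value depends on the TM's own JVM layout, which is what the hash needs.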
--
This message was sent by Atlassian Jira
(v8.20.10#820010)