[ 
https://issues.apache.org/jira/browse/FLINK-38349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18037894#comment-18037894
 ] 

Rion Williams edited comment on FLINK-38349 at 11/13/25 7:09 PM:
-----------------------------------------------------------------

Just as a follow-up, I was able to consistently [reproduce this problem 
within a unit 
test|https://github.com/rionmonster/flink/blob/flink-38349/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SpillingThreadTest.java]
 using several combinations of `maxFanIn` and `channelSize`, verifying that the 
ArithmeticException is thrown as expected:

!Screenshot 2025-11-12 at 4.00.42 PM.png|width=830,height=198!

There are probably a few different ways to resolve it. Two that come to mind:
 * Changing how we calculate the scaling factor (e.g. replacing the 
Math.log() calculation with a while loop that uses repeated integer division)
 * Adding integer coercion/guards (e.g. detecting <= 0 values and handling 
them by returning a single new list, etc.)
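
To make the first option concrete, here is a minimal sketch of an integer-only replacement for the Math.log() based calculation. It is not the actual Flink code or a proposed patch: the class and method names (MergeScaleSketch, integerScale, integerPow) are hypothetical, and it assumes numChannels >= 2 and maxFanIn >= 2. It relies on the identity ceil(log_f(n)) - 1 == floor(log_f(n - 1)) for integers n >= 2 and f >= 2, so no floating-point math is involved.

```java
// Hypothetical sketch, not Flink code: all names here are illustrative.
final class MergeScaleSketch {

    /**
     * Integer-only equivalent of ceil(log(numChannels) / log(maxFanIn)) - 1.
     * Assumes numChannels >= 2 and maxFanIn >= 2.
     */
    static int integerScale(int numChannels, int maxFanIn) {
        if (numChannels < 2 || maxFanIn < 2) {
            throw new IllegalArgumentException(
                    "numChannels and maxFanIn must both be >= 2");
        }
        int scale = 0;
        // floor(log_f(n - 1)) computed by repeated integer division.
        int remaining = numChannels - 1;
        while (remaining >= maxFanIn) {
            remaining /= maxFanIn;
            scale++;
        }
        return scale;
    }

    /**
     * Exact integer power. For an exponent returned by integerScale the
     * result is at most numChannels - 1, so it cannot overflow a long.
     */
    static long integerPow(int base, int exponent) {
        long result = 1;
        for (int i = 0; i < exponent; i++) {
            result *= base;
        }
        return result;
    }

    public static void main(String[] args) {
        int maxFanIn = 18;
        for (int numChannels : new int[] {5832, 7055}) {
            int scale = integerScale(numChannels, maxFanIn);
            long numEnd = integerPow(maxFanIn, scale);
            System.out.println("numChannels=" + numChannels
                    + " scale=" + scale + " numEnd=" + numEnd);
        }
    }
}
```

With maxFanIn = 18, an input of exactly 5832 (18^3) channels gives scale 2 and numEnd 324, while 7055 gives scale 3 and numEnd 5832, so numEnd always stays strictly below the number of channels. Whether this fits the surrounding merge logic would still need to be checked against the real method.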

I should have some time to tinker around later today/tomorrow on it. Very open 
to feedback or preferred approaches to resolving it.


was (Author: rionmonster):
Just as a follow up, I was able to pretty consistently [reproduce this problem 
in a unit 
test|https://github.com/rionmonster/flink/commit/cf280ab74e2e111d0cb57656a9dd3439fac739aa]
 using several combinations of `maxFanIn` and `channelSize` to verify the 
ArithmeticException gets thrown as expected:

!Screenshot 2025-11-12 at 4.00.42 PM.png|width=830,height=198!

There are probably a few different options in terms of resolving it. A few that 
come to mind would be:
 * Changing how we calculate the scaling factor (e.g. instead of using the 
Math.log() function applying a while loop with continual integer division)
 * Adding a series of integer coercion/guards in place (e.g. detect <= 0 values 
and handle accordingly by returning a single new list, etc.)

I should have some time to tinker around later today/tomorrow on it. Very open 
to feedback or preferred approaches to resolving it.

> Incorrect calculation of scale in SpillingThread#mergeChannelList may cause 
> divide-by-zero exception
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38349
>                 URL: https://issues.apache.org/jira/browse/FLINK-38349
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.13.0, 1.19.3, 1.20.2
>         Environment: All environments
>            Reporter: Huny
>            Priority: Blocker
>         Attachments: Screenshot 2025-11-12 at 4.00.42 PM.png, 
> image-2025-09-12-09-05-54-870.png, image-2025-09-12-09-06-10-159.png
>
>
> *Description*
> In the method 
> {{org.apache.flink.runtime.operators.sort.SpillingThread#mergeChannelList}},
>  the following code may lead to incorrect results:
> {{final double scale = Math.ceil(Math.log(channelIDs.size()) / 
> Math.log(this.maxFanIn)) - 1;}}
> *Steps to Reproduce*
>  # Configure {{maxFanIn = 18}}.
>  # Run with {{channelIDs.size() = 7055}} (which is greater than or equal to 
> {{18^3}}).
>  # The calculation results in:
>  ** {{numStart = 7055}}
>  ** {{numEnd = 5832}} ({{18^3}})
>  ** {{numToMerge = 0}}
>  ** {{channelsToMergePerStep = 0}}
>  # Later processing logic performs division by zero and throws an exception.
> *Expected Behavior*
> The calculation of {{scale}} should never result in 
> {{channelsToMergePerStep = 0}}.
> No divide-by-zero exception should occur.
> *Actual Behavior*
> When {{channelIDs.size()}} is greater than or equal to some power of 
> {{maxFanIn}}, the calculation becomes inaccurate due to floating-point 
> precision.
> This results in {{numToMerge = 0}}, which propagates to 
> {{channelsToMergePerStep = 0}} and eventually causes a *divide-by-zero 
> exception*.
> *Problem Pattern*
>  * If {{channelIDs.size() >= maxFanIn^3}}, problematic values of 
> {{maxFanIn}} include:
> {{5, 6, 18, 25, 36, 47, 66, 75, 80, 86, 131, 143, 148 ...}}
>  * If {{channelIDs.size() >= maxFanIn^5}}, problematic values of 
> {{maxFanIn}} include:
> {{7, 19, 20, 45, 49, 50, 58, 65, 67 ...}}
> *Environment*
>  * Flink version: (all)
> *Logs / Stacktrace*
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
> at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
> Caused by: java.lang.ArithmeticException: / by zero
> at org.apache.flink.runtime.operators.sort.SpillingThread.getSegmentsForReaders(SpillingThread.java:574)
> at org.apache.flink.runtime.operators.sort.SpillingThread.mergeChannelList(SpillingThread.java:495)
> at org.apache.flink.runtime.operators.sort.SpillingThread.mergeOnDisk(SpillingThread.java:260)
> at org.apache.flink.runtime.operators.sort.SpillingThread.go(SpillingThread.java:187)
> at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
