[ https://issues.apache.org/jira/browse/FLINK-20855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

JieFang.He updated FLINK-20855:
-------------------------------
    Description: 
When I run the TPC-DS benchmark at the 500 GB scale, I get an exception:
{code:java}
Caused by: java.lang.IllegalArgumentException
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
        at org.apache.flink.table.runtime.hashtable.LongHashPartition.setNewBuckets(LongHashPartition.java:223)
        at org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:176)
        at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.buildTableFromSpilledPartition(LongHybridHashTable.java:432)
        at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.prepareNextPartition(LongHybridHashTable.java:354)
        at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.nextMatching(LongHybridHashTable.java:145)
        at LongHashJoinOperator$40166.endInput2$(Unknown Source)
        at LongHashJoinOperator$40166.endInput(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:101)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.endHeadOperatorInput(OperatorChain.java:278)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.checkFinished(StreamTwoInputProcessor.java:211)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:183)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:349)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:564)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
{code}
The cause: when numBuckets is calculated in the LongHashPartition constructor,
the result of the multiplication exceeds the maximum value of int and wraps
around to a negative number:
{code:java}
LongHashPartition(
      LongHybridHashTable longTable,
      int partitionNum,
      BinaryRowDataSerializer buildSideSerializer,
      int bucketNumSegs,
      int recursionLevel,
      List<MemorySegment> buffers,
      int lastSegmentLimit) {
   this(longTable, buildSideSerializer, listToArray(buffers));
   this.partitionNum = partitionNum;
   this.recursionLevel = recursionLevel;

   int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 16);
   MemorySegment[] buckets = new MemorySegment[bucketNumSegs];
   for (int i = 0; i < bucketNumSegs; i++) {
      buckets[i] = longTable.nextSegment();
   }
   setNewBuckets(buckets, numBuckets);
   this.finalBufferLimit = lastSegmentLimit;
}
{code}
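
To make the wrap-around concrete, here is a minimal stand-alone sketch of the same expression outside Flink. The class name, the 32 KiB segment size and the segment count of 65536 are assumptions chosen for illustration; they are not values taken from the failing job.

```java
// Stand-alone sketch; class name and input values are illustrative assumptions.
final class NumBucketsOverflowDemo {

    // Same expression as in the constructor: the multiplication is evaluated
    // in 32-bit int arithmetic first, and only then divided by 16.
    static int originalOrder(int bucketNumSegs, int segmentSize) {
        return bucketNumSegs * segmentSize / 16;
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // assumed 32 KiB memory segments
        int bucketNumSegs = 65536;   // assumed; 65536 * 32768 == 2^31

        // 2^31 does not fit in an int, so the product wraps to Integer.MIN_VALUE.
        System.out.println(bucketNumSegs * segmentSize);               // -2147483648
        // Dividing the wrapped product by 16 keeps the negative sign.
        System.out.println(originalOrder(bucketNumSegs, segmentSize)); // -134217728
    }
}
```

With these assumed values the mathematically correct bucket count would be 2^27 = 134217728, which fits comfortably in an int; only the intermediate product overflows.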
One way to avoid the exception is to reorder the calculation so that the
division happens before the multiplication.

Change
{code:java}
int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 16);
{code}
to
{code:java}
int numBuckets = MathUtils.roundDownToPowerOf2(segmentSize / 16 * bucketNumSegs);
{code}
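
The following sketch compares the reordered expression with two alternatives, under the same assumed 32 KiB segment size. The class and method names are hypothetical. `widened` is not part of the proposal in this issue; it is a common alternative that does the multiplication in 64-bit arithmetic and uses the standard `Math.toIntExact` to fail loudly if the final value still does not fit in an int.

```java
// Stand-alone sketch; class and method names are hypothetical.
final class NumBucketsFixDemo {

    // Proposed order: divide first, so the intermediate value stays small.
    // Exact only when segmentSize is a multiple of 16 (assumed here).
    static int reordered(int bucketNumSegs, int segmentSize) {
        return segmentSize / 16 * bucketNumSegs;
    }

    // Order from the earlier wording of the description: dividing
    // bucketNumSegs first truncates to zero for fewer than 16 segments.
    static int earlierProposal(int bucketNumSegs, int segmentSize) {
        return bucketNumSegs / 16 * segmentSize;
    }

    // Alternative (an assumption, not from the issue): multiply as long,
    // then convert back with an overflow check.
    static int widened(int bucketNumSegs, int segmentSize) {
        return Math.toIntExact((long) bucketNumSegs * segmentSize / 16);
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // assumed 32 KiB memory segments

        System.out.println(reordered(65536, segmentSize));    // 134217728
        System.out.println(widened(65536, segmentSize));      // 134217728
        System.out.println(earlierProposal(10, segmentSize)); // 0
        System.out.println(reordered(10, segmentSize));       // 20480
    }
}
```

Note that reordering raises the overflow threshold by a factor of 16 rather than removing it: with the assumed segment size, `reordered` would itself wrap once bucketNumSegs reaches 2^20, whereas `widened` would throw an ArithmeticException at that point.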

  was:
When I run the TPC-DS benchmark at the 500 GB scale, I get an exception:
{code:java}
Caused by: java.lang.IllegalArgumentException
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
        at org.apache.flink.table.runtime.hashtable.LongHashPartition.setNewBuckets(LongHashPartition.java:223)
        at org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:176)
        at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.buildTableFromSpilledPartition(LongHybridHashTable.java:432)
        at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.prepareNextPartition(LongHybridHashTable.java:354)
        at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.nextMatching(LongHybridHashTable.java:145)
        at LongHashJoinOperator$40166.endInput2$(Unknown Source)
        at LongHashJoinOperator$40166.endInput(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:101)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.endHeadOperatorInput(OperatorChain.java:278)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.checkFinished(StreamTwoInputProcessor.java:211)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:183)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:349)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:564)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
{code}
The cause: when numBuckets is calculated in the LongHashPartition constructor,
the result of the multiplication exceeds the maximum value of int and wraps
around to a negative number:
{code:java}
LongHashPartition(
      LongHybridHashTable longTable,
      int partitionNum,
      BinaryRowDataSerializer buildSideSerializer,
      int bucketNumSegs,
      int recursionLevel,
      List<MemorySegment> buffers,
      int lastSegmentLimit) {
   this(longTable, buildSideSerializer, listToArray(buffers));
   this.partitionNum = partitionNum;
   this.recursionLevel = recursionLevel;

   int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 16);
   MemorySegment[] buckets = new MemorySegment[bucketNumSegs];
   for (int i = 0; i < bucketNumSegs; i++) {
      buckets[i] = longTable.nextSegment();
   }
   setNewBuckets(buckets, numBuckets);
   this.finalBufferLimit = lastSegmentLimit;
}
{code}
One way to avoid the exception is to reorder the calculation so that the
division happens before the multiplication.

Change
{code:java}
int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 16);
{code}
to
{code:java}
int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs / 16 * segmentSize);
{code}


> Calculating numBuckets exceeds the maximum value of int and got a negative number
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-20855
>                 URL: https://issues.apache.org/jira/browse/FLINK-20855
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: JieFang.He
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
