[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15156041#comment-15156041 ]

Greg Hogan commented on FLINK-3385:
-----------------------------------

The pull request is [here|https://github.com/apache/flink/pull/1680]. I had to
rename the pull request: it looks like when there are multiple commits, the
default title is the branch name rather than the commit message.

> Fix outer join skipping unprobed partitions
> -------------------------------------------
>
>                 Key: FLINK-3385
>                 URL: https://issues.apache.org/jira/browse/FLINK-3385
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> {{MutableHashTable.nextRecord}} performs three steps for a build-side outer 
> join:
> {code}
>       public boolean nextRecord() throws IOException {
>               if (buildSideOuterJoin) {
>                       return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
>               } else {
>                       return processProbeIter() || prepareNextPartition();
>               }
>       }
> {code}
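A hypothetical toy model of the `||` chain above, not Flink code: only the three method names and the shape of `nextRecord()` come from the issue, and the method bodies are invented so the ordering can be run. Each step returns true if it produced something, so unmatched build-side records are only reached once probing is exhausted, and the next partition only once both are. The model does not reload data after `prepareNextPartition()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical toy model, not Flink code: each method stands in for the
// MutableHashTable step of the same name and returns true if it produced
// a record (or, for prepareNextPartition, advanced to another partition).
class NextRecordModel {
    private final Iterator<String> probeMatches;
    private final Iterator<String> unmatchedBuild;
    private int partitionsLeft;
    private final boolean buildSideOuterJoin;
    final List<String> trace = new ArrayList<>();

    NextRecordModel(List<String> probeMatches, List<String> unmatchedBuild,
                    int partitionsLeft, boolean buildSideOuterJoin) {
        this.probeMatches = probeMatches.iterator();
        this.unmatchedBuild = unmatchedBuild.iterator();
        this.partitionsLeft = partitionsLeft;
        this.buildSideOuterJoin = buildSideOuterJoin;
    }

    private boolean processProbeIter() {
        if (!probeMatches.hasNext()) return false;
        trace.add("probe:" + probeMatches.next());
        return true;
    }

    private boolean processUnmatchedBuildIter() {
        if (!unmatchedBuild.hasNext()) return false;
        trace.add("unmatched:" + unmatchedBuild.next());
        return true;
    }

    private boolean prepareNextPartition() {
        if (partitionsLeft == 0) return false;
        partitionsLeft--;
        trace.add("next-partition");
        return true;
    }

    // Same shape as the quoted nextRecord(): || evaluates left to right and
    // stops at the first step that returns true.
    boolean nextRecord() {
        if (buildSideOuterJoin) {
            return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
        } else {
            return processProbeIter() || prepareNextPartition();
        }
    }

    // Drives nextRecord() until every step reports it is exhausted.
    static List<String> drain(NextRecordModel model) {
        while (model.nextRecord()) {
            // each successful call appends one entry to the trace
        }
        return model.trace;
    }
}
```

With probe matches `a, b`, one unmatched build record `x` and one further partition, the outer-join trace is `probe:a, probe:b, unmatched:x, next-partition`; without the outer-join flag, `x` is never visited.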
> {{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to 
> {{MutableHashTable.moveToNextBucket}} which is unable to process spilled 
> partitions:
> {code}
>                       if (p.isInMemory()) {
>                               ...
>                       } else {
>                               return false;
>                       }
> {code}
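A hypothetical sketch of what the `return false` above means for a bucket scan, assuming the caller treats any `false` from `moveToNextBucket` as "no more buckets". The `Bucket` class, the list-of-buckets layout and both scan methods are invented for illustration. Under that assumption the scan ends at the first bucket of a spilled partition, so in-memory buckets after it are skipped too; the second scan is a hypothetical variant that only skips the spilled buckets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Flink code: each bucket belongs to a partition
// that is either in memory or spilled, and holds unmatched build-side records.
class UnmatchedScanModel {
    static final class Bucket {
        final boolean partitionInMemory;
        final List<String> unmatched;

        Bucket(boolean partitionInMemory, List<String> unmatched) {
            this.partitionInMemory = partitionInMemory;
            this.unmatched = unmatched;
        }
    }

    private final List<Bucket> buckets;
    private int scanCount = -1;
    private Bucket current;

    UnmatchedScanModel(List<Bucket> buckets) {
        this.buckets = buckets;
    }

    // Mirrors the quoted branch: advance to the next bucket, but return
    // false if its partition is not in memory.
    boolean moveToNextBucket() {
        scanCount++;
        if (scanCount >= buckets.size()) {
            return false; // no buckets left
        }
        Bucket b = buckets.get(scanCount);
        if (b.partitionInMemory) {
            current = b;
            return true;
        } else {
            return false; // spilled partition
        }
    }

    // Assumed caller: any false is taken to mean "no more buckets".
    List<String> scanStoppingAtFalse() {
        List<String> out = new ArrayList<>();
        while (moveToNextBucket()) {
            out.addAll(current.unmatched);
        }
        return out;
    }

    // Hypothetical variant: keep scanning past spilled partitions until the
    // bucket list is really exhausted. Spilled records are still not emitted.
    List<String> scanSkippingSpilled() {
        List<String> out = new ArrayList<>();
        while (true) {
            if (moveToNextBucket()) {
                out.addAll(current.unmatched);
            } else if (scanCount >= buckets.size()) {
                return out; // genuinely out of buckets
            }
            // otherwise the bucket was in a spilled partition: skip it
        }
    }
}
```

For buckets `[in-memory a, spilled b, in-memory c]`, the first scan returns only `a` and the second returns `a, c`. In both cases `b` is missing, which is why the spilled partition's build side has to be retained for a later pass.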
> {{MutableHashTable.prepareNextPartition}} calls
> {{HashPartition.finalizeProbePhase}}, which only keeps a spilled partition
> (registering it to be read and processed by the next instantiation of
> {{MutableHashTable}}) if probe-side records were spilled; otherwise it deletes
> both the build-side and probe-side spill files. For an inner join this is fine,
> but with a build-side outer join the unmatched build-side records must still be
> retained. No further probing is necessary for such a partition, though, so
> could its processing be short-circuited when it is loaded by the next
> {{MutableHashTable}}?
> {code}
>               if (isInMemory()) {
>                       ...
>               }
>               else if (this.probeSideRecordCounter == 0) {
>                       // partition is empty, no spilled buffers
>                       // return the memory buffer
>                       freeMemory.add(this.probeSideBuffer.getCurrentSegment());
>                       // delete the spill files
>                       this.probeSideChannel.close();
>                       this.buildSideChannel.deleteChannel();
>                       this.probeSideChannel.deleteChannel();
>                       return 0;
>               }
>               else {
>                       // flush the last probe side buffer and register this partition as pending
>                       this.probeSideBuffer.close();
>                       this.probeSideChannel.close();
>                       spilledPartitions.add(this);
>                       return 1;
>               }
> {code}
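A hypothetical model of the branch selection in the quoted `finalizeProbePhase`, recording which branch is taken rather than doing any I/O; the class, enum and method names are invented. `current` follows the quoted code, where the join type is never consulted. `outerJoinAware` is only a sketch of the retention the issue asks for, not the actual patch: a spilled partition with no probe-side records is kept when the join is a build-side outer join.

```java
// Hypothetical model, not Flink code: which branch of the quoted
// HashPartition.finalizeProbePhase a partition would take.
class FinalizeProbePhaseModel {
    enum Outcome {
        IN_MEMORY,          // the elided "..." branch; not modeled further
        DELETE_SPILL_FILES, // returns 0: build- and probe-side channels deleted
        REGISTER_PENDING    // returns 1: added to spilledPartitions
    }

    // Branch selection as quoted: the join type is not consulted.
    static Outcome current(boolean inMemory, long probeSideRecordCounter) {
        if (inMemory) {
            return Outcome.IN_MEMORY;
        } else if (probeSideRecordCounter == 0) {
            return Outcome.DELETE_SPILL_FILES;
        } else {
            return Outcome.REGISTER_PENDING;
        }
    }

    // Hypothetical variant: for a build-side outer join, keep a spilled
    // partition even when no probe-side records arrived, because every one
    // of its build-side records is unmatched and must still be emitted.
    static Outcome outerJoinAware(boolean inMemory, long probeSideRecordCounter,
                                  boolean buildSideOuterJoin) {
        if (inMemory) {
            return Outcome.IN_MEMORY;
        } else if (probeSideRecordCounter == 0 && !buildSideOuterJoin) {
            return Outcome.DELETE_SPILL_FILES;
        } else {
            return Outcome.REGISTER_PENDING;
        }
    }
}
```

In the retained zero-probe case, every build-side record of the partition is unmatched, so the next {{MutableHashTable}} could in principle skip probing and emit them all directly, which is the short-circuit raised in the question above.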



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
