[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15156041#comment-15156041 ]
Greg Hogan commented on FLINK-3385:
-----------------------------------

Pull request is [here|https://github.com/apache/flink/pull/1680]. I had to rename the pull request; it looks like when there are multiple commits, the default name is the branch name rather than the commit log.

> Fix outer join skipping unprobed partitions
> -------------------------------------------
>
>                 Key: FLINK-3385
>                 URL: https://issues.apache.org/jira/browse/FLINK-3385
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> {{MutableHashTable.nextRecord}} performs three steps for a build-side outer
> join:
> {code}
> 	public boolean nextRecord() throws IOException {
> 		if (buildSideOuterJoin) {
> 			return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
> 		} else {
> 			return processProbeIter() || prepareNextPartition();
> 		}
> 	}
> {code}
> {{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to
> {{MutableHashTable.moveToNextBucket}}, which is unable to process spilled
> partitions:
> {code}
> 	if (p.isInMemory()) {
> 		...
> 	} else {
> 		return false;
> 	}
> {code}
> {{MutableHashTable.prepareNextPartition}} calls
> {{HashPartition.finalizeProbePhase}}, which only spills the partition (to be
> read and processed in the next instantiation of {{MutableHashTable}}) if
> probe-side records were spilled. In an equi-join this is fine, but with an
> outer join the unmatched build-side records must still be retained (though no
> further probing is necessary, so could this be short-circuited when loaded by
> the next {{MutableHashTable}}?).
> {code}
> 	if (isInMemory()) {
> 		...
> 	}
> 	else if (this.probeSideRecordCounter == 0) {
> 		// partition is empty, no spilled buffers
> 		// return the memory buffer
> 		freeMemory.add(this.probeSideBuffer.getCurrentSegment());
> 		// delete the spill files
> 		this.probeSideChannel.close();
> 		this.buildSideChannel.deleteChannel();
> 		this.probeSideChannel.deleteChannel();
> 		return 0;
> 	}
> 	else {
> 		// flush the last probe side buffer and register this partition as pending
> 		this.probeSideBuffer.close();
> 		this.probeSideChannel.close();
> 		spilledPartitions.add(this);
> 		return 1;
> 	}
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
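
To make the failure mode in the quoted description concrete, here is a minimal toy model in Java. It is not Flink code: {{ToyPartition}}, {{OuterJoinSpillSketch}}, and the {{keepUnprobed}} flag are hypothetical names introduced only for illustration, and the flag shows one possible way to retain unprobed partitions, not necessarily what the pull request does. The sketch assumes a spilled partition can be summarized by its build-side and probe-side record counts.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a spilled hash partition (hypothetical, not Flink's HashPartition). */
final class ToyPartition {
    final int buildSideRecords; // build-side records already spilled to disk
    final int probeSideRecords; // probe-side records spilled for this partition

    ToyPartition(int buildSideRecords, int probeSideRecords) {
        this.buildSideRecords = buildSideRecords;
        this.probeSideRecords = probeSideRecords;
    }

    /**
     * Follows the shape of the spilled-partition branch in the issue text.
     * Returns 1 if the partition is registered as pending, 0 if it is discarded.
     * keepUnprobed is an assumed extra flag, not part of the quoted code.
     */
    int finalizeProbePhase(List<ToyPartition> spilledPartitions, boolean keepUnprobed) {
        if (probeSideRecords == 0 && !keepUnprobed) {
            // No probe records were spilled: the quoted code deletes the spill
            // files here, so the build-side records are lost.
            return 0;
        }
        // Register the partition so a later pass can still read it.
        spilledPartitions.add(this);
        return 1;
    }
}

final class OuterJoinSpillSketch {
    /** Counts the build-side records that survive into the pending list. */
    static int pendingBuildRecords(boolean buildSideOuterJoin) {
        List<ToyPartition> pending = new ArrayList<>();
        // One partition that was never probed and one that was.
        new ToyPartition(5, 0).finalizeProbePhase(pending, buildSideOuterJoin);
        new ToyPartition(3, 2).finalizeProbePhase(pending, buildSideOuterJoin);
        int total = 0;
        for (ToyPartition p : pending) {
            total += p.buildSideRecords;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("without flag: " + pendingBuildRecords(false));
        System.out.println("with flag:    " + pendingBuildRecords(true));
    }
}
```

With the flag off, only the probed partition's 3 build-side records remain pending; the 5 records in the unprobed partition are discarded, which is the case an outer join cannot tolerate. With the flag on, all 8 records are retained.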