[ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15154905#comment-15154905 ]
Greg Hogan commented on FLINK-3385:
-----------------------------------

Will submit the pull request when Travis has finished. There was an additional fix, discovered by running the unit tests under severe memory constraints: {{processUnmatchedBuildIter}} would throw a {{NullPointerException}} when all partitions had spilled to disk. That simple fix is included as a hotfix.

> Fix outer join skipping unprobed partitions
> -------------------------------------------
>
> Key: FLINK-3385
> URL: https://issues.apache.org/jira/browse/FLINK-3385
> Project: Flink
> Issue Type: Bug
> Components: Distributed Runtime
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Priority: Blocker
> Fix For: 1.0.0
>
>
> {{MutableHashTable.nextRecord}} performs three steps for a build-side outer join:
> {code}
> public boolean nextRecord() throws IOException {
>     if (buildSideOuterJoin) {
>         return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
>     } else {
>         return processProbeIter() || prepareNextPartition();
>     }
> }
> {code}
> {{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to {{MutableHashTable.moveToNextBucket}}, which is unable to process spilled partitions:
> {code}
> if (p.isInMemory()) {
>     ...
> } else {
>     return false;
> }
> {code}
> {{MutableHashTable.prepareNextPartition}} calls {{HashPartition.finalizeProbePhase}}, which only spills the partition (to be read and processed in the next instantiation of {{MutableHashTable}}) if probe-side records were spilled. In an equi-join this is fine, but with an outer join the unmatched build-side records must still be retained (though no further probing is necessary, so could this be short-circuited when loaded by the next {{MutableHashTable}}?).
> {code}
> if (isInMemory()) {
>     ...
> }
> else if (this.probeSideRecordCounter == 0) {
>     // partition is empty, no spilled buffers
>     // return the memory buffer
>     freeMemory.add(this.probeSideBuffer.getCurrentSegment());
>     // delete the spill files
>     this.probeSideChannel.close();
>     this.buildSideChannel.deleteChannel();
>     this.probeSideChannel.deleteChannel();
>     return 0;
> }
> else {
>     // flush the last probe side buffer and register this partition as pending
>     this.probeSideBuffer.close();
>     this.probeSideChannel.close();
>     spilledPartitions.add(this);
>     return 1;
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
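A minimal, self-contained sketch of the retention decision discussed in FLINK-3385, under a simplified model. {{OuterJoinSpillSketch}}, {{Partition}}, {{mustRetain}} and {{pendingPartitions}} are hypothetical names, not Flink's {{MutableHashTable}} or {{HashPartition}} API, and spill channels and memory segments are not modeled. It mirrors the three branches of {{finalizeProbePhase}}, but keeps a spilled partition with no probe-side records when the join is a build-side outer join, because its unmatched build-side records still have to be emitted. This is one possible shape of the fix, not necessarily the change in the pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; these are NOT Flink's MutableHashTable/HashPartition classes.
final class OuterJoinSpillSketch {

    /** Minimal stand-in for the state of a partition at the end of the probe phase. */
    static final class Partition {
        final String name;
        final boolean inMemory;             // build side still resident in memory
        final long probeSideRecordCounter;  // probe-side records spilled for this partition

        Partition(String name, boolean inMemory, long probeSideRecordCounter) {
            this.name = name;
            this.inMemory = inMemory;
            this.probeSideRecordCounter = probeSideRecordCounter;
        }
    }

    /**
     * Mirrors the branches of finalizeProbePhase: returns true if the partition
     * must be registered as pending for the next hash table instance.
     */
    static boolean mustRetain(Partition p, boolean buildSideOuterJoin) {
        if (p.inMemory) {
            // In this toy model, in-memory partitions are finished in the current pass.
            return false;
        }
        if (p.probeSideRecordCounter == 0) {
            // Equi-join: no probe record can match, so the spill files may be deleted.
            // Build-side outer join: every build record is unmatched and must
            // still be emitted, so the partition has to be kept.
            return buildSideOuterJoin;
        }
        // Probe-side records were spilled: the partition is always re-processed.
        return true;
    }

    /** Names of the partitions that would be registered as pending, in input order. */
    static List<String> pendingPartitions(List<Partition> partitions, boolean buildSideOuterJoin) {
        List<String> pending = new ArrayList<>();
        for (Partition p : partitions) {
            if (mustRetain(p, buildSideOuterJoin)) {
                pending.add(p.name);
            }
        }
        return pending;
    }
}
```

With partitions A (in memory), B (spilled, no probe-side records) and C (spilled, 5 probe-side records), an equi-join leaves only C pending, while a build-side outer join leaves B and C pending. Whether a retained partition such as B could skip the probe pass entirely when reloaded, as asked in the issue, is not modeled here.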