Greg Hogan created FLINK-3385:
---------------------------------
Summary: Fix outer join skipping unprobed partitions
Key: FLINK-3385
URL: https://issues.apache.org/jira/browse/FLINK-3385
Project: Flink
Issue Type: Bug
Components: Distributed Runtime
Reporter: Greg Hogan
Priority: Critical
Fix For: 1.0.0
{{MutableHashTable.nextRecord}} performs three steps for a build-side outer
join:
{code}
public boolean nextRecord() throws IOException {
    if (buildSideOuterJoin) {
        return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
    } else {
        return processProbeIter() || prepareNextPartition();
    }
}
{code}
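To make the evaluation order concrete, here is a toy model in plain Java. It is not Flink code: NextRecordChain, step and run are hypothetical names, and each step is just a counter that reports success a fixed number of times and logs every call. It only shows that, because of the short-circuiting ||, the unmatched-build scan runs only once the probe iterator is exhausted, and the next partition is prepared only once the unmatched scan is exhausted.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical toy model of the || chain in MutableHashTable.nextRecord(); not Flink code.
public class NextRecordChain {

    // A step that returns true `count` times, then false, logging every call.
    static BooleanSupplier step(String name, int count, List<String> log) {
        int[] remaining = {count};
        return () -> {
            log.add(name);
            if (remaining[0] > 0) {
                remaining[0]--;
                return true;
            }
            return false;
        };
    }

    // Repeats the build-side outer join chain until every step reports false.
    static List<String> run(int probe, int unmatched, int partitions) {
        List<String> log = new ArrayList<>();
        BooleanSupplier processProbeIter = step("probe", probe, log);
        BooleanSupplier processUnmatchedBuildIter = step("unmatched", unmatched, log);
        BooleanSupplier prepareNextPartition = step("next", partitions, log);
        while (processProbeIter.getAsBoolean()
                || processUnmatchedBuildIter.getAsBoolean()
                || prepareNextPartition.getAsBoolean()) {
            // one step succeeded; later steps in the chain were not evaluated
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(1, 1, 0));
    }
}
{code}

With one probe result and one unmatched build record, run(1, 1, 0) logs probe, probe, unmatched, probe, unmatched, next: prepareNextPartition is reached only after both earlier steps return false.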
{{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to
{{MutableHashTable.moveToNextBucket}} which is unable to process spilled
partitions:
{code}
if (p.isInMemory()) {
    ...
} else {
    return false;
}
{code}
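The resulting loss of records can be reproduced with a toy model. UnmatchedScan, Partition and unmatchedBuildKeys are hypothetical names, not Flink's classes: a partition is reduced to an in-memory flag and a list of build keys, and a spilled partition simply contributes nothing to the unmatched-build scan, standing in for the early return above. This is a simplification that models only which records go missing, not how the real bucket iteration proceeds.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical toy model, not Flink's MutableHashTable or HashPartition.
public class UnmatchedScan {

    // A partition reduced to its residency flag and its build-side keys.
    static final class Partition {
        final boolean inMemory;
        final List<Integer> buildKeys;

        Partition(boolean inMemory, List<Integer> buildKeys) {
            this.inMemory = inMemory;
            this.buildKeys = buildKeys;
        }
    }

    // Collects build keys that no probe record matched. Spilled partitions are
    // skipped, standing in for moveToNextBucket's "else { return false; }" branch.
    static List<Integer> unmatchedBuildKeys(List<Partition> partitions, Set<Integer> probedKeys) {
        List<Integer> out = new ArrayList<>();
        for (Partition p : partitions) {
            if (!p.inMemory) {
                continue; // no in-memory buckets to walk, so nothing is emitted
            }
            for (int key : p.buildKeys) {
                if (!probedKeys.contains(key)) {
                    out.add(key);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Partition> partitions = List.of(
                new Partition(true, List.of(1, 2)),
                new Partition(false, List.of(3, 4)));
        // Only key 1 was matched, so a correct outer join would emit 2, 3 and 4.
        System.out.println(unmatchedBuildKeys(partitions, Set.of(1))); // prints [2]
    }
}
{code}

Keys 3 and 4 are never emitted even though nothing on the probe side matched them, which is the symptom described in the summary.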
{{MutableHashTable.prepareNextPartition}} calls
{{HashPartition.finalizeProbePhase}}, which keeps a spilled partition (to be
read and processed by the next instantiation of {{MutableHashTable}}) only if
probe-side records were spilled; otherwise its spill files are deleted. For an
inner equi-join this is fine, since a partition without probe-side records
cannot produce any matches. With a build-side outer join, however, the unmatched
build-side records must still be retained and emitted. No further probing is
necessary for such a partition, so could this be short-circuited when it is
loaded by the next {{MutableHashTable}}?
{code}
if (isInMemory()) {
    ...
}
else if (this.probeSideRecordCounter == 0) {
    // partition is empty, no spilled buffers
    // return the memory buffer
    freeMemory.add(this.probeSideBuffer.getCurrentSegment());
    // delete the spill files
    this.probeSideChannel.close();
    this.buildSideChannel.deleteChannel();
    this.probeSideChannel.deleteChannel();
    return 0;
}
else {
    // flush the last probe side buffer and register this partition as pending
    this.probeSideBuffer.close();
    this.probeSideChannel.close();
    spilledPartitions.add(this);
    return 1;
}
{code}
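One possible direction for a fix, sketched as a pure decision function under stated assumptions. FinalizeDecision, currentSpilled and proposedSpilled are hypothetical names; the inputs are reduced to the probe-side record count and a buildSideOuterJoin flag; only the spilled (not in-memory) case is modelled; and the return values follow the 0/1 convention of the branches quoted above. This is not the actual patch.

{code:java}
// Hypothetical sketch of the spilled-partition decision in finalizeProbePhase; not Flink code.
public class FinalizeDecision {

    // Current behavior as quoted: only probe-side records keep a spilled partition.
    // Returns 1 = register as pending, 0 = delete spill files.
    static int currentSpilled(long probeSideRecordCounter) {
        return probeSideRecordCounter == 0 ? 0 : 1;
    }

    // Assumed fix: under a build-side outer join, keep the partition even with no
    // probe-side records, because all of its build records are unmatched.
    static int proposedSpilled(long probeSideRecordCounter, boolean buildSideOuterJoin) {
        if (probeSideRecordCounter == 0 && !buildSideOuterJoin) {
            return 0; // inner join: nothing can match, safe to delete spill files
        }
        return 1; // register as pending for the next MutableHashTable
    }

    public static void main(String[] args) {
        System.out.println(currentSpilled(0));         // 0: build records dropped
        System.out.println(proposedSpilled(0, true));  // 1: build records retained
        System.out.println(proposedSpilled(0, false)); // 0: inner joins unchanged
    }
}
{code}

A partition kept for this reason has an empty probe side, so the next {{MutableHashTable}} could, as asked above, skip probing and emit every build record in it as unmatched.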