liu created FLINK-39217:
---------------------------
Summary: CompactCoordinator throws FileNotFoundException during snapshotState due to race condition with StreamWriter
Key: FLINK-39217
URL: https://issues.apache.org/jira/browse/FLINK-39217
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.20.3
Environment: flink 1.20.3
Reporter: liu
In Flink 1.20.3, when auto-compaction is enabled for Hive/FileSink, a race condition exists between StreamingFileWriter and CompactCoordinator.
During its snapshotState phase, StreamingFileWriter closes the current in-progress file and may trigger the creation of metadata for a new file. If the InputFile event for this new file (which should belong to the next checkpoint) is emitted and processed by the downstream CompactCoordinator before the coordinator receives the current checkpoint barrier, the coordinator mistakenly includes this "future" file in the compaction it coordinates for the current checkpoint.
Because this file is still in the .inprogress state, or has not yet been fully flushed and renamed in the underlying file system (e.g., HDFS), CompactCoordinator fails to retrieve its FileStatus when it coordinates compaction for that checkpoint (CompactCoordinator.commitUpToCheckpoint in the stack trace). This raises java.io.FileNotFoundException and fails the job.
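The ordering problem can be illustrated with a deliberately simplified model. The class and method names below (CheckpointBucketingModel, onInputFile, snapshot, filesUpTo) are hypothetical and do not reflect Flink's actual CompactCoordinator code; the only behavior carried over from the description is the assumption that the coordinator attributes every file event received before a checkpoint's barrier to that checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of a coordinator that groups file events by
 * checkpoint. Illustrative only; not Flink's CompactCoordinator.
 */
public class CheckpointBucketingModel {
    // Files received since the last snapshot, assumed to belong to the next checkpoint.
    private final List<String> currentFiles = new ArrayList<>();
    // Files frozen per checkpoint id at snapshot (barrier) time.
    private final TreeMap<Long, List<String>> filesByCheckpoint = new TreeMap<>();

    /** Called for every InputFile-like event, in arrival order. */
    public void onInputFile(String file) {
        currentFiles.add(file);
    }

    /** Called when the checkpoint barrier reaches the coordinator. */
    public void snapshot(long checkpointId) {
        filesByCheckpoint.put(checkpointId, new ArrayList<>(currentFiles));
        currentFiles.clear();
    }

    /** Files the model would compact once checkpoint {@code checkpointId} ends. */
    public List<String> filesUpTo(long checkpointId) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry :
                filesByCheckpoint.headMap(checkpointId, true).entrySet()) {
            result.addAll(entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        CheckpointBucketingModel coordinator = new CheckpointBucketingModel();
        coordinator.onInputFile("f0"); // file written for checkpoint 1
        coordinator.onInputFile("f1"); // rolled during the writer's snapshot, arrives before the barrier
        coordinator.snapshot(1);
        // Both files are attributed to checkpoint 1, although f1 is still in progress.
        System.out.println(coordinator.filesUpTo(1)); // prints [f0, f1]
    }
}
```

Because arrival order relative to the barrier is the only signal in this model, f1 ends up in checkpoint 1's compaction set even though it is only committed in a later checkpoint.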
Exception:
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: hdfs://xxxxx/xxxx/.uncompacted-part-a79a1409-b663-4ef5-91ee-e746e9bb8e8c-14-1
	at org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:177)
	at org.apache.flink.connector.file.table.BinPacking.pack(BinPacking.java:40)
	at org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:187)
	at java.base/java.util.HashMap.forEach(HashMap.java:1421)
	at org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:183)
	at org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:161)
	at org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:150)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Thread.java:840)
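The stack trace shows the FileNotFoundException surfacing as an UncheckedIOException thrown from a lambda inside BinPacking.pack. A plausible reading is that the size (weight) function handed to the bin-packing step must wrap the checked IOException raised by the file-system lookup. The sketch below reproduces that pattern on the local file system with java.nio; SizeLookupSketch, fileLength, SIZE, and totalSize are hypothetical names, and the real BinPacking signature and HDFS client are not assumed.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.ToLongFunction;

public class SizeLookupSketch {
    // Hypothetical stand-in for a file-system stat call: a missing path raises
    // FileNotFoundException with a message in the style seen in the report.
    public static long fileLength(Path path) throws IOException {
        if (!Files.exists(path)) {
            throw new FileNotFoundException("File does not exist: " + path);
        }
        return Files.size(path);
    }

    // A weight function of the kind a bin-packing routine might accept.
    // ToLongFunction cannot throw checked exceptions, so IOException is wrapped.
    public static final ToLongFunction<Path> SIZE = path -> {
        try {
            return fileLength(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    };

    // Sums file sizes, as a packing step would when weighing each input file.
    public static long totalSize(List<Path> files) {
        long total = 0;
        for (Path file : files) {
            total += SIZE.applyAsLong(file);
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("compact-sketch");
        Path f0 = Files.write(dir.resolve("f0"), new byte[] {0, 1, 2});
        // Only f1.inprogress exists; the final path f1 has not been committed yet.
        Files.write(dir.resolve("f1.inprogress"), new byte[] {0, 1, 2});
        Path f1 = dir.resolve("f1");
        try {
            System.out.println("total size: " + totalSize(List.of(f0, f1)));
        } catch (UncheckedIOException e) {
            System.out.println("wrapped cause: " + e.getCause());
        }
    }
}
```

Running main prints the wrapped FileNotFoundException for the uncommitted f1 path (the temporary directory name varies between runs), mirroring the exception chain in the report.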
Test Case:
I have added this test case to {{org.apache.flink.connector.file.table.stream.compact.CompactCoordinatorTest}}:
@Test
void testSnapshotFailsIfInputFileEventArrivesBeforeBarrier() throws Exception {
    runCoordinator(
            harness -> {
                harness.setup();
                harness.open();
                // Mock writer element: create a new in-progress file, f0.inprogress.
                harness.processElement(new InputFile("p0", newInProgressFile("f0", 3)), 0);
                // Mock the StreamWriter snapshot rolling to a new file, f1.inprogress,
                // whose InputFile event reaches the coordinator before the barrier.
                harness.processElement(new InputFile("p0", newInProgressFile("f1", 3)), 0);
                harness.snapshot(1, 0);
                // Mock notifyCheckpointComplete: only f0 is committed (renamed).
                rename("f0");
                // Coordinating checkpoint 1 looks up f1, which does not exist yet.
                assertThrows(
                        UncheckedIOException.class,
                        () -> harness.processElement(new EndCheckpoint(1L, 0, 1), 0));
            });
}
protected Path newInProgressFile(String name, int len) throws IOException {
    // Make sure the final (committed) file does not exist yet.
    Path path = new Path(folder, name);
    File file = new File(path.getPath());
    file.delete();
    // Create <name>.inprogress and write len bytes to it.
    Path inProgressFilePath = new Path(folder, name + ".inprogress");
    File inProgressFile = new File(inProgressFilePath.getPath());
    inProgressFile.delete();
    inProgressFile.createNewFile();
    try (FileOutputStream out = new FileOutputStream(inProgressFile)) {
        for (int i = 0; i < len; i++) {
            out.write(i);
        }
    }
    // Return the final path; the test reports it in the InputFile event.
    return path;
}
protected void rename(String name) throws IOException {
    // Mock a commit: rename <name>.inprogress to <name>.
    Path inProgressPath = new Path(folder, name + ".inprogress");
    Path finalPath = new Path(folder, name);
    File inProgressFile = new File(inProgressPath.getPath());
    File finalFile = new File(finalPath.getPath());
    if (!inProgressFile.exists()) {
        throw new IOException("Cannot commit: In-progress file does not exist: " + inProgressPath);
    }
    boolean success = inProgressFile.renameTo(finalFile);
    if (!success) {
        throw new IOException("Failed to rename " + inProgressPath + " to " + finalPath);
    }
}