Hi mates, since the 1.5 release, BucketingSink has been able to configure the
suffix of the part file. This is very useful when the file needs a specific
extension.
While using it, I found an issue: when a new part file is created, it gets
the same part index as the file that was just closed.
So, when Flink tries to move it into its final state, we get a
FileAlreadyExistsException.
The problem is in the following code. Here we look for the next part index
for which no file exists yet in the bucket directory. However, partSuffix is
not included when the path is assembled, so the checked path never exists
and partCounter is never incremented:
    Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    while (fs.exists(partPath) ||
            fs.exists(getPendingPathFor(partPath)) ||
            fs.exists(getInProgressPathFor(partPath))) {
        bucketState.partCounter++;
        partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    }
    bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
Before creating the writer, we append partSuffix here, but it should already
have been appended before the index checks:
    if (partSuffix != null) {
        partPath = partPath.suffix(partSuffix);
    }
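
To make the effect concrete, here is a minimal, self-contained sketch of the two lookup strategies. It is not the BucketingSink code and not the actual fix. The bucket directory is simulated by a Set of file names instead of a Hadoop FileSystem, and the class name, the method names, and the "_" / ".pending" / ".in-progress" naming are assumptions made only for illustration.

```java
import java.util.Set;

public class PartIndexSketch {

    // Assumed naming of pending / in-progress files (illustrative only).
    static String pendingName(String name) {
        return "_" + name + ".pending";
    }

    static String inProgressName(String name) {
        return "_" + name + ".in-progress";
    }

    // Builds "<prefix>-<subtask>-<counter>" and appends the suffix if one is set.
    static String partName(String prefix, int subtask, long counter, String suffix) {
        String name = prefix + "-" + subtask + "-" + counter;
        return suffix == null ? name : name + suffix;
    }

    // True if the name is already used in final, pending or in-progress form.
    static boolean taken(Set<String> bucket, String name) {
        return bucket.contains(name)
                || bucket.contains(pendingName(name))
                || bucket.contains(inProgressName(name));
    }

    // Behaviour described above: the suffix is ignored during the existence
    // check, so suffixed files in the bucket are never seen.
    static long nextIndexIgnoringSuffix(Set<String> bucket, String prefix,
                                        int subtask, long counter) {
        while (taken(bucket, partName(prefix, subtask, counter, null))) {
            counter++;
        }
        return counter;
    }

    // Proposed idea: include the suffix in the name before checking.
    static long nextIndexWithSuffix(Set<String> bucket, String prefix,
                                    int subtask, long counter, String suffix) {
        while (taken(bucket, partName(prefix, subtask, counter, suffix))) {
            counter++;
        }
        return counter;
    }
}
```

For a simulated bucket that already contains part-0-0.avro and part-0-1.avro, nextIndexIgnoringSuffix(bucket, "part", 0, 0) returns 0, which collides with an existing file, while nextIndexWithSuffix(bucket, "part", 0, 0, ".avro") returns 2.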
I’ll create an issue and try to submit a fix.
Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever