[ https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler updated FLINK-9603:
------------------------------------
    Affects Version/s: 1.6.0

> Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9603
>                 URL: https://issues.apache.org/jira/browse/FLINK-9603
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Rinat Sharipov
>            Assignee: Kostas Kloudas
>            Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has been able to configure the suffix of part files. This is very useful when part files need a specific file extension.
>
> While using it, I found an issue: when a new part file is created, it gets the same part index as the file that was just closed. So, when Flink tries to move the new file into its final state, it fails with a FileAlreadyExistsException.
>
> The problem is in the following code:
> *{color:#e32400}Here we try to find the next part index for which no part file exists in the bucket directory. The problem is that partSuffix is not included when the path is assembled.
> This means that the checked paths (which lack the suffix) never exist{color}*
> *{color:#e32400}and the loop never increments partCounter.{color}*
>
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>         fs.exists(getPendingPathFor(partPath)) ||
>         fs.exists(getInProgressPathFor(partPath))) {
>     bucketState.partCounter++;
>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
>
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
>
> *{color:#e32400}The partSuffix is only appended here, just before the writer is created, but it should already be appended before the index checks above.{color}*
> {code:java}
> if (partSuffix != null) {
>     partPath = partPath.suffix(partSuffix);
> }
> {code}
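A minimal, dependency-free sketch of the index search described above may help show the collision. It is a model, not Flink code. A `Set<String>` of file names stands in for the bucket directory, and the pending and in-progress files are assumed to use BucketingSink's default `_` prefix with `.pending` / `.in-progress` suffixes. The class and method names (`PartIndexDemo`, `nextPartNameBuggy`, `nextPartNameFixed`) are hypothetical. The "buggy" variant checks names without the suffix and appends it afterwards, as in the excerpt; the "fixed" variant includes the suffix in every candidate name before the existence checks, as the report proposes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the part-index search in BucketingSink (not Flink API).
// A Set<String> of file names stands in for the bucket directory; the "_" prefix
// and ".pending" / ".in-progress" suffixes are assumed to be the sink's defaults.
final class PartIndexDemo {

    static final String PENDING_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_PREFIX = "_";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    // Builds "<prefix>-<subtask>-<counter>[suffix]"; the suffix is skipped when null.
    static String partName(String partPrefix, int subtaskIndex, int counter, String partSuffix) {
        String name = partPrefix + "-" + subtaskIndex + "-" + counter;
        return partSuffix == null ? name : name + partSuffix;
    }

    // True if the part exists in its finished, pending, or in-progress form.
    static boolean existsInAnyState(Set<String> bucket, String name) {
        return bucket.contains(name)
                || bucket.contains(PENDING_PREFIX + name + PENDING_SUFFIX)
                || bucket.contains(IN_PROGRESS_PREFIX + name + IN_PROGRESS_SUFFIX);
    }

    // Mirrors the reported bug: names are checked WITHOUT the suffix,
    // and the suffix is appended only after the search has finished.
    static String nextPartNameBuggy(Set<String> bucket, String partPrefix,
                                    int subtaskIndex, int counter, String partSuffix) {
        String name = partName(partPrefix, subtaskIndex, counter, null);
        while (existsInAnyState(bucket, name)) {
            counter++;
            name = partName(partPrefix, subtaskIndex, counter, null);
        }
        return partSuffix == null ? name : name + partSuffix;
    }

    // Proposed fix: every candidate name already carries the suffix.
    static String nextPartNameFixed(Set<String> bucket, String partPrefix,
                                    int subtaskIndex, int counter, String partSuffix) {
        String name = partName(partPrefix, subtaskIndex, counter, partSuffix);
        while (existsInAnyState(bucket, name)) {
            counter++;
            name = partName(partPrefix, subtaskIndex, counter, partSuffix);
        }
        return name;
    }

    public static void main(String[] args) {
        Set<String> bucket = new HashSet<>();
        bucket.add("part-0-0.avro");               // finished part file
        bucket.add("_part-0-1.avro.in-progress");  // file still being written

        // Buggy search never sees the suffixed files and reuses index 0.
        System.out.println(nextPartNameBuggy(bucket, "part", 0, 0, ".avro")); // part-0-0.avro
        // Fixed search skips both existing indices.
        System.out.println(nextPartNameFixed(bucket, "part", 0, 0, ".avro")); // part-0-2.avro
    }
}
```

With a null suffix both variants return the same name, which is consistent with the report that the collision only appears once a part suffix is configured.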