Hi, Rinat
I tried the scenario you described and it works fine for me. The partCounter
was incremented as we hoped: when a new part file was created, I did not see
any duplicate part index. Here is my code, so you can take a look.
In my case, the highest index of a finished part file is part-0-683PartSuffix.
All later files remain as _part-0-684PartSuffix.pending,
_part-0-685PartSuffix.pending and so on, since the checkpoint has not finished yet.
Cheers
Minglei.
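import java.util.UUID;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class TestSuffix {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String outputPath = params.getRequired("outputPath");

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
        sEnv.enableCheckpointing(200); // checkpoint every 200 ms
        sEnv.setParallelism(1);        // single subtask, so all part files are named part-0-*

        // Part files get the "PartSuffix" suffix, e.g. part-0-683PartSuffix.
        BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
                new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
                        .setInactiveBucketThreshold(1000)
                        .setInactiveBucketCheckInterval(1000)
                        .setPartSuffix("PartSuffix")
                        .setBatchSize(500); // roll to a new part file once the current one exceeds 500 bytes

        sEnv.addSource(new DataGenerator())
                .keyBy(0)
                .map(new CountUpRichMap())
                .addSink(sink);

        sEnv.execute();
    }

    /** Attaches a per-key running count to each record. */
    public static class CountUpRichMap
            extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {

        private ValueState<Integer> counter;

        @Override
        public void open(Configuration parameters) throws Exception {
            counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
        }

        @Override
        public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
            Integer counterValue = counter.value();
            if (counterValue == null) {
                counterValue = 0;
            }
            counter.update(counterValue + 1);
            return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
        }
    }

    /** Emits 10000 records spread over 10 keys (0-9). */
    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {

        // Cleared by cancel() so that run() stops emitting.
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
            for (int i = 0; i < 10000 && running; i++) {
                ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}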
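For reference, the file names in the observation above can be reconstructed from the part prefix, subtask index, part counter and part suffix. This is a minimal standalone sketch, not Flink code: it assumes BucketingSink's default "part" prefix, "_" pending prefix and ".pending" pending suffix, and the class and method names (PartNaming, finalName, pendingName) are hypothetical.

```java
// Hypothetical helper that rebuilds BucketingSink-style file names as strings.
// Assumes the default prefixes/suffixes; this is not part of the Flink API.
final class PartNaming {
    static final String PART_PREFIX = "part";        // assumed default part prefix
    static final String PENDING_PREFIX = "_";        // assumed default pending prefix
    static final String PENDING_SUFFIX = ".pending"; // assumed default pending suffix

    /** Final name: partPrefix-subtaskIndex-partCounter, then the optional part suffix. */
    static String finalName(int subtaskIndex, int partCounter, String partSuffix) {
        String name = PART_PREFIX + "-" + subtaskIndex + "-" + partCounter;
        return partSuffix == null ? name : name + partSuffix;
    }

    /** Pending name: the final name wrapped in the pending prefix and suffix. */
    static String pendingName(int subtaskIndex, int partCounter, String partSuffix) {
        return PENDING_PREFIX + finalName(subtaskIndex, partCounter, partSuffix) + PENDING_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(finalName(0, 683, "PartSuffix"));   // part-0-683PartSuffix
        System.out.println(pendingName(0, 684, "PartSuffix")); // _part-0-684PartSuffix.pending
    }
}
```

A pending file keeps its "_" prefix and ".pending" suffix until the checkpoint covering it completes; only then is it renamed to its final name.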
> On June 16, 2018, at 10:21 PM, Rinat <[email protected]> wrote:
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the
> suffix of part files. It’s very useful when you need to give files a specific
> extension.
>
> While using it, I’ve found an issue: when a new part file is created, it
> has the same part index as the file that was just closed.
> So, when Flink tries to move it into its final state, we get a
> FileAlreadyExistsException.
>
> This problem is related to the following code.
> Here we search for the first part index whose file does not yet exist in the
> bucket directory. The problem is that partSuffix is not included when the
> path is assembled, so the checked path never exists
> and partCounter is never incremented.
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>         fs.exists(getPendingPathFor(partPath)) ||
>         fs.exists(getInProgressPathFor(partPath))) {
>     bucketState.partCounter++;
>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
>
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>
> The partSuffix is only appended here, right before the writer is created, but it
> should already be appended before the index checks:
> if (partSuffix != null) {
>     partPath = partPath.suffix(partSuffix);
> }
> I’ll create an issue and try to submit a fix.
>
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
>
> email: [email protected] <mailto:[email protected]>
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
>
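To make the reported bug concrete, here is a minimal standalone sketch that models the bucket directory as a set of file names instead of Flink's FileSystem. The class PartCounterSearch and its methods are hypothetical. The loop mirrors the quoted while-loop twice: once without the suffix (the reported behaviour) and once with the suffix applied before the existence checks (the proposed fix). It assumes the default "_" prefixes and ".pending" / ".in-progress" suffixes for pending and in-progress files.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the part-index search; the bucket directory is a
// Set<String> of file names rather than a Flink FileSystem.
final class PartCounterSearch {
    // Assumed BucketingSink defaults for pending / in-progress files.
    static final String PENDING_PREFIX = "_", PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_PREFIX = "_", IN_PROGRESS_SUFFIX = ".in-progress";

    static String partName(String partPrefix, int subtaskIndex, int counter, String suffix) {
        String name = partPrefix + "-" + subtaskIndex + "-" + counter;
        return suffix == null ? name : name + suffix;
    }

    /** True if the name exists as a finished, pending or in-progress file. */
    static boolean existsInAnyState(Set<String> dir, String name) {
        return dir.contains(name)
                || dir.contains(PENDING_PREFIX + name + PENDING_SUFFIX)
                || dir.contains(IN_PROGRESS_PREFIX + name + IN_PROGRESS_SUFFIX);
    }

    /** Reported behaviour: the existence checks ignore partSuffix. */
    static int nextCounterWithoutSuffix(Set<String> dir, String partPrefix, int subtaskIndex, int counter) {
        while (existsInAnyState(dir, partName(partPrefix, subtaskIndex, counter, null))) {
            counter++;
        }
        return counter;
    }

    /** Proposed fix: apply partSuffix before checking whether the path exists. */
    static int nextCounterWithSuffix(Set<String> dir, String partPrefix, int subtaskIndex,
                                     int counter, String partSuffix) {
        while (existsInAnyState(dir, partName(partPrefix, subtaskIndex, counter, partSuffix))) {
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        // A bucket that already holds a finished part and a pending part, both suffixed.
        Set<String> dir = new HashSet<>();
        dir.add("part-0-0.csv");
        dir.add("_part-0-1.csv.pending");
        // 0: "part-0-0" is not in the set, so the index already used by part-0-0.csv is reused.
        System.out.println(nextCounterWithoutSuffix(dir, "part", 0, 0));
        // 2: indices 0 and 1 are recognised as taken once the suffix is applied.
        System.out.println(nextCounterWithSuffix(dir, "part", 0, 0, ".csv"));
    }
}
```

Without the suffix, the search returns an index whose suffixed file already exists, which is the collision that later surfaces as a FileAlreadyExistsException when the file is moved to its final name.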