Hi Minglei! Thanks for your reply. I really have no idea why everything works in your case; the unit tests I implemented in my PR show that the problem exists. Please let me know which Flink version you use. The current fix applies to the current master branch. Here is an example of a unit test that demonstrates the problem:
@Test
public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws Exception {
    String partSuffix = ".my";

    File outDir = tempFolder.newFolder();
    long inactivityInterval = 100;

    java.nio.file.Path bucket = Paths.get(outDir.getPath());
    Files.createFile(bucket.resolve("part-0-0.my.pending"));

    String basePath = outDir.getAbsolutePath();
    BucketingSink<String> sink = new BucketingSink<String>(basePath)
        .setBucketer(new BasePathBucketer<>())
        .setInactiveBucketCheckInterval(inactivityInterval)
        .setInactiveBucketThreshold(inactivityInterval)
        .setPartPrefix(PART_PREFIX)
        .setInProgressPrefix("")
        .setPendingPrefix("")
        .setValidLengthPrefix("")
        .setInProgressSuffix(IN_PROGRESS_SUFFIX)
        .setPendingSuffix(PENDING_SUFFIX)
        .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
        .setPartSuffix(partSuffix)
        .setBatchSize(0);

    OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
    testHarness.setup();
    testHarness.open();

    testHarness.setProcessingTime(0L);
    testHarness.processElement(new StreamRecord<>("test1", 1L));

    testHarness.setProcessingTime(101L);
    testHarness.snapshot(0, 0);
    testHarness.notifyOfCompletedCheckpoint(0);
    sink.close();

    assertThat(Files.exists(bucket.resolve("part-0-1")), is(true));
}

> On 24 Jun 2018, at 06:02, zhangminglei <18717838...@163.com> wrote:
>
> Hi, Rinat
>
> I tried the situation you described and it works fine for me. The partCounter
> is incremented as we hope. When a new part file is created, I did not see any
> duplicate part index. Here is my code for that, you can take a look.
> In my case, the max index of the part files is part-0-683PartSuffix; all the
> others are still kept in _part-0-684PartSuffix.pending,
> _part-0-685PartSuffix.pending and so on, since the checkpoint has not finished.
>
> Cheers
> Minglei.
>
> public class TestSuffix {
>     public static void main(String[] args) throws Exception {
>         ParameterTool params = ParameterTool.fromArgs(args);
>         String outputPath = params.getRequired("outputPath");
>
>         StreamExecutionEnvironment sEnv =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>         sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>         sEnv.enableCheckpointing(200);
>         sEnv.setParallelism(1);
>
>         BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
>             new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
>                 .setInactiveBucketThreshold(1000)
>                 .setInactiveBucketCheckInterval(1000)
>                 .setPartSuffix("PartSuffix")
>                 .setBatchSize(500);
>
>         sEnv.addSource(new DataGenerator())
>             .keyBy(0)
>             .map(new CountUpRichMap())
>             .addSink(sink);
>
>         sEnv.execute();
>     }
>
>     public static class CountUpRichMap extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {
>
>         private ValueState<Integer> counter;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
>         }
>
>         @Override
>         public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
>             Integer counterValue = counter.value();
>             if (counterValue == null) {
>                 counterValue = 0;
>             }
>             counter.update(counterValue + 1);
>             return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
>         }
>     }
>
>     public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {
>
>         public DataGenerator() {
>         }
>
>         @Override
>         public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
>             for (int i = 0; i < 10000; i++) {
>                 ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
>             }
>         }
>
>         @Override
>         public void cancel() {
>
>         }
>     }
> }
>
>> On 16 Jun 2018, at 22:21, Rinat <r.shari...@cleverdata.ru> wrote:
>>
>> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the
>> suffix of the part file. It's very useful when it's necessary to set a specific
>> extension for the file.
>>
>> During usage, I've found an issue: when a new part file is created, it
>> has the same part index as the index of the just-closed file.
>> So, when Flink tries to move it into its final state, we get a
>> FileAlreadyExistsException.
>>
>> This problem is related to the following code.
>> Here we are trying to find the max index of a part file that doesn't exist in
>> the bucket directory. The problem is that the partSuffix is not involved in
>> the path assembly; this means that the path never exists,
>> and partCounter is never incremented:
>>
>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> while (fs.exists(partPath) ||
>>         fs.exists(getPendingPathFor(partPath)) ||
>>         fs.exists(getInProgressPathFor(partPath))) {
>>     bucketState.partCounter++;
>>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> }
>>
>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>>
>> Before creating the writer, we append the partSuffix here, but it should
>> already have been appended, before the index checks:
>>
>> if (partSuffix != null) {
>>     partPath = partPath.suffix(partSuffix);
>> }
>>
>> I'll create an issue and try to submit a fix.
>>
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>>
>> email: r.shari...@cleverdata.ru
>> mobile: +7 (925) 416-37-26
>>
>> CleverDATA
>> make your data clever
>>
>

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever
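[Editor's note] For reference, a minimal sketch of the fix the thread describes. This is an illustration only: it assumes the code sits inside BucketingSink where the quoted snippet lives, and the helper name assemblePartPath is introduced here for clarity; the actual patch in the PR may differ. The idea is to apply the suffix while assembling each candidate path, so the existence checks probe the file names that are actually written and the counter advances past parts that already exist.

// Build each candidate path with the part suffix already applied,
// so fs.exists() probes the names that are actually on disk.
Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
while (fs.exists(partPath) ||
        fs.exists(getPendingPathFor(partPath)) ||
        fs.exists(getInProgressPathFor(partPath))) {
    // A part file with this index already exists in some state,
    // so move on to the next index.
    bucketState.partCounter++;
    partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();

// Hypothetical helper: the same assembly as before, plus the configured suffix.
private Path assemblePartPath(Path bucketPath, int subtaskIndex, int partIndex) {
    String suffix = partSuffix != null ? partSuffix : "";
    return new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + partIndex + suffix);
}

With the suffix applied up front, the later "if (partSuffix != null) partPath = partPath.suffix(partSuffix)" step before the writer is created becomes redundant and can be removed.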