Hi Minglei!

Thanks for your reply! I really have no idea why everything works in your case; 
the unit tests I implemented in my PR show that the problem exists. Please 
let me know which Flink version you use.
The fix targets the current master branch. Here is an example of a unit test 
that demonstrates the problem:

@Test
public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws Exception {
   String partSuffix = ".my";

   File outDir = tempFolder.newFolder();
   long inactivityInterval = 100;

   // Simulate a leftover pending file with part index 0 and the configured suffix.
   java.nio.file.Path bucket = Paths.get(outDir.getPath());
   Files.createFile(bucket.resolve("part-0-0.my.pending"));

   String basePath = outDir.getAbsolutePath();
   BucketingSink<String> sink = new BucketingSink<String>(basePath)
      .setBucketer(new BasePathBucketer<>())
      .setInactiveBucketCheckInterval(inactivityInterval)
      .setInactiveBucketThreshold(inactivityInterval)
      .setPartPrefix(PART_PREFIX)
      .setInProgressPrefix("")
      .setPendingPrefix("")
      .setValidLengthPrefix("")
      .setInProgressSuffix(IN_PROGRESS_SUFFIX)
      .setPendingSuffix(PENDING_SUFFIX)
      .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
      .setPartSuffix(partSuffix)
      .setBatchSize(0);

   OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
   testHarness.setup();
   testHarness.open();

   testHarness.setProcessingTime(0L);

   testHarness.processElement(new StreamRecord<>("test1", 1L));

   // Exceed the inactivity threshold so the in-progress file is closed,
   // then complete a checkpoint so the pending file is moved to its final state.
   testHarness.setProcessingTime(101L);
   testHarness.snapshot(0, 0);
   testHarness.notifyOfCompletedCheckpoint(0);
   sink.close();

   // The new part file must get index 1, since index 0 is already taken by
   // the pre-created pending file (note that the final name carries the suffix).
   assertThat(Files.exists(bucket.resolve("part-0-1" + partSuffix)), is(true));
}
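
To make the proposed change concrete, here is a minimal sketch of the fix in 
BucketingSink#openNewPartFile(), assuming the suffix handling is moved into a 
helper (assemblePartPath is my name for it, just for illustration; the exact 
change is in the PR). The point is that partSuffix must participate in the 
existence checks, as described in my first mail below:

// Probe the file system with the full part path, including partSuffix, so
// that existing part-X-Y<suffix>[.pending|.in-progress] files are detected
// and partCounter is incremented past them.
Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
while (fs.exists(partPath) ||
      fs.exists(getPendingPathFor(partPath)) ||
      fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
}

// Illustrative helper: assembles prefix, subtask index, part counter and the
// optional suffix in one place, so no separate suffix() call is needed later.
private Path assemblePartPath(Path bucket, int subtaskIndex, int partIndex) {
   String localPartSuffix = partSuffix != null ? partSuffix : "";
   return new Path(bucket, partPrefix + "-" + subtaskIndex + "-" + partIndex + localPartSuffix);
}

With this, the block that appends partSuffix just before creating the writer 
becomes unnecessary and can be removed.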

> On 24 Jun 2018, at 06:02, zhangminglei <18717838...@163.com> wrote:
> 
> Hi, Rinat
> 
> I tried the situation you described and it works fine for me. The partCounter 
> is incremented as we would hope: when a new part file is created, I do not see 
> any duplicated part index. Here is my code for that, you can take a look.
> In my case, the highest index of a finalized part file is part-0-683PartSuffix; 
> apart from that, the files are still kept as _part-0-684PartSuffix.pending, 
> _part-0-685PartSuffix.pending and so on, since their checkpoints have not finished yet.
> 
> Cheers
> Minglei.
> 
> public class TestSuffix {
>    public static void main(String[] args) throws Exception {
>       ParameterTool params = ParameterTool.fromArgs(args);
>       String outputPath = params.getRequired("outputPath");
> 
>       StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>       sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>       sEnv.enableCheckpointing(200);
>       sEnv.setParallelism(1);
> 
>       BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
>          new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
>             .setInactiveBucketThreshold(1000)
>             .setInactiveBucketCheckInterval(1000)
>             .setPartSuffix("PartSuffix")
>             .setBatchSize(500);
> 
>       sEnv.addSource(new DataGenerator())
>          .keyBy(0)
>          .map(new CountUpRichMap())
>          .addSink(sink);
> 
>       sEnv.execute();
>    }
> 
>    public static class CountUpRichMap extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {
> 
>       private ValueState<Integer> counter;
> 
>       @Override
>       public void open(Configuration parameters) throws Exception {
>          counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
>       }
> 
>       @Override
>       public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
>          Integer counterValue = counter.value();
>          if (counterValue == null) {
>             counterValue = 0;
>          }
>          counter.update(counterValue + 1);
>          return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
>       }
>    }
> 
>    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {
> 
>       public DataGenerator() {
>       }
> 
>       @Override
>       public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
>          for (int i = 0; i < 10000; i++) {
>             ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
>          }
>       }
> 
>       @Override
>       public void cancel() {
>       }
>    }
> }
> 
> 
> 
> 
>> On 16 Jun 2018, at 10:21 PM, Rinat <r.shari...@cleverdata.ru> wrote:
>> 
>> Hi mates, since the 1.5 release, BucketingSink has had the ability to configure 
>> the suffix of the part file. This is very useful when it's necessary to set a 
>> specific file extension.
>> 
>> While using it, I've found an issue: when a new part file is created, it gets 
>> the same part index as the file that was just closed. 
>> So, when Flink tries to move it into its final state, we get a 
>> FileAlreadyExistsException.
>> 
>> The problem comes from the following code.
>> Here we try to find the next part index that does not yet exist in the bucket 
>> directory. The problem is that partSuffix is not involved in the path assembly, 
>> so the probed path never exists
>> and partCounter is never incremented:
>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> while (fs.exists(partPath) ||
>>       fs.exists(getPendingPathFor(partPath)) ||
>>       fs.exists(getInProgressPathFor(partPath))) {
>>    bucketState.partCounter++;
>>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> }
>> 
>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>> 
>> The partSuffix is only appended here, just before the writer is created, but 
>> it should already have been appended before the index checks above:
>> if (partSuffix != null) {
>>    partPath = partPath.suffix(partSuffix);
>> }
>> I'll create an issue and try to submit a fix.
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever
