Hi Minglei!

I’ve implemented a group of tests showing that the problem occurs only when a
part suffix is specified and a previous part file already exists in the pending
or in-progress state.

Here is the exception:

testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest)  Time elapsed: 0.018 sec  <<< ERROR!
java.io.IOException: File already exists: /var/folders/v9/r7ybtp9n4lj_6ybx5xnngyzm0000gn/T/junit8543902037302786417/junit2291904425846970077/part-0-0.my.in-progress
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:259)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:71)
        at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:69)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:587)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
        at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:111)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncremented(BucketingSinkTest.java:970)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(BucketingSinkTest.java:909)
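For illustration, here is a minimal, dependency-free sketch of why the in-progress path collides. It models the index search quoted in the original report at the bottom of this thread, using plain file names instead of Hadoop `Path`s. The class and method names are hypothetical, and it assumes empty in-progress/pending prefixes, as in the test configuration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the BucketingSink part-index search (not Flink code).
// File names stand in for Hadoop Paths; in-progress and pending prefixes are
// assumed to be "", as in the test above.
class BuggyPartIndexSearch {

    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    // Returns the in-progress file name the sink would try to create next.
    static String nextInProgressFile(Set<String> existing, String partPrefix,
                                     int subtaskIndex, int partCounter, String partSuffix) {
        // The existence checks are made on a path WITHOUT the part suffix...
        String partPath = partPrefix + "-" + subtaskIndex + "-" + partCounter;
        while (existing.contains(partPath)
                || existing.contains(partPath + PENDING_SUFFIX)
                || existing.contains(partPath + IN_PROGRESS_SUFFIX)) {
            partCounter++;
            partPath = partPrefix + "-" + subtaskIndex + "-" + partCounter;
        }
        // ...and the suffix is appended only after the index has been chosen.
        if (partSuffix != null) {
            partPath = partPath + partSuffix;
        }
        return partPath + IN_PROGRESS_SUFFIX;
    }

    public static void main(String[] args) {
        // A leftover in-progress file, as created by the failing test.
        Set<String> withSuffix = new HashSet<>(Arrays.asList("part-0-0.my.in-progress"));
        // Prints part-0-0.my.in-progress, i.e. the name of the existing file.
        System.out.println(nextInProgressFile(withSuffix, "part", 0, 0, ".my"));

        // Without a suffix, the leftover file is detected and the index moves on.
        Set<String> withoutSuffix = new HashSet<>(Arrays.asList("part-0-0.in-progress"));
        // Prints part-0-1.in-progress.
        System.out.println(nextInProgressFile(withoutSuffix, "part", 0, 0, null));
    }
}
```

With the suffix, the model picks `part-0-0.my.in-progress`, the same file that `RawLocalFileSystem.create` rejects in the stack trace. Without a suffix, it moves on to index 1.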


You can add the following test to
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest:

   @Test
   public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
         throws Exception {
      testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX);
   }

   // Uses tempFolder, createTestSink and the PART_PREFIX / *_SUFFIX constants that
   // BucketingSinkTest already defines; assertThat and is come from org.junit.Assert
   // and org.hamcrest.CoreMatchers.
   private void testThatPartIndexIsIncremented(String partSuffix, String existingPartFile) throws Exception {
      File outDir = tempFolder.newFolder();
      long inactivityInterval = 100;

      // Pre-create a part file, as if it were left over from a previous run.
      java.nio.file.Path bucket = Paths.get(outDir.getPath());
      Files.createFile(bucket.resolve(existingPartFile));

      String basePath = outDir.getAbsolutePath();
      BucketingSink<String> sink = new BucketingSink<String>(basePath)
         .setBucketer(new BasePathBucketer<>())
         .setInactiveBucketCheckInterval(inactivityInterval)
         .setInactiveBucketThreshold(inactivityInterval)
         .setPartPrefix(PART_PREFIX)
         .setInProgressPrefix("")
         .setPendingPrefix("")
         .setValidLengthPrefix("")
         .setInProgressSuffix(IN_PROGRESS_SUFFIX)
         .setPendingSuffix(PENDING_SUFFIX)
         .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
         .setPartSuffix(partSuffix)
         .setBatchSize(0);

      OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
      testHarness.setup();
      testHarness.open();

      testHarness.setProcessingTime(0L);

      testHarness.processElement(new StreamRecord<>("test1", 1L));

      // Advance past the inactivity threshold, then complete a checkpoint so that
      // pending part files are moved to their final names.
      testHarness.setProcessingTime(101L);
      testHarness.snapshot(0, 0);
      testHarness.notifyOfCompletedCheckpoint(0);
      sink.close();

      // The new part file must get the next index instead of reusing index 0.
      String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + partSuffix;
      assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true));
   }

Then check that the test fails.

The problem is reproducible on the current master branch. I’ve also opened a PR
that fixes it: https://github.com/apache/flink/pull/6176
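A minimal sketch of the approach suggested in the original report (appending the part suffix before the existence checks), again using plain file names instead of Hadoop `Path`s. This is not the code from the PR; the class name and the `assemblePartPath` helper are hypothetical, and empty in-progress/pending prefixes are assumed, as in the test.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the suggested fix (not the code from the PR): the part
// suffix is included when the path is assembled, so the existence checks see
// the same names that the sink later creates.
class FixedPartIndexSearch {

    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    // Hypothetical helper: builds "<prefix>-<subtask>-<counter>[<suffix>]".
    static String assemblePartPath(String partPrefix, int subtaskIndex, int partCounter, String partSuffix) {
        String path = partPrefix + "-" + subtaskIndex + "-" + partCounter;
        return partSuffix == null ? path : path + partSuffix;
    }

    // Returns the in-progress file name the sink would try to create next.
    static String nextInProgressFile(Set<String> existing, String partPrefix,
                                     int subtaskIndex, int partCounter, String partSuffix) {
        String partPath = assemblePartPath(partPrefix, subtaskIndex, partCounter, partSuffix);
        // Skip every index whose final, pending or in-progress file already exists.
        while (existing.contains(partPath)
                || existing.contains(partPath + PENDING_SUFFIX)
                || existing.contains(partPath + IN_PROGRESS_SUFFIX)) {
            partCounter++;
            partPath = assemblePartPath(partPrefix, subtaskIndex, partCounter, partSuffix);
        }
        return partPath + IN_PROGRESS_SUFFIX;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(
                Arrays.asList("part-0-0.my.in-progress", "part-0-1.my.pending"));
        // Skips both occupied indices and prints part-0-2.my.in-progress.
        System.out.println(nextInProgressFile(existing, "part", 0, 0, ".my"));
    }
}
```

Because the suffix is part of every checked name, leftover files in either the in-progress or the pending state advance the counter.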

For some reason, I still can’t build the whole Flink repository to run your
example locally from the IDE, but as far as I can tell the problem exists, and
the test above demonstrates it. Please have a look.

I’m still working on getting the Flink project to build on my local machine…

Thx


> On 25 Jun 2018, at 10:44, Rinat <r.shari...@cleverdata.ru> wrote:
> 
> Hi Minglei!
> 
> Thanks for your reply. I really have no idea why everything works in your case;
> I have implemented unit tests in my PR that show the problem exists.
> Could you please let me know which Flink version you are using?
> The fix targets the current master branch. Here is an example of a unit
> test that shows the problem:
> 
> @Test
> public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws Exception {
>    String partSuffix = ".my";
> 
>    File outDir = tempFolder.newFolder();
>    long inactivityInterval = 100;
> 
>    java.nio.file.Path bucket = Paths.get(outDir.getPath());
>    Files.createFile(bucket.resolve("part-0-0.my.pending"));
> 
>    String basePath = outDir.getAbsolutePath();
>    BucketingSink<String> sink = new BucketingSink<String>(basePath)
>       .setBucketer(new BasePathBucketer<>())
>       .setInactiveBucketCheckInterval(inactivityInterval)
>       .setInactiveBucketThreshold(inactivityInterval)
>       .setPartPrefix(PART_PREFIX)
>       .setInProgressPrefix("")
>       .setPendingPrefix("")
>       .setValidLengthPrefix("")
>       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
>       .setPendingSuffix(PENDING_SUFFIX)
>       .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
>       .setPartSuffix(partSuffix)
>       .setBatchSize(0);
> 
>    OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
>    testHarness.setup();
>    testHarness.open();
> 
>    testHarness.setProcessingTime(0L);
> 
>    testHarness.processElement(new StreamRecord<>("test1", 1L));
> 
>    testHarness.setProcessingTime(101L);
>    testHarness.snapshot(0, 0);
>    testHarness.notifyOfCompletedCheckpoint(0);
>    sink.close();
> 
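>    // The final file name carries the part suffix, so index 1 ends up as "part-0-1.my".
>    assertThat(Files.exists(bucket.resolve("part-0-1" + partSuffix)), is(true));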
> }
> 
>> On 24 Jun 2018, at 06:02, zhangminglei <18717838...@163.com> wrote:
>> 
>> Hi, Rinat
>> 
>> I tried the scenario you described, and it works fine for me: partCounter
>> is incremented as expected, and I did not see any duplicate part index when
>> a new part file is created. Here is my code, so you can take a look.
>> In my case, the highest finalized part file is part-0-683PartSuffix; all the
>> others are still in _part-0-684PartSuffix.pending,
>> _part-0-685PartSuffix.pending, and so on, because the checkpoint has not
>> completed yet.
>> 
>> Cheers
>> Minglei.
>> 
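>> import java.util.UUID;
>> 
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.api.java.tuple.Tuple4;
>> import org.apache.flink.api.java.utils.ParameterTool;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>> 
>> public class TestSuffix {
>>    public static void main(String[] args) throws Exception {
>>       ParameterTool params = ParameterTool.fromArgs(args);
>>       String outputPath = params.getRequired("outputPath");
>> 
>>       StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>       sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>>       sEnv.enableCheckpointing(200);
>>       sEnv.setParallelism(1);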
>> 
>>       BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
>>          new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
>>             .setInactiveBucketThreshold(1000)
>>             .setInactiveBucketCheckInterval(1000)
>>             .setPartSuffix("PartSuffix")
>>             .setBatchSize(500);
>> 
>>       sEnv.addSource(new DataGenerator())
>>          .keyBy(0)
>>          .map(new CountUpRichMap())
>>          .addSink(sink);
>> 
>>       sEnv.execute();
>>    }
>> 
>>    public static class CountUpRichMap
>>          extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {
>> 
>>       private ValueState<Integer> counter;
>> 
>>       @Override
>>       public void open(Configuration parameters) throws Exception {
>>          counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
>>       }
>> 
>>       @Override
>>       public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
>>          Integer counterValue = counter.value();
>>          if (counterValue == null) {
>>             counterValue = 0;
>>          }
>>          counter.update(counterValue + 1);
>>          return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
>>       }
>>    }
>> 
>>    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {
>> 
>>       public DataGenerator() {
>>       }
>> 
>>       @Override
>>       public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
>>          for (int i = 0; i < 10000; i++) {
>>             ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
>>          }
>>       }
>> 
>>       @Override
>>       public void cancel() {
>> 
>>       }
>>    }
>> }
>> 
>> 
>> 
>> 
>>> On 16 Jun 2018, at 22:21, Rinat <r.shari...@cleverdata.ru> wrote:
>>> 
>>> Hi mates, since the 1.5 release, BucketingSink can be configured with a
>>> suffix for part files. This is very useful when the files need a specific
>>> extension.
>>> 
>>> While using it, I found an issue: when a new part file is created, it gets
>>> the same part index as the file that was just closed.
>>> So, when Flink tries to move it into the final state, we get a
>>> FileAlreadyExistsException.
>>> 
>>> The problem is in the following code.
>>> Here we search for a part index that is not yet used in the bucket
>>> directory. The problem is that partSuffix is not included when the path is
>>> assembled, so when a suffix is configured, the checked paths never match the
>>> existing files and partCounter is never incremented.
>>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>>> while (fs.exists(partPath) ||
>>>       fs.exists(getPendingPathFor(partPath)) ||
>>>       fs.exists(getInProgressPathFor(partPath))) {
>>>    bucketState.partCounter++;
>>>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>>> }
>>> 
>>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>>> 
>>> The partSuffix is appended here, just before the writer is created, but it
>>> should already be appended before the index checks:
>>> if (partSuffix != null) {
>>>    partPath = partPath.suffix(partSuffix);
>>> }
>>> I’ll create an issue and try to submit a fix.
>>> 
>>> Sincerely yours,
>>> Rinat Sharipov
>>> Software Engineer at 1DMP CORE Team
>>> 
>>> email: r.shari...@cleverdata.ru
>>> mobile: +7 (925) 416-37-26
>>> 
>>> CleverDATA
>>> make your data clever
>>> 
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever
