Hi Minglei!

Thanks for your reply. I really have no idea why everything works in your case:
I have implemented unit tests in my PR which show that the problem exists. Could
you please let me know which Flink version you use?

The fix applies to the current master branch. Here is an example of a unit test
that shows the problem:
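import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

// tempFolder, createTestSink(...) and the PART_PREFIX / *_SUFFIX constants
// come from the enclosing test class.
@Test
public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws Exception {
    String partSuffix = ".my";
    File outDir = tempFolder.newFolder();
    long inactivityInterval = 100;

    // Pre-create a pending file for part index 0 that already carries the part suffix.
    java.nio.file.Path bucket = Paths.get(outDir.getPath());
    Files.createFile(bucket.resolve("part-0-0.my.pending"));

    String basePath = outDir.getAbsolutePath();
    BucketingSink<String> sink = new BucketingSink<String>(basePath)
        .setBucketer(new BasePathBucketer<>())
        .setInactiveBucketCheckInterval(inactivityInterval)
        .setInactiveBucketThreshold(inactivityInterval)
        .setPartPrefix(PART_PREFIX)
        .setInProgressPrefix("")
        .setPendingPrefix("")
        .setValidLengthPrefix("")
        .setInProgressSuffix(IN_PROGRESS_SUFFIX)
        .setPendingSuffix(PENDING_SUFFIX)
        .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
        .setPartSuffix(partSuffix)
        .setBatchSize(0);

    OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
    testHarness.setup();
    testHarness.open();

    testHarness.setProcessingTime(0L);
    testHarness.processElement(new StreamRecord<>("test1", 1L));

    // Move past the inactivity threshold so the part file is closed,
    // then complete checkpoint 0 so the pending file is finalized.
    testHarness.setProcessingTime(101L);
    testHarness.snapshot(0, 0);
    testHarness.notifyOfCompletedCheckpoint(0);
    sink.close();

    // The new part must get index 1 (index 0 is taken); its final name carries the part suffix.
    assertThat(Files.exists(bucket.resolve("part-0-1.my")), is(true));
}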
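For anyone without a Flink test harness at hand, here is a minimal standalone sketch of the same index search, written against java.nio.file rather than Flink's FileSystem API. It is only an approximation of the BucketingSink loop: the class and method names (PartIndexSketch, nextFreeIndex, demo) are hypothetical, prefixes are omitted (as in the test, where the in-progress and pending prefixes are set to ""), and the ".pending" / ".in-progress" suffixes are assumed values.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch (not Flink code) of BucketingSink's part-index search. */
public class PartIndexSketch {

    // Assumed suffixes, mirroring the defaults used for pending and in-progress files.
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    /**
     * Returns the lowest unused part index starting at {@code counter}.
     * If {@code suffixBeforeCheck} is false, the part suffix is ignored during the
     * existence checks (the reported bug); if true, it is appended first (the fix).
     */
    static int nextFreeIndex(Path bucket, String prefix, int subtask, int counter,
                             String partSuffix, boolean suffixBeforeCheck) {
        String suffix = (suffixBeforeCheck && partSuffix != null) ? partSuffix : "";
        while (true) {
            String name = prefix + "-" + subtask + "-" + counter + suffix;
            boolean taken = Files.exists(bucket.resolve(name))
                    || Files.exists(bucket.resolve(name + PENDING_SUFFIX))
                    || Files.exists(bucket.resolve(name + IN_PROGRESS_SUFFIX));
            if (!taken) {
                return counter;
            }
            counter++;
        }
    }

    /** Runs both variants against a temp bucket that already holds part-0-0.my.pending. */
    static int[] demo() {
        try {
            Path bucket = Files.createTempDirectory("bucket");
            Files.createFile(bucket.resolve("part-0-0.my.pending"));
            int buggy = nextFreeIndex(bucket, "part", 0, 0, ".my", false);
            int fixed = nextFreeIndex(bucket, "part", 0, 0, ".my", true);
            return new int[] {buggy, fixed};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("suffix ignored in check:  index " + result[0]);
        System.out.println("suffix included in check: index " + result[1]);
    }
}
```

With the suffix ignored, the search returns index 0, which clashes with the existing part-0-0.my.pending file; with the suffix appended before the checks, it returns index 1.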
> On 24 Jun 2018, at 06:02, zhangminglei <[email protected]> wrote:
>
> Hi, Rinat
>
> I tried the situation you described, and it works fine for me. The partCounter
> is incremented as expected: when a new part file is created, I do not see any
> duplicate part index. Here is my code; you can take a look.
> In my case, the highest finished part file is part-0-683PartSuffix; all later
> ones remain as _part-0-684PartSuffix.pending, _part-0-685PartSuffix.pending,
> and so on, because their checkpoint has not completed yet.
>
> Cheers
> Minglei.
>
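> import java.util.UUID;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>
> public class TestSuffix {
>     public static void main(String[] args) throws Exception {
>         ParameterTool params = ParameterTool.fromArgs(args);
>         String outputPath = params.getRequired("outputPath");
>
>         StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Checkpoint every 200 ms; pending part files are finalized only on completed checkpoints.
>         sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>         sEnv.enableCheckpointing(200);
>         sEnv.setParallelism(1);
>
>         BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
>             new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
>                 .setInactiveBucketThreshold(1000)
>                 .setInactiveBucketCheckInterval(1000)
>                 .setPartSuffix("PartSuffix")
>                 .setBatchSize(500); // start a new part file once the current one exceeds 500 bytes
>
>         sEnv.addSource(new DataGenerator())
>             .keyBy(0)
>             .map(new CountUpRichMap())
>             .addSink(sink);
>
>         sEnv.execute();
>     }
>
>     // Attaches a per-key running count to each record.
>     public static class CountUpRichMap
>             extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {
>
>         private ValueState<Integer> counter;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
>         }
>
>         @Override
>         public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value)
>                 throws Exception {
>             Integer counterValue = counter.value();
>             if (counterValue == null) {
>                 counterValue = 0;
>             }
>             counter.update(counterValue + 1);
>             return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
>         }
>     }
>
>     // Emits 10,000 records spread over 10 keys, then finishes.
>     public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {
>
>         private volatile boolean running = true;
>
>         @Override
>         public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
>             for (int i = 0; i < 10000 && running; i++) {
>                 ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
>             }
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
> }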
>
>
>
>
>> On 16 Jun 2018, at 22:21, Rinat <[email protected]> wrote:
>>
>> Hi mates, since the 1.5 release, BucketingSink can be configured with a suffix
>> for part files. This is very useful when you need to give the files a specific
>> extension.
>>
>> While using it, I found an issue: when a new part file is created, it gets the
>> same part index as the file that was just closed. So when Flink tries to move
>> it into its final state, we get a FileAlreadyExistsException.
>>
>> The problem is in the following code. Here we try to find the lowest part
>> index that is not yet used in the bucket directory, but partSuffix is not
>> included when the path is assembled. As a result, the checked path never
>> matches the suffixed files on disk, so partCounter is never incremented.
>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> while (fs.exists(partPath) ||
>>         fs.exists(getPendingPathFor(partPath)) ||
>>         fs.exists(getInProgressPathFor(partPath))) {
>>     bucketState.partCounter++;
>>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> }
>>
>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>>
>> The partSuffix is appended only here, right before the writer is created, but
>> it should already be appended before the index checks:
>> if (partSuffix != null) {
>>     partPath = partPath.suffix(partSuffix);
>> }
>>
>> I'll create an issue and try to submit a fix.
>>
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>>
>> email: [email protected]
>> mobile: +7 (925) 416-37-26
>>
>> CleverDATA
>> make your data clever
>>
>
Sincerely yours,
Rinat Sharipov