Yes, it should be exit. Thanks to Ted Yu. Very exactly! 


For #1, the word exist should be exit, right ?
Thanks
Hi, Sagar.
1. It solves the issue partially meaning files which have finished 
checkpointing don't show .pending status but the files which were in 
progress 
when the program exists are still in .pending state.
Ans: 
Yea, Make the program exists and in that time if a checkpoint does not 
finished will lead the status keeps in .pending state then. Under the normal 
circumstances, the programs that running in the production env will never be 
stoped or existed if everything is fine.
2. Ideally, writer should work with default settings correct ? Meaning we 
don't have to explicitly set these parameters to make it work. 
Is this assumption correct ?
Ans: 
> Yes. Writer should work with default settings correct.
> Yes. We do not have to explicitly set these parameters to make it work.
> Yes. Assumption correct indeed.
> However, you know, flink is a real time streaming framework, so under normal 
> circumstances,you don't really go to use the default settings when it comes 
> to a specific business. Especially together work with offline end(Like hadoop 
> mapreduce). In this case, you need to tell the offline end when time a bucket 
> is close and when time the data for the specify bucket is ready. So, you can 
> take a look on 
> <>.
Hi Zhangminglei,
Thanks for the reply.
1. It solves the issue partially meaning files which have finished 
checkpointing don't show .pending status but the files which were in 
progress 
when the program exists are still in .pending state.
2. Ideally, writer should work with default settings correct ? Meaning we 
don't have to explicitly set these parameters to make it work. 
Is this assumption correct ?
>> Hi, Sagar. Please use the below code and you will find the part files status 
>> from   <> to _part-0-107.pending and finally to 
>> part-0-107. [For example], you need to run the program for a while. However, 
>> we need set some parameters, like the following. Moreover, 
>> enableCheckpointing IS also needed. I know why you always see the .pending 
>> file since the below parameters default value is 60 seconds even though you 
>> set the enableCheckpoint. So, that is why you can not see the finished file 
>> status until 60 seconds passed.
>> Attached is the ending on my end, and you will see what you want! 
>> Please let me know if you still have the problem.
>> setInactiveBucketCheckInterval(2000)
>> .setInactiveBucketThreshold(2000);
>> public class TestOrc {
>>    public static void main(String[] args) throws Exception {
>>       StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>       env.setParallelism(1);
>>       env.enableCheckpointing(1000);
>>       env.setStateBackend(new MemoryStateBackend());
>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>       String path = "hdfs:// <>";
>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>       bucketingSink
>>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>>          .setInactiveBucketCheckInterval(2000)
>>          .setInactiveBucketThreshold(2000);
>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>       dataStream.addSink(bucketingSink);
>>       env.execute();
>>    }
>>    public static class ManGenerator implements SourceFunction<Row> {
>>       @Override
>>       public void run(SourceContext<Row> ctx) throws Exception {
>>          for (int i = 0; i < 2147483000; i++) {
>>             Row row = new Row(3);
>>             row.setField(0, "Sagar");
>>             row.setField(1, 26 + i);
>>             row.setField(2, false);
>>             ctx.collect(row);
>>          }
>>       }
>>       @Override
>>       public void cancel() {
>>       }
>>    }
>> }
Sure, we can solve it together :)
Are you able to reproduce it ?
>>> Sagar, flush will be called when do a checkpoint. Please see 
>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception 
>>> {
>>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not 
>>> been properly initialized.");
>>>    restoredBucketStates.clear();
>>>    synchronized (state.bucketStates) {
>>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : 
>>> state.bucketStates.entrySet()) {
>>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>>>          if (bucketState.isWriterOpen) {
>>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>>          }
>>>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>>> bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
>>> bucketState.pendingFiles);
>>>          }
>>>          bucketState.pendingFiles = new ArrayList<>();
>>>       }
>>>       restoredBucketStates.add(state);
>>>       if (LOG.isDebugEnabled()) {
>>>          LOG.debug("{} idx {} checkpointed {}.", 
>>> getClass().getSimpleName(), subtaskIdx, state);
>>>       }
>>>    }
Thanks for replying. 
Yes, I tried with different values of checkpoint eg. 20, 100, 5000. 
env.enablecheckpointing(100);
But in all the cases, I still see .pending state. 
Not sure if it's related to flush() method from OrcFileWriter ? Which 
might not be getting called somehow ?
Hi,Sagar
Please take a look at BucketingSink, It says that a file would keep 
.pending status if you DO NOT do a checkpoint. Doc says,  when a 
checkpoint is successful the currently pending file will be removed to 
{@code finished}. 
Take a try again. I think you should call the below method and see what 
would happen on it. Anyway, I will also try that and see whether it works. 
Please let me know if you still meet error.
>>>>  env.enableCheckpointing(200);
>>>> /**
>>>>  * The suffix for {@code pending} part files. These are closed files that 
>>>> we are
>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but 
>>>> which
>>>>  * were not yet confirmed by a checkpoint.
>>>>  */
>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>> <p>Part files can be in one of three states: {@code in-progress}, {@code 
>>>> pending} or {@code finished}.
>>>> * The reason for this is how the sink works together with the 
>>>> checkpointing mechanism to provide exactly-once
>>>> * semantics and fault-tolerance. The part file that is currently being 
>>>> written to is {@code in-progress}. Once
>>>> * a part file is closed for writing it becomes {@code pending}. When a 
>>>> checkpoint is successful the currently
>>>> * pending files will be moved to {@code finished}.
Thanks Zhangminglei for quick response.
I tried the above code and I am seeing another issue where the files 
created on hdfs are always in .pending state.
Let me know if you can reproduce it ?
Hi, Sagar
I did a local test for that and it seems works fine for me. PR will be 
updated for [FLINK-9407] 
I will update the newest code to PR soon and below is the example I was 
using for my test. You can check it again. Hopes you can enjoy it!
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import 
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>> import org.apache.flink.types.Row;
>>>>> public class TestOrc {
>>>>>    public static void main(String[] args) throws Exception {
>>>>>       StreamExecutionEnvironment env = 
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>       env.setParallelism(1);
>>>>>       String orcSchemaString = 
>>>>> "struct<name:string,age:int,married:boolean>";
>>>>>       String path = "hdfs:// <>";
>>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>       bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>       dataStream.addSink(bucketingSink);
>>>>>       env.execute();
>>>>>    }
>>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>>>       @Override
>>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>>          for (int i = 0; i < 3; i++) {
>>>>>             Row row = new Row(3);
>>>>>             row.setField(0, "Sagar");
>>>>>             row.setField(1, 26 + i);
>>>>>             row.setField(2, false);
>>>>>             ctx.collect(row);
>>>>>          }
>>>>>       }
>>>>>       @Override
>>>>>       public void cancel() {
>>>>>       }
>>>>>    }
>>>>> }
Hi Zhangminglei,
Question about 
 
I tried to use the code from PR and run it on local hdfs cluster to 
write some ORC data.
But somehow this code is failing with following error:
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>>  Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/ 
>>>>>> <>-progress for DFSClient_NONMAPREDUCE_73219864_36 on 
>>>>>> because this file lease is currently owned by 
>>>>>> DFSClient_NONMAPREDUCE_-1374584007_36 on
>>>>>>  at 
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(
>>>>>> I understand that this error is related to Hadoop but somehow I get this 
>>>>>> error only when executing the code from this PR.
>>>>>> I had created very crude way to write ORC file to HDFS as per follows. 
>>>>>> Below code works alright and does not throw above error.
>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>> import org.apache.orc.OrcFile;
>>>>>> import org.apache.orc.TypeDescription;
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>> import;
>>>>>> public class FlinkOrcWriterV1<T> implements 
>>>>>> org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>>     String schema;
>>>>>>     TypeDescription typeDescriptionschema;//"struct<x:int,y:int>"
>>>>>>     String basePath;
>>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>>         this.schema = schema;
>>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>     }
>>>>>>     @Override
>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>         Configuration conf = new Configuration();
>>>>>>         orcWriter = OrcFile.createWriter(new 
>>>>>> Path("hdfs://localhost:9000/tmp/hivedata3/ <>"),
>>>>>>                     OrcFile.writerOptions(conf)
>>>>>>                         .setSchema(typeDescriptionschema));
>>>>>>     }
>>>>>>     @Override
>>>>>>     public long flush() throws IOException {
>>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>>     }
>>>>>>     @Override
>>>>>>     public long getPos() throws IOException {
>>>>>>         return orcWriter.getRawDataSize();
>>>>>>     }
>>>>>>     @Override
>>>>>>     public void close() throws IOException {
>>>>>>         orcWriter.close();
>>>>>>     }
>>>>>>     @Override
>>>>>>     public void write(T element) throws IOException {
>>>>>>         VectorizedRowBatch batch = 
>>>>>> typeDescriptionschema.createRowBatch(10);
>>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>         for(int r=0; r < 10; ++r) {
>>>>>>             int row = batch.size++;
>>>>>>             x.vector[row] = r;
>>>>>>             y.vector[row] = r * 3;
>>>>>>             // If the batch is full, write it out and start over.
>>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>>                 orcWriter.addRowBatch(batch);
>>>>>>                 batch.reset();
>>>>>>             }
>>>>>>         }
>>>>>>         if (batch.size != 0) {
>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>             batch.reset();
>>>>>>         }
>>>>>>     }
>>>>>>     @Override
>>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>>     }
>>>>>> }
>>>>>> Not sure, if the error is related to any of the hadoop dependencies or 
>>>>>> something else ?
>>>>>> Can you please look into it and let me know if you can reproduce it on 
>>>>>> your end too ?
>>>>>> By the way, following are my dependencies in my project: 
>>>>>> dependencies {
>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: 
>>>>>> '1.1.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', 
>>>>>> version: '1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: 
>>>>>> '1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', 
>>>>>> version: '1.4.2'
>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: 
>>>>>> '1.10.0'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', 
>>>>>> version: '1.10.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: 
>>>>>> '1.4.2'
>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>> }
