Yes, it should be exit. Thanks to Ted Yu. Very exactly! 

Cheers
Zhangminglei

> 在 2018年6月23日,下午12:40,Ted Yu <yuzhih...@gmail.com> 写道:
> 
> For #1, the word exist should be exit, right ?
> Thanks
> 
> -------- Original message --------
> From: zhangminglei <18717838...@163.com>
> Date: 6/23/18 10:12 AM (GMT+08:00)
> To: sagar loke <sagar...@gmail.com>
> Cc: dev <dev@flink.apache.org>, user <u...@flink.apache.org>
> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
> 
> Hi, Sagar.
> 
>> 1. It solves the issue partially meaning files which have finished 
>> checkpointing don't show .pending status but the files which were in 
>> progress 
>>     when the program exists are still in .pending state.
> 
> Ans: 
> 
> Yea, Make the program exists and in that time if a checkpoint does not 
> finished will lead the status keeps in .pending state then. Under the normal 
> circumstances, the programs that running in the production env will never be 
> stoped or existed if everything is fine.
> 
>> 2. Ideally, writer should work with default settings correct ? Meaning we 
>> don't have to explicitly set these parameters to make it work. 
>>     Is this assumption correct ?
> 
> Ans: 
> 
> Yes. Writer should work with default settings correct.
> Yes. We do not have to explicitly set these parameters to make it work.
> Yes. Assumption correct indeed.
> 
> However, you know, flink is a real time streaming framework, so under normal 
> circumstances,you don't really go to use the default settings when it comes 
> to a specific business. Especially together work with offline end(Like hadoop 
> mapreduce). In this case, you need to tell the offline end when time a bucket 
> is close and when time the data for the specify bucket is ready. So, you can 
> take a look on https://issues.apache.org/jira/browse/FLINK-9609 
> <https://issues.apache.org/jira/browse/FLINK-9609>.
> 
> Cheers
> Zhangminglei
> 
> 
>> 在 2018年6月23日,上午8:23,sagar loke <sagar...@gmail.com 
>> <mailto:sagar...@gmail.com>> 写道:
>> 
>> Hi Zhangminglei,
>> 
>> Thanks for the reply.
>> 
>> 1. It solves the issue partially meaning files which have finished 
>> checkpointing don't show .pending status but the files which were in 
>> progress 
>>     when the program exists are still in .pending state.
>> 
>> 2. Ideally, writer should work with default settings correct ? Meaning we 
>> don't have to explicitly set these parameters to make it work. 
>>     Is this assumption correct ?
>> 
>> Thanks,
>> Sagar
>> 
>> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com 
>> <mailto:18717838...@163.com>> wrote:
>> Hi, Sagar. Please use the below code and you will find the part files status 
>> from _part-0-107.in-progress   <> to _part-0-107.pending and finally to 
>> part-0-107. [For example], you need to run the program for a while. However, 
>> we need set some parameters, like the following. Moreover, 
>> enableCheckpointing IS also needed. I know why you always see the .pending 
>> file since the below parameters default value is 60 seconds even though you 
>> set the enableCheckpoint. So, that is why you can not see the finished file 
>> status until 60 seconds passed.
>> 
>> Attached is the ending on my end, and you will see what you want! 
>> 
>> Please let me know if you still have the problem.
>> 
>> Cheers
>> Zhangminglei
>> 
>> setInactiveBucketCheckInterval(2000)
>> .setInactiveBucketThreshold(2000);
>> 
>> public class TestOrc {
>>    public static void main(String[] args) throws Exception {
>>       StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>       env.setParallelism(1);
>>       env.enableCheckpointing(1000);
>>       env.setStateBackend(new MemoryStateBackend());
>> 
>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>       String path = "hdfs://10.199.196.0:9000/data/hive/man <>";
>> 
>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>> 
>>       bucketingSink
>>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>>          .setInactiveBucketCheckInterval(2000)
>>          .setInactiveBucketThreshold(2000);
>> 
>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>> 
>>       dataStream.addSink(bucketingSink);
>> 
>>       env.execute();
>>    }
>> 
>>    public static class ManGenerator implements SourceFunction<Row> {
>> 
>>       @Override
>>       public void run(SourceContext<Row> ctx) throws Exception {
>>          for (int i = 0; i < 2147483000; i++) {
>>             Row row = new Row(3);
>>             row.setField(0, "Sagar");
>>             row.setField(1, 26 + i);
>>             row.setField(2, false);
>>             ctx.collect(row);
>>          }
>>       }
>> 
>>       @Override
>>       public void cancel() {
>> 
>>       }
>>    }
>> }
>> <filestatus.jpg>
>> 
>> 
>> 
>>> 在 2018年6月22日,上午11:14,sagar loke <sagar...@gmail.com 
>>> <mailto:sagar...@gmail.com>> 写道:
>>> 
>>> Sure, we can solve it together :)
>>> 
>>> Are you able to reproduce it ?
>>> 
>>> Thanks,
>>> Sagar
>>> 
>>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com 
>>> <mailto:18717838...@163.com>> wrote:
>>> Sagar, flush will be called when do a checkpoint. Please see 
>>> 
>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>> 
>>> 
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception 
>>> {
>>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not 
>>> been properly initialized.");
>>> 
>>>    restoredBucketStates.clear();
>>> 
>>>    synchronized (state.bucketStates) {
>>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>> 
>>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : 
>>> state.bucketStates.entrySet()) {
>>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>>> 
>>>          if (bucketState.isWriterOpen) {
>>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>>          }
>>> 
>>>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>             
>>> bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
>>> bucketState.pendingFiles);
>>>          }
>>>          bucketState.pendingFiles = new ArrayList<>();
>>>       }
>>>       restoredBucketStates.add(state);
>>> 
>>>       if (LOG.isDebugEnabled()) {
>>>          LOG.debug("{} idx {} checkpointed {}.", 
>>> getClass().getSimpleName(), subtaskIdx, state);
>>>       }
>>>    }
>>> 
>>> 
>>>> 在 2018年6月22日,上午10:21,sagar loke <sagar...@gmail.com 
>>>> <mailto:sagar...@gmail.com>> 写道:
>>>> 
>>>> Thanks for replying. 
>>>> 
>>>> Yes, I tried with different values of checkpoint eg. 20, 100, 5000. 
>>>> 
>>>> env.enablecheckpointing(100);
>>>> 
>>>> But in all the cases, I still see .pending state. 
>>>> 
>>>> Not sure if it’s related to flush() method from OrcFileWriter ? Which 
>>>> might not be getting called somehow ?
>>>> 
>>>> Thanks,
>>>> Sagar
>>>> 
>>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com 
>>>> <mailto:18717838...@163.com>> wrote:
>>>> Hi,Sagar
>>>> 
>>>> Please take a look at BucketingSink, It says that a file would keep 
>>>> .pending status if you DO NOT do a checkpoint. Doc says,  when a 
>>>> checkpoint is successful the currently pending file will be removed to 
>>>> {@code finished}. 
>>>> Take a try again. I think you should call the below method and see what 
>>>> would happen on it. Anyway, I will also try that and see whether it works. 
>>>> Please let me know if you still meet error.
>>>> 
>>>>  env.enableCheckpointing(200);
>>>> 
>>>> /**
>>>>  * The suffix for {@code pending} part files. These are closed files that 
>>>> we are
>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but 
>>>> which
>>>>  * were not yet confirmed by a checkpoint.
>>>>  */
>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>> <p>Part files can be in one of three states: {@code in-progress}, {@code 
>>>> pending} or {@code finished}.
>>>> * The reason for this is how the sink works together with the 
>>>> checkpointing mechanism to provide exactly-once
>>>> * semantics and fault-tolerance. The part file that is currently being 
>>>> written to is {@code in-progress}. Once
>>>> * a part file is closed for writing it becomes {@code pending}. When a 
>>>> checkpoint is successful the currently
>>>> * pending files will be moved to {@code finished}.
>>>> 
>>>> Cheers
>>>> Zhangminglei
>>>> 
>>>> 
>>>> 
>>>>> 在 2018年6月22日,上午4:46,sagar loke <sagar...@gmail.com 
>>>>> <mailto:sagar...@gmail.com>> 写道:
>>>>> 
>>>>> Thanks Zhangminglei for quick response.
>>>>> 
>>>>> I tried the above code and I am seeing another issue where the files 
>>>>> created on hdfs are always in .pending state.
>>>>> 
>>>>> Let me know if you can reproduce it ?
>>>>> 
>>>>> Thanks,
>>>>> Sagar
>>>>> 
>>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com 
>>>>> <mailto:18717838...@163.com>> wrote:
>>>>> Hi, Sagar
>>>>> 
>>>>> I did a local test for that and it seems works fine for me. PR will be 
>>>>> updated for [FLINK-9407] 
>>>>> 
>>>>> I will update the newest code to PR soon and below is the example I was 
>>>>> using for my test. You can check it again. Hopes you can enjoy it!
>>>>> 
>>>>> Cheers
>>>>> Zhangminglei.
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import 
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>> import org.apache.flink.types.Row;
>>>>> 
>>>>> public class TestOrc {
>>>>>    public static void main(String[] args) throws Exception {
>>>>>       StreamExecutionEnvironment env = 
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>       env.setParallelism(1);
>>>>> 
>>>>>       String orcSchemaString = 
>>>>> "struct<name:string,age:int,married:boolean>";
>>>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man <>";
>>>>> 
>>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>> 
>>>>>       bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>> 
>>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>> 
>>>>>       dataStream.addSink(bucketingSink);
>>>>> 
>>>>>       env.execute();
>>>>>    }
>>>>> 
>>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>>> 
>>>>>       @Override
>>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>>          for (int i = 0; i < 3; i++) {
>>>>>             Row row = new Row(3);
>>>>>             row.setField(0, "Sagar");
>>>>>             row.setField(1, 26 + i);
>>>>>             row.setField(2, false);
>>>>>             ctx.collect(row);
>>>>>          }
>>>>>       }
>>>>> 
>>>>>       @Override
>>>>>       public void cancel() {
>>>>> 
>>>>>       }
>>>>>    }
>>>>> }
>>>>> 
>>>>> 
>>>>>> 在 2018年6月21日,上午1:47,sagar loke <sagar...@gmail.com 
>>>>>> <mailto:sagar...@gmail.com>> 写道:
>>>>>> 
>>>>>> Hi Zhangminglei,
>>>>>> 
>>>>>> Question about  https://issues.apache.org/jira/browse/FLINK-9407 
>>>>>> <https://issues.apache.org/jira/browse/FLINK-9407> 
>>>>>> 
>>>>>> I tried to use the code from PR and run it on local hdfs cluster to 
>>>>>> write some ORC data.
>>>>>> 
>>>>>> But somehow this code is failing with following error:
>>>>>> 
>>>>>>  
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>>  Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in 
>>>>>> <http://part-0-0.in/>-progress for DFSClient_NONMAPREDUCE_73219864_36 on 
>>>>>> 127.0.0.1 because this file lease is currently owned by 
>>>>>> DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>>  at 
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>>> 
>>>>>> I understand that this error is related to Hadoop but somehow I get this 
>>>>>> error only when executing the code from this PR.
>>>>>> 
>>>>>> I had created very crude way to write ORC file to HDFS as per follows. 
>>>>>> Below code works alright and does not throw above error.
>>>>>> 
>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>> import org.apache.orc.OrcFile;
>>>>>> import org.apache.orc.TypeDescription;
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>> 
>>>>>> import java.io.IOException;
>>>>>> 
>>>>>> public class FlinkOrcWriterV1<T> implements 
>>>>>> org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>>> 
>>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>>     String schema;
>>>>>>     TypeDescription typeDescriptionschema;//"struct<x:int,y:int>"
>>>>>>     String basePath;
>>>>>> 
>>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>>         this.schema = schema;
>>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>         Configuration conf = new Configuration();
>>>>>>         orcWriter = OrcFile.createWriter(new 
>>>>>> Path("hdfs://localhost:9000/tmp/hivedata3/ <>"),
>>>>>>                     OrcFile.writerOptions(conf)
>>>>>>                         .setSchema(typeDescriptionschema));
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public long flush() throws IOException {
>>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public long getPos() throws IOException {
>>>>>>         return orcWriter.getRawDataSize();
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public void close() throws IOException {
>>>>>>         orcWriter.close();
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public void write(T element) throws IOException {
>>>>>>         VectorizedRowBatch batch = 
>>>>>> typeDescriptionschema.createRowBatch(10);
>>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>         for(int r=0; r < 10; ++r) {
>>>>>>             int row = batch.size++;
>>>>>>             x.vector[row] = r;
>>>>>>             y.vector[row] = r * 3;
>>>>>>             // If the batch is full, write it out and start over.
>>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>>                 orcWriter.addRowBatch(batch);
>>>>>>                 batch.reset();
>>>>>>             }
>>>>>>         }
>>>>>>         if (batch.size != 0) {
>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>             batch.reset();
>>>>>>         }
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>>     }
>>>>>> }
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Not sure, if the error is related to any of the hadoop dependencies or 
>>>>>> something else ?
>>>>>> 
>>>>>> Can you please look into it and let me know if you can reproduce it on 
>>>>>> your end too ?
>>>>>> 
>>>>>> By the way, following are my dependencies in my project: 
>>>>>> 
>>>>>> dependencies {
>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: 
>>>>>> '1.1.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', 
>>>>>> version: '1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: 
>>>>>> '1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', 
>>>>>> version: '1.4.2'
>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: 
>>>>>> '1.10.0'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', 
>>>>>> version: '1.10.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: 
>>>>>> '1.4.2'
>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>> }
>>>>>> 
>>>>>> 
>>>>>> -- 
>>>>>> Thanks,
>>>>>> Sagar.
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Regards,
>>>>> SAGAR.
>>>> 
>>>> -- 
>>>> Cheers,
>>>> Sagar
>>> 
>>> -- 
>>> Cheers,
>>> Sagar
>> 
>> 
>> 
>> 
>> -- 
>> Regards,
>> SAGAR.
> 

Reply via email to