@zhangminglei,

Question about the schema for ORC format:

1. Does it always need to be of complex type "<Struct>" ?

2. Or can it be created with individual data types directly ?
    eg. "name:string, age:int" ?


Thanks,
Sagar

On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838...@163.com> wrote:

> Yes, it should be exit. Thanks to Ted Yu. Very exactly!
>
> Cheers
> Zhangminglei
>
> 在 2018年6月23日,下午12:40,Ted Yu <yuzhih...@gmail.com> 写道:
>
> For #1, the word exist should be exit, right ?
> Thanks
>
> -------- Original message --------
> From: zhangminglei <18717838...@163.com>
> Date: 6/23/18 10:12 AM (GMT+08:00)
> To: sagar loke <sagar...@gmail.com>
> Cc: dev <d...@flink.apache.org>, user <user@flink.apache.org>
> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>
> Hi, Sagar.
>
> 1. It solves the issue partially meaning files which have finished
> checkpointing don't show .pending status but the files which were in
> progress
>     when the program exists are still in .pending state.
>
>
> Ans:
>
> Yea, Make the program exists and in that time if a checkpoint does not
> finished will lead the status keeps in .pending state then. Under the
> normal circumstances, the programs that running in the production env will
> never be stoped or existed if everything is fine.
>
> 2. Ideally, writer should work with default settings correct ? Meaning we
> don't have to explicitly set these parameters to make it work.
>     Is this assumption correct ?
>
>
> Ans:
>
> Yes. Writer should work with default settings correct.
> Yes. We do not have to explicitly set these parameters to make it work.
> Yes. Assumption correct indeed.
>
> However, you know, flink is a real time streaming framework, so under
> normal circumstances,you don't really go to use the default settings when
> it comes to a specific business. Especially together work with *offline
> end*(Like hadoop mapreduce). In this case, you need to tell the offline
> end when time a bucket is close and when time the data for the specify
> bucket is ready. So, you can take a look on https://issues.apache.org/
> jira/browse/FLINK-9609.
>
> Cheers
> Zhangminglei
>
>
> 在 2018年6月23日,上午8:23,sagar loke <sagar...@gmail.com> 写道:
>
> Hi Zhangminglei,
>
> Thanks for the reply.
>
> 1. It solves the issue partially meaning files which have finished
> checkpointing don't show .pending status but the files which were in
> progress
>     when the program exists are still in .pending state.
>
> 2. Ideally, writer should work with default settings correct ? Meaning we
> don't have to explicitly set these parameters to make it work.
>     Is this assumption correct ?
>
> Thanks,
> Sagar
>
> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:
>
>> Hi, Sagar. Please use the below code and you will find the part files
>> status from _part-0-107.in-progress   to _part-0-107.pending and finally
>> to part-0-107. [For example], you need to run the program for a while.
>> However, we need set some parameters, like the following. Moreover,
>> *enableCheckpointing* IS also needed. I know why you always see the
>> *.pending* file since the below parameters default value is 60 seconds
>> even though you set the enableCheckpoint. So, that is why you can not see
>> the finished file status until 60 seconds passed.
>>
>> Attached is the ending on my end, and you will see what you want!
>>
>> Please let me know if you still have the problem.
>>
>> Cheers
>> Zhangminglei
>>
>> setInactiveBucketCheckInterval(2000)
>> .setInactiveBucketThreshold(2000);
>>
>>
>> public class TestOrc {
>>    public static void main(String[] args) throws Exception {
>>       StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>       env.setParallelism(1);
>>       env.enableCheckpointing(1000);
>>       env.setStateBackend(new MemoryStateBackend());
>>
>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>
>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>
>>       bucketingSink
>>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>>          .setInactiveBucketCheckInterval(2000)
>>          .setInactiveBucketThreshold(2000);
>>
>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>
>>       dataStream.addSink(bucketingSink);
>>
>>       env.execute();
>>    }
>>
>>    public static class ManGenerator implements SourceFunction<Row> {
>>
>>       @Override
>>       public void run(SourceContext<Row> ctx) throws Exception {
>>          for (int i = 0; i < 2147483000; i++) {
>>             Row row = new Row(3);
>>             row.setField(0, "Sagar");
>>             row.setField(1, 26 + i);
>>             row.setField(2, false);
>>             ctx.collect(row);
>>          }
>>       }
>>
>>       @Override
>>       public void cancel() {
>>
>>       }
>>    }
>> }
>>
>> <filestatus.jpg>
>>
>>
>>
>> 在 2018年6月22日,上午11:14,sagar loke <sagar...@gmail.com> 写道:
>>
>> Sure, we can solve it together :)
>>
>> Are you able to reproduce it ?
>>
>> Thanks,
>> Sagar
>>
>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote:
>>
>>> Sagar, flush will be called when do a checkpoint. Please see
>>>
>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>>
>>>
>>>
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception 
>>> {
>>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not 
>>> been properly initialized.");
>>>
>>>    restoredBucketStates.clear();
>>>
>>>    synchronized (state.bucketStates) {
>>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>
>>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : 
>>> state.bucketStates.entrySet()) {
>>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>>>
>>>          if (bucketState.isWriterOpen) {
>>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>>          }
>>>
>>>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>             
>>> bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
>>> bucketState.pendingFiles);
>>>          }
>>>          bucketState.pendingFiles = new ArrayList<>();
>>>       }
>>>       restoredBucketStates.add(state);
>>>
>>>       if (LOG.isDebugEnabled()) {
>>>          LOG.debug("{} idx {} checkpointed {}.", 
>>> getClass().getSimpleName(), subtaskIdx, state);
>>>       }
>>>    }
>>>
>>>
>>>
>>> 在 2018年6月22日,上午10:21,sagar loke <sagar...@gmail.com> 写道:
>>>
>>> Thanks for replying.
>>>
>>> Yes, I tried with different values of checkpoint eg. 20, 100, 5000.
>>>
>>> env.enablecheckpointing(100);
>>>
>>> But in all the cases, I still see .pending state.
>>>
>>> Not sure if it’s related to flush() method from OrcFileWriter ? Which
>>> might not be getting called somehow ?
>>>
>>> Thanks,
>>> Sagar
>>>
>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com>
>>> wrote:
>>>
>>>> Hi,Sagar
>>>>
>>>> Please take a look at BucketingSink, It says that a file would keep
>>>> .pending status if you DO NOT do a checkpoint. Doc says,  when a checkpoint
>>>> is successful the currently pending file will be removed to {@code
>>>> finished}.
>>>> Take a try again. I think you should call the below method and see what
>>>> would happen on it. Anyway, I will also try that and see whether it works.
>>>> Please let me know if you still meet error.
>>>>
>>>>  env.enableCheckpointing(200);
>>>>
>>>> /**
>>>>  * The suffix for {@code pending} part files. These are closed files that 
>>>> we are
>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but 
>>>> which
>>>>  * were not yet confirmed by a checkpoint.
>>>>  */
>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>>
>>>> <p>Part files can be in one of three states: {@code in-progress}, {@code 
>>>> pending} or {@code finished}.
>>>> * The reason for this is how the sink works together with the 
>>>> checkpointing mechanism to provide exactly-once
>>>> * semantics and fault-tolerance. The part file that is currently being 
>>>> written to is {@code in-progress}. Once
>>>> * a part file is closed for writing it becomes {@code pending}. When a 
>>>> checkpoint is successful the currently
>>>> * pending files will be moved to {@code finished}.
>>>>
>>>>
>>>> Cheers
>>>> Zhangminglei
>>>>
>>>>
>>>>
>>>> 在 2018年6月22日,上午4:46,sagar loke <sagar...@gmail.com> 写道:
>>>>
>>>> Thanks Zhangminglei for quick response.
>>>>
>>>> I tried the above code and I am seeing another issue where the files
>>>> created on hdfs are always in *.pending* state.
>>>>
>>>> Let me know if you can reproduce it ?
>>>>
>>>> Thanks,
>>>> Sagar
>>>>
>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com>
>>>> wrote:
>>>>
>>>>> Hi, Sagar
>>>>>
>>>>> I did a local test for that and it seems works fine for me. PR will be
>>>>> updated for [FLINK-9407]
>>>>>
>>>>> I will update the newest code to PR soon and below is the example I
>>>>> was using for my test. You can check it again. Hopes you can enjoy it!
>>>>>
>>>>> Cheers
>>>>> Zhangminglei.
>>>>>
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import 
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>> import org.apache.flink.types.Row;
>>>>>
>>>>> public class TestOrc {
>>>>>    public static void main(String[] args) throws Exception {
>>>>>       StreamExecutionEnvironment env = 
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>       env.setParallelism(1);
>>>>>
>>>>>       String orcSchemaString = 
>>>>> "struct<name:string,age:int,married:boolean>";
>>>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>>
>>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>
>>>>>       bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>
>>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>
>>>>>       dataStream.addSink(bucketingSink);
>>>>>
>>>>>       env.execute();
>>>>>    }
>>>>>
>>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>>>
>>>>>       @Override
>>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>>          for (int i = 0; i < 3; i++) {
>>>>>             Row row = new Row(3);
>>>>>             row.setField(0, "Sagar");
>>>>>             row.setField(1, 26 + i);
>>>>>             row.setField(2, false);
>>>>>             ctx.collect(row);
>>>>>          }
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public void cancel() {
>>>>>
>>>>>       }
>>>>>    }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> 在 2018年6月21日,上午1:47,sagar loke <sagar...@gmail.com> 写道:
>>>>>
>>>>> Hi Zhangminglei,
>>>>>
>>>>> Question about  https://issues.apache.org/jira/browse/FLINK-9407
>>>>>
>>>>> I tried to use the code from PR and run it on local hdfs cluster to
>>>>> write some ORC data.
>>>>>
>>>>> But somehow this code is failing with following error:
>>>>>
>>>>>
>>>>>
>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.
>>>>>> hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE
>>>>>> /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for
>>>>>> DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file
>>>>>> lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on
>>>>>> 127.0.0.1
>>>>>
>>>>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverL
>>>>>> easeInternal(FSNamesystem.java:2500)
>>>>>
>>>>>
>>>>> I understand that this error is related to Hadoop but somehow I get
>>>>> this error only when executing the code from this PR.
>>>>>
>>>>> I had created very crude way to write ORC file to HDFS as per follows.
>>>>> Below code works alright and does not throw above error.
>>>>>
>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>> import org.apache.orc.OrcFile;
>>>>>> import org.apache.orc.TypeDescription;
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>>
>>>>>> public class FlinkOrcWriterV1<T> implements 
>>>>>> org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>>>
>>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>>     String schema;
>>>>>>     TypeDescription typeDescriptionschema;//"struct<x:int,y:int>"
>>>>>>     String basePath;
>>>>>>
>>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>>         this.schema = schema;
>>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>         Configuration conf = new Configuration();
>>>>>>         orcWriter = OrcFile.createWriter(new 
>>>>>> Path("hdfs://localhost:9000/tmp/hivedata3/"),
>>>>>>                     OrcFile.writerOptions(conf)
>>>>>>                         .setSchema(typeDescriptionschema));
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long flush() throws IOException {
>>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long getPos() throws IOException {
>>>>>>         return orcWriter.getRawDataSize();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void close() throws IOException {
>>>>>>         orcWriter.close();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void write(T element) throws IOException {
>>>>>>         VectorizedRowBatch batch = 
>>>>>> typeDescriptionschema.createRowBatch(10);
>>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>         for(int r=0; r < 10; ++r) {
>>>>>>             int row = batch.size++;
>>>>>>             x.vector[row] = r;
>>>>>>             y.vector[row] = r * 3;
>>>>>>             // If the batch is full, write it out and start over.
>>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>>                 orcWriter.addRowBatch(batch);
>>>>>>                 batch.reset();
>>>>>>             }
>>>>>>         }
>>>>>>         if (batch.size != 0) {
>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>             batch.reset();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> Not sure, if the error is related to any of the hadoop dependencies or
>>>>> something else ?
>>>>>
>>>>> Can you please look into it and let me know if you can reproduce it on
>>>>> your end too ?
>>>>>
>>>>> By the way, following are my dependencies in my project:
>>>>>
>>>>> dependencies {
>>>>>>
>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>
>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>
>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>
>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>
>>>>>>     compile 'org.apache.flink:flink-connec
>>>>>>> tor-elasticsearch5_2.11:1.4.2'
>>>>>>
>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>
>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>
>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version:
>>>>>>> '1.1.0'
>>>>>>
>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2',
>>>>>>> version: '1.4.2'
>>>>>>
>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>
>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version:
>>>>>>> '1.4.2'
>>>>>>
>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11',
>>>>>>> version: '1.4.2'
>>>>>>
>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version:
>>>>>>> '1.5.1'
>>>>>>
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro',
>>>>>>> version: '1.10.0'
>>>>>>
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common',
>>>>>>> version: '1.10.0'
>>>>>>
>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11',
>>>>>>> version: '1.4.2'
>>>>>>
>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Sagar.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> SAGAR.
>>>>
>>>>
>>>> --
>>> Cheers,
>>> Sagar
>>>
>>>
>>> --
>> Cheers,
>> Sagar
>>
>>
>>
>
>
> --
> Regards,
> SAGAR.
>
>
>
>


-- 
Regards,
SAGAR.

Reply via email to