Thanks @zhangminglei and @Fabian for confirming. I also looked at the ORC parsing code, and it seems that using the <struct> type is mandatory for now.
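For anyone following along, here is a small sketch I used to convince myself (my own illustration, assuming only orc-core on the classpath; the exception type is what I'd expect from the parser, not something I verified against the spec):

import org.apache.orc.TypeDescription;

public class OrcSchemaCheck {
    public static void main(String[] args) {
        // Works: the top-level type is a struct that wraps the fields.
        TypeDescription ok = TypeDescription.fromString("struct<name:string,age:int>");
        System.out.println(ok);

        // A bare field list is not a valid top-level type, so I expect
        // the parser to reject it.
        try {
            TypeDescription.fromString("name:string,age:int");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}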
Thanks,
Sagar

On Wed, Jun 27, 2018 at 12:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Sagar,
>
> That's more a question for the ORC community, but AFAIK, the top-level
> type is always a struct because it needs to wrap the fields, e.g.,
> struct(name:string, age:int)
>
> Best, Fabian
>
> 2018-06-26 22:38 GMT+02:00 sagar loke <sagar...@gmail.com>:
>
>> @zhangminglei,
>>
>> A question about the schema for the ORC format:
>>
>> 1. Does it always need to be of the complex type "<Struct>"?
>>
>> 2. Or can it be created from individual data types directly,
>> e.g. "name:string, age:int"?
>>
>> Thanks,
>> Sagar
>>
>> On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838...@163.com> wrote:
>>
>>> Yes, it should be exit. Thanks to Ted Yu. Exactly right!
>>>
>>> Cheers
>>> Zhangminglei
>>>
>>> On Jun 23, 2018, at 12:40 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>> For #1, the word exist should be exit, right?
>>> Thanks
>>>
>>> -------- Original message --------
>>> From: zhangminglei <18717838...@163.com>
>>> Date: 6/23/18 10:12 AM (GMT+08:00)
>>> To: sagar loke <sagar...@gmail.com>
>>> Cc: dev <dev@flink.apache.org>, user <u...@flink.apache.org>
>>> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>>>
>>> Hi, Sagar.
>>>
>>> 1. It solves the issue partially, meaning files which have finished
>>> checkpointing don't show .pending status, but the files which were in
>>> progress when the program exists are still in .pending state.
>>>
>>> Ans:
>>>
>>> Yes. If the program exits before a checkpoint has finished, the status
>>> will stay .pending. Under normal circumstances, a program running in a
>>> production environment is never stopped or exited if everything is fine.
>>>
>>> 2. Ideally, the writer should work with default settings, correct?
>>> Meaning we don't have to explicitly set these parameters to make it
>>> work. Is this assumption correct?
>>>
>>> Ans:
>>>
>>> Yes. The writer works correctly with the default settings.
>>> Yes. We do not have to explicitly set these parameters to make it work.
>>> Yes. The assumption is indeed correct.
>>>
>>> However, Flink is a real-time streaming framework, so under normal
>>> circumstances you don't really use the default settings when it comes
>>> to a specific business, especially when working together with an
>>> *offline end* (like Hadoop MapReduce). In that case, you need to tell
>>> the offline end when a bucket is closed and when the data for a
>>> specific bucket is ready. So, you can take a look at
>>> https://issues.apache.org/jira/browse/FLINK-9609.
>>>
>>> Cheers
>>> Zhangminglei
>>>
>>> On Jun 23, 2018, at 8:23 AM, sagar loke <sagar...@gmail.com> wrote:
>>>
>>> Hi Zhangminglei,
>>>
>>> Thanks for the reply.
>>>
>>> 1. It solves the issue partially, meaning files which have finished
>>> checkpointing don't show .pending status, but the files which were in
>>> progress when the program exists are still in .pending state.
>>>
>>> 2. Ideally, the writer should work with default settings, correct?
>>> Meaning we don't have to explicitly set these parameters to make it
>>> work. Is this assumption correct?
>>>
>>> Thanks,
>>> Sagar
>>>
>>> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:
>>>
>>>> Hi, Sagar. Please use the code below and you will see the part file
>>>> status move from _part-0-107.in-progress to _part-0-107.pending and
>>>> finally to part-0-107 (for example); you need to run the program for
>>>> a while.
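>>>>
>>>> To illustrate, the lifecycle you should observe for one part file is:
>>>>
>>>>   _part-0-107.in-progress   (currently being written to)
>>>>   _part-0-107.pending       (closed for writing, waiting for a checkpoint)
>>>>   part-0-107                (checkpoint succeeded, finished)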
>>>> However, we need to set some parameters, like the following. Moreover,
>>>> *enableCheckpointing* IS also needed. I know why you always see the
>>>> *.pending* file: the parameters below default to 60 seconds even
>>>> though you set enableCheckpointing. That is why you cannot see the
>>>> finished file status until 60 seconds have passed.
>>>>
>>>> Attached is the output on my end, and you will see what you want!
>>>>
>>>> Please let me know if you still have the problem.
>>>>
>>>> Cheers
>>>> Zhangminglei
>>>>
>>>> setInactiveBucketCheckInterval(2000)
>>>>         .setInactiveBucketThreshold(2000);
>>>>
>>>> public class TestOrc {
>>>>     public static void main(String[] args) throws Exception {
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         env.setParallelism(1);
>>>>         // Checkpointing is required, otherwise pending files are never finalized.
>>>>         env.enableCheckpointing(1000);
>>>>         env.setStateBackend(new MemoryStateBackend());
>>>>
>>>>         String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>         String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>
>>>>         BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>
>>>>         bucketingSink
>>>>                 .setWriter(new OrcFileWriter<>(orcSchemaString))
>>>>                 // Close inactive buckets quickly so part files leave the
>>>>                 // in-progress state without waiting the default 60 seconds.
>>>>                 .setInactiveBucketCheckInterval(2000)
>>>>                 .setInactiveBucketThreshold(2000);
>>>>
>>>>         DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>
>>>>         dataStream.addSink(bucketingSink);
>>>>
>>>>         env.execute();
>>>>     }
>>>>
>>>>     public static class ManGenerator implements SourceFunction<Row> {
>>>>
>>>>         @Override
>>>>         public void run(SourceContext<Row> ctx) throws Exception {
>>>>             for (int i = 0; i < 2147483000; i++) {
>>>>                 Row row = new Row(3);
>>>>                 row.setField(0, "Sagar");
>>>>                 row.setField(1, 26 + i);
>>>>                 row.setField(2, false);
>>>>                 ctx.collect(row);
>>>>             }
>>>>         }
>>>>
>>>>         @Override
>>>>         public void cancel() {
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> <filestatus.jpg>
>>>>
>>>> On Jun 22, 2018, at 11:14 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>
>>>> Sure, we can solve it together :)
>>>>
>>>> Are you able to reproduce it?
>>>>
>>>> Thanks,
>>>> Sagar
>>>>
>>>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote:
>>>>
>>>>> Sagar, flush will be called whenever a checkpoint is taken.
>>>>> Please see
>>>>>
>>>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>>>>
>>>>> @Override
>>>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>>     Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>>>>>
>>>>>     restoredBucketStates.clear();
>>>>>
>>>>>     synchronized (state.bucketStates) {
>>>>>         int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>>>
>>>>>         for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>>>>>             BucketState<T> bucketState = bucketStateEntry.getValue();
>>>>>
>>>>>             // On every checkpoint the open writer is flushed and the
>>>>>             // returned offset is recorded as the file's valid length.
>>>>>             if (bucketState.isWriterOpen) {
>>>>>                 bucketState.currentFileValidLength = bucketState.writer.flush();
>>>>>             }
>>>>>
>>>>>             // Pending files are associated with this checkpoint id so
>>>>>             // they can be finalized once the checkpoint completes.
>>>>>             synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>>>                 bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>>>>>             }
>>>>>             bucketState.pendingFiles = new ArrayList<>();
>>>>>         }
>>>>>         restoredBucketStates.add(state);
>>>>>
>>>>>         if (LOG.isDebugEnabled()) {
>>>>>             LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> On Jun 22, 2018, at 10:21 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>>
>>>>> Thanks for replying.
>>>>>
>>>>> Yes, I tried different checkpoint intervals, e.g. 20, 100, 5000:
>>>>>
>>>>> env.enableCheckpointing(100);
>>>>>
>>>>> But in all cases, I still see the .pending state.
>>>>>
>>>>> Not sure if it's related to the flush() method from OrcFileWriter,
>>>>> which might not be getting called somehow?
>>>>>
>>>>> Thanks,
>>>>> Sagar
>>>>>
>>>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com> wrote:
>>>>>
>>>>>> Hi, Sagar
>>>>>>
>>>>>> Please take a look at BucketingSink: a file keeps its .pending status
>>>>>> if you do NOT complete a checkpoint. The doc says that when a
>>>>>> checkpoint is successful, the currently pending files are moved to
>>>>>> {@code finished}. Try again. I think you should call the method below
>>>>>> and see what happens. Anyway, I will also try it and see whether it
>>>>>> works. Please let me know if you still hit the error.
>>>>>>
>>>>>> env.enableCheckpointing(200);
>>>>>>
>>>>>> /**
>>>>>>  * The suffix for {@code pending} part files. These are closed files that we are
>>>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but which
>>>>>>  * were not yet confirmed by a checkpoint.
>>>>>>  */
>>>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>>>>
>>>>>> <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
>>>>>> * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
>>>>>> * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
>>>>>> * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
>>>>>> * pending files will be moved to {@code finished}.
>>>>>>
>>>>>> Cheers
>>>>>> Zhangminglei
>>>>>>
>>>>>> On Jun 22, 2018, at 4:46 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks Zhangminglei for the quick response.
>>>>>>
>>>>>> I tried the above code and I am seeing another issue where the files
>>>>>> created on HDFS are always in the *.pending* state.
>>>>>>
>>>>>> Let me know if you can reproduce it?
>>>>>>
>>>>>> Thanks,
>>>>>> Sagar
>>>>>>
>>>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com> wrote:
>>>>>>
>>>>>>> Hi, Sagar
>>>>>>>
>>>>>>> I did a local test and it seems to work fine for me. The PR for
>>>>>>> [FLINK-9407] will be updated.
>>>>>>>
>>>>>>> I will push the newest code to the PR soon; below is the example I
>>>>>>> used for my test. You can check it again. Hope you enjoy it!
>>>>>>>
>>>>>>> Cheers
>>>>>>> Zhangminglei.
>>>>>>>
>>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>>>> import org.apache.flink.types.Row;
>>>>>>>
>>>>>>> public class TestOrc {
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>         env.setParallelism(1);
>>>>>>>
>>>>>>>         String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>>>>         String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>>>>
>>>>>>>         BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>>>
>>>>>>>         bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>>>
>>>>>>>         DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>>>
>>>>>>>         dataStream.addSink(bucketingSink);
>>>>>>>
>>>>>>>         env.execute();
>>>>>>>     }
>>>>>>>
>>>>>>>     public static class ManGenerator implements SourceFunction<Row> {
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void run(SourceContext<Row> ctx) throws Exception {
>>>>>>>             for (int i = 0; i < 3; i++) {
>>>>>>>                 Row row = new Row(3);
>>>>>>>                 row.setField(0, "Sagar");
>>>>>>>                 row.setField(1, 26 + i);
>>>>>>>                 row.setField(2, false);
>>>>>>>                 ctx.collect(row);
>>>>>>>             }
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void cancel() {
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> On Jun 21, 2018, at 1:47 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Zhangminglei,
>>>>>>>
>>>>>>> A question about https://issues.apache.org/jira/browse/FLINK-9407
>>>>>>>
>>>>>>> I tried to use the code from the PR and ran it on a local HDFS
>>>>>>> cluster to write some ORC data.
>>>>>>>
>>>>>>> But somehow this code fails with the following error:
>>>>>>>
>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>>>> Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress
>>>>>>>> for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file
>>>>>>>> lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>>>>
>>>>>>> I understand that this error is related to Hadoop, but somehow I get
>>>>>>> it only when executing the code from this PR.
>>>>>>>
>>>>>>> I had put together a very crude way to write an ORC file to HDFS, as
>>>>>>> follows. The code below works alright and does not throw the above
>>>>>>> error.
>>>>>>>
>>>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>>>> import org.apache.orc.OrcFile;
>>>>>>>> import org.apache.orc.TypeDescription;
>>>>>>>>
>>>>>>>> import java.io.IOException;
>>>>>>>>
>>>>>>>> public class FlinkOrcWriterV1<T> implements Writer<T> {
>>>>>>>>
>>>>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>>>>     String schema;
>>>>>>>>     TypeDescription typeDescriptionschema; // e.g. "struct<x:int,y:int>"
>>>>>>>>     String basePath;
>>>>>>>>
>>>>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>>>>         this.schema = schema;
>>>>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>>>         Configuration conf = new Configuration();
>>>>>>>>         // Note: this ignores the path handed in by BucketingSink and
>>>>>>>>         // always writes to one fixed HDFS path.
>>>>>>>>         orcWriter = OrcFile.createWriter(
>>>>>>>>                 new Path("hdfs://localhost:9000/tmp/hivedata3/"),
>>>>>>>>                 OrcFile.writerOptions(conf).setSchema(typeDescriptionschema));
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long flush() throws IOException {
>>>>>>>>         // Writes an intermediate footer so the data written so far
>>>>>>>>         // becomes readable; returns the current file offset.
>>>>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long getPos() throws IOException {
>>>>>>>>         return orcWriter.getRawDataSize();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void close() throws IOException {
>>>>>>>>         orcWriter.close();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void write(T element) throws IOException {
>>>>>>>>         // Crude test: ignores the incoming element and writes a
>>>>>>>>         // synthetic batch of ten rows instead.
>>>>>>>>         VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);
>>>>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>>>         for (int r = 0; r < 10; ++r) {
>>>>>>>>             int row = batch.size++;
>>>>>>>>             x.vector[row] = r;
>>>>>>>>             y.vector[row] = r * 3;
>>>>>>>>             // If the batch is full, write it out and start over.
>>>>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>>>>                 orcWriter.addRowBatch(batch);
>>>>>>>>                 batch.reset();
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>         if (batch.size != 0) {
>>>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>>>             batch.reset();
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>
>>>>>>> Not sure if the error is related to one of the Hadoop dependencies
>>>>>>> or something else?
>>>>>>>
>>>>>>> Can you please look into it and let me know if you can reproduce it
>>>>>>> on your end too?
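>>>>>>>
>>>>>>> (A side note on my own crude writer, as a possible gotcha rather
>>>>>>> than a confirmed cause of the error above: open() ignores the path
>>>>>>> that BucketingSink passes in and always opens the same fixed HDFS
>>>>>>> path, so any two concurrent writer instances would fight over the
>>>>>>> lease on that one file. A sketch of an open() that respects the
>>>>>>> given file system and path, assuming orc-core's
>>>>>>> OrcFile.WriterOptions.fileSystem() is available, might look like:
>>>>>>>
>>>>>>> @Override
>>>>>>> public void open(FileSystem fs, Path path) throws IOException {
>>>>>>>     Configuration conf = new Configuration();
>>>>>>>     // Write to the in-progress file BucketingSink chose for this
>>>>>>>     // bucket, on the file system it handed us.
>>>>>>>     orcWriter = OrcFile.createWriter(
>>>>>>>             path,
>>>>>>>             OrcFile.writerOptions(conf)
>>>>>>>                     .fileSystem(fs)
>>>>>>>                     .setSchema(typeDescriptionschema));
>>>>>>> }
>>>>>>>
>>>>>>> I have not verified that this changes anything for the PR code path.)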
>>>>>>>
>>>>>>> By the way, following are my dependencies in my project:
>>>>>>>
>>>>>>>> dependencies {
>>>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
>>>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
>>>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
>>>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>>>> }
>>>>>>>
>>>>>>> --
>>>>>>> Thanks,
>>>>>>> Sagar.
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> SAGAR.
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> Sagar
>>>>
>>>> --
>>>> Cheers,
>>>> Sagar
>>>
>>> --
>>> Regards,
>>> SAGAR.
>>
>> --
>> Regards,
>> SAGAR.

--
Regards,
SAGAR.