Hi Sagar,

That's more a question for the ORC community, but AFAIK the top-level type is always a struct, because it needs to wrap the fields, e.g., struct<name:string,age:int>.
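You can check this directly with orc-core's TypeDescription (a minimal sketch; the class name is made up for illustration):

    import org.apache.orc.TypeDescription;

    public class OrcSchemaCheck {
        public static void main(String[] args) {
            // The top-level ORC type wraps the columns in a struct.
            TypeDescription schema =
                    TypeDescription.fromString("struct<name:string,age:int>");

            System.out.println(schema.getCategory());   // the root category: STRUCT
            System.out.println(schema.getFieldNames()); // [name, age]

            // A bare field list such as "name:string,age:int" is not a valid
            // ORC schema string on its own; it has to be wrapped in struct<...>.
        }
    }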
Best, Fabian

2018-06-26 22:38 GMT+02:00 sagar loke <sagar...@gmail.com>:

@zhangminglei,

Question about the schema for the ORC format:

1. Does it always need to be of the complex type "<Struct>"?

2. Or can it be created from individual data types directly, e.g. "name:string, age:int"?

Thanks,
Sagar

On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838...@163.com> wrote:

Yes, it should be "exits". Thanks to Ted Yu — exactly right!

Cheers
Zhangminglei

On Jun 23, 2018, at 12:40 PM, Ted Yu <yuzhih...@gmail.com> wrote:

For #1, the word "exists" should be "exits", right?
Thanks

-------- Original message --------
From: zhangminglei <18717838...@163.com>
Date: 6/23/18 10:12 AM (GMT+08:00)
To: sagar loke <sagar...@gmail.com>
Cc: dev <dev@flink.apache.org>, user <u...@flink.apache.org>
Subject: Re: [Flink-9407] Question about proposed ORC Sink !

Hi, Sagar.

1. It solves the issue partially, meaning files which have finished checkpointing don't show the .pending status, but the files which were in progress when the program exists are still in the .pending state.

Ans: Yes. If the program exits while a checkpoint has not yet finished, those files keep the .pending status. Under normal circumstances, a program running in a production environment will never be stopped or exited if everything is fine.

2. Ideally, the writer should work with default settings, correct? Meaning we don't have to explicitly set these parameters to make it work. Is this assumption correct?

Ans: Yes. The writer works with the default settings, and you do not have to set these parameters explicitly; your assumption is correct.

However, Flink is a real-time streaming framework, so under normal circumstances you don't really use the default settings when it comes to a specific business case, especially when working together with an offline end (like Hadoop MapReduce). In that case, you need to tell the offline end when a bucket is closed and when the data for that bucket is ready. So, you can take a look at https://issues.apache.org/jira/browse/FLINK-9609.

Cheers
Zhangminglei

On Jun 23, 2018, at 8:23 AM, sagar loke <sagar...@gmail.com> wrote:

Hi Zhangminglei,

Thanks for the reply.

1. It solves the issue partially, meaning files which have finished checkpointing don't show the .pending status, but the files which were in progress when the program exists are still in the .pending state.

2. Ideally, the writer should work with default settings, correct? Meaning we don't have to explicitly set these parameters to make it work. Is this assumption correct?

Thanks,
Sagar

On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:

Hi, Sagar. Please use the code below and you will see the part-file status go from _part-0-107.in-progress to _part-0-107.pending and finally to part-0-107 (for example); you need to run the program for a while. However, we need to set some parameters, like the following, and enableCheckpointing is ALSO needed. I now see why you always get the .pending files: the parameters below default to 60 seconds even when checkpointing is enabled, so you cannot see the finished file status until 60 seconds have passed.
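To make the timing concrete, here is a minimal sketch of those two knobs together with checkpointing (a sketch only — the path is a placeholder, and the 60,000 ms values are the defaults mentioned above):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.types.Row;

    public class PendingFileTiming {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // A .pending file is only renamed to finished after a successful
            // checkpoint, so checkpointing must be enabled.
            env.enableCheckpointing(1000);

            BucketingSink<Row> sink = new BucketingSink<>("hdfs:///tmp/orc-out");
            // How often the sink scans for inactive buckets (default 60,000 ms) ...
            sink.setInactiveBucketCheckInterval(60 * 1000)
                // ... and how long a bucket may sit idle before its part file is
                // closed and becomes .pending (default 60,000 ms). With the
                // defaults you therefore wait up to ~60 seconds before anything
                // leaves the in-progress state, even with frequent checkpoints.
                .setInactiveBucketThreshold(60 * 1000);
        }
    }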
Attached is the result on my end, and you will see what you want!

Please let me know if you still have the problem.

Cheers
Zhangminglei

    setInactiveBucketCheckInterval(2000)
        .setInactiveBucketThreshold(2000);

    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.types.Row;

    public class TestOrc {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(1000);
            env.setStateBackend(new MemoryStateBackend());

            String orcSchemaString = "struct<name:string,age:int,married:boolean>";
            String path = "hdfs://10.199.196.0:9000/data/hive/man";

            BucketingSink<Row> bucketingSink = new BucketingSink<>(path);

            // OrcFileWriter comes from the FLINK-9407 PR.
            bucketingSink
                    .setWriter(new OrcFileWriter<>(orcSchemaString))
                    .setInactiveBucketCheckInterval(2000)
                    .setInactiveBucketThreshold(2000);

            DataStream<Row> dataStream = env.addSource(new ManGenerator());

            dataStream.addSink(bucketingSink);

            env.execute();
        }

        public static class ManGenerator implements SourceFunction<Row> {

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for (int i = 0; i < 2147483000; i++) {
                    Row row = new Row(3);
                    row.setField(0, "Sagar");
                    row.setField(1, 26 + i);
                    row.setField(2, false);
                    ctx.collect(row);
                }
            }

            @Override
            public void cancel() {
            }
        }
    }

<filestatus.jpg>

On Jun 22, 2018, at 11:14 AM, sagar loke <sagar...@gmail.com> wrote:

Sure, we can solve it together :)

Are you able to reproduce it?

Thanks,
Sagar

On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote:

Sagar, flush() is called when a checkpoint is taken. Please see

    bucketState.currentFileValidLength = bucketState.writer.flush();

in snapshotState():

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

        restoredBucketStates.clear();

        synchronized (state.bucketStates) {
            int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

            for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
                BucketState<T> bucketState = bucketStateEntry.getValue();

                if (bucketState.isWriterOpen) {
                    bucketState.currentFileValidLength = bucketState.writer.flush();
                }

                synchronized (bucketState.pendingFilesPerCheckpoint) {
                    bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
                }
                bucketState.pendingFiles = new ArrayList<>();
            }
            restoredBucketStates.add(state);

            if (LOG.isDebugEnabled()) {
                LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
            }
        }
    }

On Jun 22, 2018, at 10:21 AM, sagar loke <sagar...@gmail.com> wrote:

Thanks for replying.

Yes, I tried different checkpoint intervals, e.g. 20, 100, 5000:

    env.enableCheckpointing(100);

But in all cases I still see the .pending state.

Not sure if it's related to the flush() method of OrcFileWriter, which might not be getting called somehow?
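One way I could verify that (a rough sketch of mine, not code from the PR): wrap the writer in a decorator that only logs each flush() call and watch the output across a few checkpoints.

    import org.apache.flink.streaming.connectors.fs.Writer;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    // Hypothetical helper: delegates to any Writer<T> and logs flush() calls,
    // to verify whether the checkpointing path actually reaches the writer.
    public class LoggingWriter<T> implements Writer<T> {

        private final Writer<T> delegate;

        public LoggingWriter(Writer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void open(FileSystem fs, Path path) throws IOException {
            System.out.println("open(" + path + ")");
            delegate.open(fs, path);
        }

        @Override
        public long flush() throws IOException {
            long validLength = delegate.flush();
            System.out.println("flush() -> validLength=" + validLength);
            return validLength;
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void write(T element) throws IOException {
            delegate.write(element);
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public Writer<T> duplicate() {
            return new LoggingWriter<>(delegate.duplicate());
        }
    }

and plug it in with: bucketingSink.setWriter(new LoggingWriter<>(new OrcFileWriter<>(orcSchemaString)));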
Thanks,
Sagar

On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com> wrote:

Hi, Sagar

Please take a look at BucketingSink. A file keeps the .pending status if you do NOT take a checkpoint; the doc says that when a checkpoint is successful, the currently pending files will be moved to finished. Give it another try — I think you should call the method below and see what happens. Anyway, I will also try it and see whether it works. Please let me know if you still hit the error.

    env.enableCheckpointing(200);

    /**
     * The suffix for {@code pending} part files. These are closed files that we are
     * not currently writing to (inactive or reached {@link #batchSize}), but which
     * were not yet confirmed by a checkpoint.
     */
    private static final String DEFAULT_PENDING_SUFFIX = ".pending";

    /**
     * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
     * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
     * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
     * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
     * pending files will be moved to {@code finished}.
     */

Cheers
Zhangminglei
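P.S. Those states and suffixes are also configurable on the sink. A small sketch (the values are made up for illustration; the setters are the ones BucketingSink exposes):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.types.Row;

    public class PartFileStates {
        public static void main(String[] args) {
            BucketingSink<Row> sink = new BucketingSink<>("hdfs:///tmp/orc-out");

            // in-progress -> pending -> finished: the first two states are marked
            // with suffixes, which can be overridden if a downstream job expects
            // different names (the finished file carries no suffix by default).
            sink.setInProgressSuffix(".in-progress")
                .setPendingSuffix(".pending")
                // Rolling a part file at this size (the batchSize mentioned in the
                // javadoc above) is one of the ways a file moves to pending.
                .setBatchSize(128 * 1024 * 1024); // 128 MB
        }
    }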
On Jun 22, 2018, at 4:46 AM, sagar loke <sagar...@gmail.com> wrote:

Thanks Zhangminglei for the quick response.

I tried the above code and I am seeing another issue where the files created on HDFS are always in the .pending state.

Let me know if you can reproduce it?

Thanks,
Sagar

On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com> wrote:

Hi, Sagar

I did a local test and it seems to work fine for me. The PR for [FLINK-9407] will be updated.

I will push the newest code to the PR soon; below is the example I was using for my test. You can check it again. Hope you enjoy it!

Cheers
Zhangminglei

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.types.Row;

    public class TestOrc {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            String orcSchemaString = "struct<name:string,age:int,married:boolean>";
            String path = "hdfs://10.199.196.0:9000/data/hive/man";

            BucketingSink<Row> bucketingSink = new BucketingSink<>(path);

            // OrcFileWriter comes from the FLINK-9407 PR.
            bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));

            DataStream<Row> dataStream = env.addSource(new ManGenerator());

            dataStream.addSink(bucketingSink);

            env.execute();
        }

        public static class ManGenerator implements SourceFunction<Row> {

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for (int i = 0; i < 3; i++) {
                    Row row = new Row(3);
                    row.setField(0, "Sagar");
                    row.setField(1, 26 + i);
                    row.setField(2, false);
                    ctx.collect(row);
                }
            }

            @Override
            public void cancel() {
            }
        }
    }

On Jun 21, 2018, at 1:47 AM, sagar loke <sagar...@gmail.com> wrote:

Hi Zhangminglei,

Question about https://issues.apache.org/jira/browse/FLINK-9407

I tried to use the code from the PR and ran it on a local HDFS cluster to write some ORC data, but the code fails with the following error:

    org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
    Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress
    for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease
    is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)

I understand that this error is related to Hadoop, but somehow I get it only when executing the code from this PR.

I had created a very crude way to write an ORC file to HDFS, shown below. This code works alright and does not throw the above error.
    import org.apache.flink.streaming.connectors.fs.Writer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;

    import java.io.IOException;

    public class FlinkOrcWriterV1<T> implements Writer<T> {

        private transient org.apache.orc.Writer orcWriter;
        String schema;
        TypeDescription typeDescriptionSchema; // e.g. "struct<x:int,y:int>"
        String basePath;

        public FlinkOrcWriterV1(String schema) {
            this.schema = schema;
            this.typeDescriptionSchema = TypeDescription.fromString(schema);
        }

        @Override
        public void open(FileSystem fs, Path path) throws IOException {
            // Crude: ignores the fs and path handed in by the sink and always
            // writes to one hard-coded location.
            Configuration conf = new Configuration();
            orcWriter = OrcFile.createWriter(
                    new Path("hdfs://localhost:9000/tmp/hivedata3/"),
                    OrcFile.writerOptions(conf).setSchema(typeDescriptionSchema));
        }

        @Override
        public long flush() throws IOException {
            return orcWriter.writeIntermediateFooter();
        }

        @Override
        public long getPos() throws IOException {
            return orcWriter.getRawDataSize();
        }

        @Override
        public void close() throws IOException {
            orcWriter.close();
        }

        @Override
        public void write(T element) throws IOException {
            // Crude: ignores the incoming element and writes ten synthetic
            // (r, 3r) rows per call instead.
            VectorizedRowBatch batch = typeDescriptionSchema.createRowBatch(10);
            LongColumnVector x = (LongColumnVector) batch.cols[0];
            LongColumnVector y = (LongColumnVector) batch.cols[1];
            for (int r = 0; r < 10; ++r) {
                int row = batch.size++;
                x.vector[row] = r;
                y.vector[row] = r * 3;
                // If the batch is full, write it out and start over.
                if (batch.size == batch.getMaxSize()) {
                    orcWriter.addRowBatch(batch);
                    batch.reset();
                }
            }
            if (batch.size != 0) {
                orcWriter.addRowBatch(batch);
                batch.reset();
            }
        }

        @Override
        public FlinkOrcWriterV1<T> duplicate() {
            return new FlinkOrcWriterV1<>(schema);
        }
    }

Not sure if the error is related to one of the Hadoop dependencies or something else?

Can you please look into it and let me know if you can reproduce it on your end too?
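In the meantime, to rule out a stale lease left behind by an earlier crashed run, this is what I plan to try (a rough sketch of mine; recoverLease is the plain Hadoop HDFS API, nothing from the PR):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    import java.io.IOException;
    import java.net.URI;

    public class RecoverLease {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);

            // The part file named in the exception above.
            Path stale = new Path("/tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress");

            if (fs instanceof DistributedFileSystem) {
                // Ask the NameNode to release the lease held by the dead client;
                // returns true once the file is closed and writable again.
                boolean recovered = ((DistributedFileSystem) fs).recoverLease(stale);
                System.out.println("lease recovered: " + recovered);
            }
        }
    }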
By the way, the following are the dependencies in my project:

    dependencies {
        compile 'org.apache.flink:flink-java:1.4.2'
        compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
        compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
        compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
        compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
        compile 'io.confluent:kafka-avro-serializer:3.3.0'
        compile 'org.apache.flink:flink-avro:1.4.2'
        compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
        compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
        compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
        compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
        compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
        compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
        compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
        compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
        compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
        testCompile group: 'junit', name: 'junit', version: '4.12'
    }

--
Thanks,
Sagar