@zhangminglei, a question about the schema for the ORC format:

1. Does it always need to be of the complex type "struct<...>"?
2. Or can it be created with the individual data types directly, e.g. "name:string, age:int"?

Thanks,
Sagar
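[For reference, a minimal sketch of how ORC parses a schema string, assuming the OrcFileWriter from the FLINK-9407 PR follows the usual ORC convention of a struct-typed root schema, as the examples later in this thread ("struct<name:string,age:int,married:boolean>") do. OrcSchemaSketch is a made-up name used only for illustration.]

import org.apache.orc.TypeDescription;

public class OrcSchemaSketch {
    public static void main(String[] args) {
        // The examples in this thread always wrap the fields in struct<...>,
        // which is the usual ORC convention: the root type is a struct and
        // each of its fields becomes one column.
        TypeDescription schema =
                TypeDescription.fromString("struct<name:string,age:int>");

        System.out.println(schema.getFieldNames());               // [name, age]
        System.out.println(schema.createRowBatch().cols.length);  // 2

        // A bare field list such as "name:string,age:int" is not a parsable
        // type string for TypeDescription.fromString(); the struct<...>
        // wrapper is what turns the individual types into columns.
    }
}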
On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838...@163.com> wrote:

> Yes, it should be "exit". Thanks to Ted Yu, that is exactly right!
>
> Cheers
> Zhangminglei
>
> On Jun 23, 2018, at 12:40 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> For #1, the word "exist" should be "exit", right?
> Thanks
>
> -------- Original message --------
> From: zhangminglei <18717838...@163.com>
> Date: 6/23/18 10:12 AM (GMT+08:00)
> To: sagar loke <sagar...@gmail.com>
> Cc: dev <d...@flink.apache.org>, user <user@flink.apache.org>
> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>
> Hi, Sagar.
>
> 1. It solves the issue partially, meaning files which have finished
> checkpointing don't show .pending status, but the files which were in
> progress when the program exists are still in .pending state.
>
> Ans:
>
> Yes. If the program exits while a checkpoint has not yet finished, those
> files will stay in the .pending state. Under normal circumstances, a
> program running in a production environment is never stopped or exited
> as long as everything is fine.
>
> 2. Ideally, the writer should work with default settings, correct? Meaning
> we don't have to explicitly set these parameters to make it work.
> Is this assumption correct?
>
> Ans:
>
> Yes, the writer works with the default settings, and you do not have to
> explicitly set these parameters to make it work. Your assumption is correct.
>
> However, Flink is a real-time streaming framework, so in practice you
> rarely use the default settings for a specific business case, especially
> when working together with an *offline end* (like Hadoop MapReduce). In
> that case you need to tell the offline end when a bucket is closed and
> when the data for that bucket is ready. So you can take a look at
> https://issues.apache.org/jira/browse/FLINK-9609.
>
> Cheers
> Zhangminglei
>
> On Jun 23, 2018, at 8:23 AM, sagar loke <sagar...@gmail.com> wrote:
>
> Hi Zhangminglei,
>
> Thanks for the reply.
>
> 1. It solves the issue partially, meaning files which have finished
> checkpointing don't show .pending status, but the files which were in
> progress when the program exists are still in .pending state.
>
> 2. Ideally, the writer should work with default settings, correct? Meaning
> we don't have to explicitly set these parameters to make it work.
> Is this assumption correct?
>
> Thanks,
> Sagar
>
> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:
>
>> Hi, Sagar. Please use the code below and you will see the part files move
>> from _part-0-107.in-progress to _part-0-107.pending and finally to
>> part-0-107 (you need to run the program for a while). However, we need to
>> set some parameters, like the following, and *enableCheckpointing* is
>> also needed. I know why you always see the *.pending* files: the default
>> value of the parameters below is 60 seconds, even when checkpointing is
>> enabled, so you cannot see the finished file status until 60 seconds have
>> passed.
>>
>> Attached is the result on my end, and you will see what you want!
>>
>> Please let me know if you still have the problem.
>>
>> Cheers
>> Zhangminglei
>>
>> setInactiveBucketCheckInterval(2000)
>> .setInactiveBucketThreshold(2000);
>>
>> public class TestOrc {
>>    public static void main(String[] args) throws Exception {
>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>       env.setParallelism(1);
>>       env.enableCheckpointing(1000);
>>       env.setStateBackend(new MemoryStateBackend());
>>
>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>
>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>
>>       bucketingSink
>>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>>          .setInactiveBucketCheckInterval(2000)
>>          .setInactiveBucketThreshold(2000);
>>
>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>
>>       dataStream.addSink(bucketingSink);
>>
>>       env.execute();
>>    }
>>
>>    public static class ManGenerator implements SourceFunction<Row> {
>>
>>       @Override
>>       public void run(SourceContext<Row> ctx) throws Exception {
>>          for (int i = 0; i < 2147483000; i++) {
>>             Row row = new Row(3);
>>             row.setField(0, "Sagar");
>>             row.setField(1, 26 + i);
>>             row.setField(2, false);
>>             ctx.collect(row);
>>          }
>>       }
>>
>>       @Override
>>       public void cancel() {
>>       }
>>    }
>> }
>>
>> <filestatus.jpg>
>>
>> On Jun 22, 2018, at 11:14 AM, sagar loke <sagar...@gmail.com> wrote:
>>
>> Sure, we can solve it together :)
>>
>> Are you able to reproduce it?
>>
>> Thanks,
>> Sagar
>>
>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote:
>>
>>> Sagar, flush() will be called when a checkpoint is taken. Please see:
>>>
>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>>
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>>>
>>>    restoredBucketStates.clear();
>>>
>>>    synchronized (state.bucketStates) {
>>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>
>>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>>>
>>>          if (bucketState.isWriterOpen) {
>>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>>          }
>>>
>>>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>             bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>>>          }
>>>          bucketState.pendingFiles = new ArrayList<>();
>>>       }
>>>       restoredBucketStates.add(state);
>>>
>>>       if (LOG.isDebugEnabled()) {
>>>          LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>>>       }
>>>    }
>>> }
>>>
>>> On Jun 22, 2018, at 10:21 AM, sagar loke <sagar...@gmail.com> wrote:
>>>
>>> Thanks for replying.
>>>
>>> Yes, I tried different checkpoint intervals, e.g. 20, 100, 5000:
>>>
>>> env.enableCheckpointing(100);
>>>
>>> But in all these cases I still see the .pending state.
>>>
>>> Not sure if it is related to the flush() method from OrcFileWriter, which
>>> might not be getting called somehow?
>>>
>>> Thanks,
>>> Sagar
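[Context for the flush() question above: in the snapshotState() snippet quoted earlier, whatever the writer's flush() returns is stored as currentFileValidLength, i.e. the offset up to which the part file is considered valid for that checkpoint. Below is a minimal sketch of that contract, assuming the org.apache.flink.streaming.connectors.fs.Writer interface that the FlinkOrcWriterV1 example later in this thread also implements; SimpleTextWriter is a made-up name used only for illustration.]

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustration of the Writer contract that BucketingSink relies on at
// checkpoint time: snapshotState() calls flush() and records the returned
// offset as the length up to which the in-progress file is valid.
public class SimpleTextWriter implements Writer<String> {

    private transient FSDataOutputStream stream;

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // BucketingSink passes the in-progress part file it picked; the
        // writer should write to exactly this path.
        stream = fs.create(path, false);
    }

    @Override
    public void write(String element) throws IOException {
        stream.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public long flush() throws IOException {
        // Called from snapshotState(); push data to HDFS and report how far
        // the file is valid. If no checkpoint ever completes, closed files
        // simply stay in the .pending state.
        stream.hflush();
        return stream.getPos();
    }

    @Override
    public long getPos() throws IOException {
        return stream.getPos();
    }

    @Override
    public void close() throws IOException {
        if (stream != null) {
            stream.close();
        }
    }

    @Override
    public SimpleTextWriter duplicate() {
        // BucketingSink duplicates the template writer for each new part file.
        return new SimpleTextWriter();
    }
}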
>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com> wrote:
>>>
>>>> Hi, Sagar,
>>>>
>>>> Please take a look at BucketingSink. It says that a file will keep the
>>>> .pending status if you DO NOT do a checkpoint. The doc says that when a
>>>> checkpoint is successful the currently pending files will be moved to
>>>> {@code finished}. Take a try again. I think you should call the method
>>>> below and see what happens. Anyway, I will also try that and see whether
>>>> it works. Please let me know if you still get the error.
>>>>
>>>> env.enableCheckpointing(200);
>>>>
>>>> /**
>>>>  * The suffix for {@code pending} part files. These are closed files that we are
>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but which
>>>>  * were not yet confirmed by a checkpoint.
>>>>  */
>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>>
>>>> <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
>>>>  * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
>>>>  * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
>>>>  * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
>>>>  * pending files will be moved to {@code finished}.
>>>>
>>>> Cheers
>>>> Zhangminglei
>>>>
>>>> On Jun 22, 2018, at 4:46 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>
>>>> Thanks Zhangminglei for the quick response.
>>>>
>>>> I tried the above code and I am seeing another issue where the files
>>>> created on HDFS are always in the *.pending* state.
>>>>
>>>> Let me know if you can reproduce it?
>>>>
>>>> Thanks,
>>>> Sagar
>>>>
>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com> wrote:
>>>>
>>>>> Hi, Sagar,
>>>>>
>>>>> I did a local test for that and it seems to work fine for me. The PR for
>>>>> [FLINK-9407] will be updated.
>>>>>
>>>>> I will push the newest code to the PR soon, and below is the example I
>>>>> was using for my test. You can check it again. Hope you enjoy it!
>>>>>
>>>>> Cheers
>>>>> Zhangminglei
>>>>>
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>> import org.apache.flink.types.Row;
>>>>>
>>>>> public class TestOrc {
>>>>>    public static void main(String[] args) throws Exception {
>>>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>       env.setParallelism(1);
>>>>>
>>>>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>>
>>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>
>>>>>       bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>
>>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>
>>>>>       dataStream.addSink(bucketingSink);
>>>>>
>>>>>       env.execute();
>>>>>    }
>>>>>
>>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>>>
>>>>>       @Override
>>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>>          for (int i = 0; i < 3; i++) {
>>>>>             Row row = new Row(3);
>>>>>             row.setField(0, "Sagar");
>>>>>             row.setField(1, 26 + i);
>>>>>             row.setField(2, false);
>>>>>             ctx.collect(row);
>>>>>          }
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public void cancel() {
>>>>>       }
>>>>>    }
>>>>> }
>>>>>
>>>>> On Jun 21, 2018, at 1:47 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>>
>>>>> Hi Zhangminglei,
>>>>>
>>>>> Question about https://issues.apache.org/jira/browse/FLINK-9407
>>>>>
>>>>> I tried to use the code from the PR and ran it on a local HDFS cluster
>>>>> to write some ORC data.
>>>>>
>>>>> But somehow this code is failing with the following error:
>>>>>
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>> Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for
>>>>>> DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file
>>>>>> lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>>    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>>
>>>>> I understand that this error is related to Hadoop, but somehow I get
>>>>> this error only when executing the code from this PR.
>>>>>
>>>>> I had created a very crude way to write an ORC file to HDFS, as follows.
>>>>> The code below works alright and does not throw the above error.
>>>>>
>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>> import org.apache.orc.OrcFile;
>>>>>> import org.apache.orc.TypeDescription;
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>>
>>>>>> public class FlinkOrcWriterV1<T> implements org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>>>
>>>>>>    private transient org.apache.orc.Writer orcWriter;
>>>>>>    String schema;
>>>>>>    TypeDescription typeDescriptionschema; // "struct<x:int,y:int>"
>>>>>>    String basePath;
>>>>>>
>>>>>>    public FlinkOrcWriterV1(String schema) {
>>>>>>       this.schema = schema;
>>>>>>       this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>    }
>>>>>>
>>>>>>    @Override
>>>>>>    public void open(FileSystem fs, Path path) throws IOException {
>>>>>>       Configuration conf = new Configuration();
>>>>>>       orcWriter = OrcFile.createWriter(new Path("hdfs://localhost:9000/tmp/hivedata3/"),
>>>>>>             OrcFile.writerOptions(conf)
>>>>>>                   .setSchema(typeDescriptionschema));
>>>>>>    }
>>>>>>
>>>>>>    @Override
>>>>>>    public long flush() throws IOException {
>>>>>>       return orcWriter.writeIntermediateFooter();
>>>>>>    }
>>>>>>
>>>>>>    @Override
>>>>>>    public long getPos() throws IOException {
>>>>>>       return orcWriter.getRawDataSize();
>>>>>>    }
>>>>>>
>>>>>>    @Override
>>>>>>    public void close() throws IOException {
>>>>>>       orcWriter.close();
>>>>>>    }
>>>>>>
>>>>>>    @Override
>>>>>>    public void write(T element) throws IOException {
>>>>>>       VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);
>>>>>>       LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>       LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>       for (int r = 0; r < 10; ++r) {
>>>>>>          int row = batch.size++;
>>>>>>          x.vector[row] = r;
>>>>>>          y.vector[row] = r * 3;
>>>>>>          // If the batch is full, write it out and start over.
>>>>>>          if (batch.size == batch.getMaxSize()) {
>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>             batch.reset();
>>>>>>          }
>>>>>>       }
>>>>>>       if (batch.size != 0) {
>>>>>>          orcWriter.addRowBatch(batch);
>>>>>>          batch.reset();
>>>>>>       }
>>>>>>    }
>>>>>>
>>>>>>    @Override
>>>>>>    public FlinkOrcWriterV1<T> duplicate() {
>>>>>>       return new FlinkOrcWriterV1<>(schema);
>>>>>>    }
>>>>>> }
>>>>>
>>>>> Not sure if the error is related to any of the Hadoop dependencies or
>>>>> something else?
>>>>>
>>>>> Can you please look into it and let me know if you can reproduce it on
>>>>> your end too?
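[A hedged aside on the crude writer above, not a diagnosis of the lease error: its open() ignores the part-file path that BucketingSink passes in and always creates the same fixed HDFS path. Since HDFS lets only one client hold the create lease on a given file, any setup in which two writer instances end up opening the same fixed path will fail with an AlreadyBeingCreatedException-style lease conflict. Below is a minimal sketch of an open() that uses the supplied path instead; OrcOpenSketch is a made-up name and this is only an illustration, not the FLINK-9407 implementation.]

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;

// Sketch of an open() that creates the ORC writer at the part-file path
// chosen by BucketingSink instead of a hard-coded location, so that each
// part file gets its own HDFS file (and its own lease).
public class OrcOpenSketch {

    private final TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");
    private transient org.apache.orc.Writer orcWriter;

    public void open(FileSystem fs, Path path) throws IOException {
        Configuration conf = fs.getConf();
        orcWriter = OrcFile.createWriter(
                path, // the in-progress part file supplied by the sink
                OrcFile.writerOptions(conf).setSchema(schema));
    }
}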
>>>>> By the way, following are my dependencies in my project:
>>>>>
>>>>>> dependencies {
>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>> }
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Sagar.
>>>>
>>>> --
>>>> Regards,
>>>> SAGAR.
>>>
>>> --
>>> Cheers,
>>> Sagar
>>
>> --
>> Cheers,
>> Sagar
>
> --
> Regards,
> SAGAR.

--
Regards,
SAGAR.
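[Pulling the checkpoint discussion in this thread together: a minimal sketch, assuming the standard BucketingSink setters from flink-connector-filesystem, of the settings that control when part files leave the .pending state. The suffix and prefix values are the defaults implied by the file names mentioned above (_part-0-107.in-progress, _part-0-107.pending, part-0-107); PendingFileSettingsSketch is a made-up name, and the commented-out OrcFileWriter line refers to the writer from the FLINK-9407 PR.]

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

public class PendingFileSettingsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without completed checkpoints, closed part files never move past
        // the .pending state; this is the behaviour discussed in the thread.
        env.enableCheckpointing(1000);

        BucketingSink<Row> sink = new BucketingSink<>("hdfs:///tmp/orc-out");
        sink
              // .setWriter(new OrcFileWriter<>("struct<name:string,age:int>"))
              // A bucket is considered inactive once nothing has been written
              // to it for the threshold below; its file is then closed and
              // renamed to .pending. The default is 60 seconds, which is why
              // the finished files in the thread only showed up after a minute.
              .setInactiveBucketThreshold(2000)
              .setInactiveBucketCheckInterval(2000)
              // Default naming, shown explicitly. Together with the default
              // "_" prefix for open files this yields:
              // _part-0-0.in-progress -> _part-0-0.pending -> part-0-0
              .setPartPrefix("part")
              .setInProgressSuffix(".in-progress")
              .setPendingSuffix(".pending");

        // env.addSource(...).addSink(sink); env.execute();
    }
}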