Yes, it should be exit. Thanks to Ted Yu, that is exactly right!

Cheers
Zhangminglei
> On Jun 23, 2018, at 12:40 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> For #1, the word exist should be exit, right ?
> Thanks
>
> -------- Original message --------
> From: zhangminglei <18717838...@163.com>
> Date: 6/23/18 10:12 AM (GMT+08:00)
> To: sagar loke <sagar...@gmail.com>
> Cc: dev <dev@flink.apache.org>, user <u...@flink.apache.org>
> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>
> Hi, Sagar.
>
>> 1. It solves the issue partially, meaning files which have finished checkpointing don't show .pending status, but the files which were in progress when the program exists are still in .pending state.
>
> Ans:
>
> Yea, Make the program exists and in that time if a checkpoint does not finished will lead the status keeps in .pending state then. Under the normal circumstances, the programs that running in the production env will never be stoped or existed if everything is fine.
>
>> 2. Ideally, the writer should work with default settings, correct ? Meaning we don't have to explicitly set these parameters to make it work. Is this assumption correct ?
>
> Ans:
>
> Yes. The writer should work with the default settings.
> Yes. We do not have to explicitly set these parameters to make it work.
> Yes. The assumption is correct indeed.
>
> However, Flink is a real-time streaming framework, so under normal circumstances you don't really use the default settings when it comes to a specific business, especially when working together with an offline end (like Hadoop MapReduce). In that case, you need to tell the offline end when a bucket is closed and when the data for that specific bucket is ready. So, you can take a look at https://issues.apache.org/jira/browse/FLINK-9609.
>
> Cheers
> Zhangminglei
>
>
>> On Jun 23, 2018, at 8:23 AM, sagar loke <sagar...@gmail.com> wrote:
>>
>> Hi Zhangminglei,
>>
>> Thanks for the reply.
>>
>> 1. It solves the issue partially, meaning files which have finished checkpointing don't show .pending status, but the files which were in progress when the program exists are still in .pending state.
>>
>> 2. Ideally, the writer should work with default settings, correct ? Meaning we don't have to explicitly set these parameters to make it work. Is this assumption correct ?
>>
>> Thanks,
>> Sagar
>>
>> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote:
>> Hi, Sagar. Please use the code below and you will see the part files go from _part-0-107.in-progress to _part-0-107.pending and finally to part-0-107 (for example); you need to run the program for a while. However, we need to set some parameters, like the following, and enableCheckpointing is also needed. I know why you always see the .pending file: the default value of the two parameters below is 60 seconds even if you enable checkpointing, so you cannot see the finished file status until 60 seconds have passed.
>>
>> Attached is the result on my end, and you will see what you want!
>>
>> Please let me know if you still have the problem.
>>
>> Cheers
>> Zhangminglei
>>
>>     setInactiveBucketCheckInterval(2000)
>>     .setInactiveBucketThreshold(2000);
>>
>> public class TestOrc {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(1);
>>         env.enableCheckpointing(1000);
>>         env.setStateBackend(new MemoryStateBackend());
>>
>>         String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>         String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>
>>         BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>
>>         bucketingSink
>>             .setWriter(new OrcFileWriter<>(orcSchemaString))
>>             .setInactiveBucketCheckInterval(2000)
>>             .setInactiveBucketThreshold(2000);
>>
>>         DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>
>>         dataStream.addSink(bucketingSink);
>>
>>         env.execute();
>>     }
>>
>>     public static class ManGenerator implements SourceFunction<Row> {
>>
>>         @Override
>>         public void run(SourceContext<Row> ctx) throws Exception {
>>             for (int i = 0; i < 2147483000; i++) {
>>                 Row row = new Row(3);
>>                 row.setField(0, "Sagar");
>>                 row.setField(1, 26 + i);
>>                 row.setField(2, false);
>>                 ctx.collect(row);
>>             }
>>         }
>>
>>         @Override
>>         public void cancel() {
>>
>>         }
>>     }
>> }
>> <filestatus.jpg>
>>
>>
>>
>>> On Jun 22, 2018, at 11:14 AM, sagar loke <sagar...@gmail.com> wrote:
>>>
>>> Sure, we can solve it together :)
>>>
>>> Are you able to reproduce it ?
>>>
>>> Thanks,
>>> Sagar
>>>
>>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote:
>>> Sagar, flush will be called when doing a checkpoint. Please see
>>>
>>>     bucketState.currentFileValidLength = bucketState.writer.flush();
>>>
>>>
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>     Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>>>
>>>     restoredBucketStates.clear();
>>>
>>>     synchronized (state.bucketStates) {
>>>         int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>
>>>         for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>>>             BucketState<T> bucketState = bucketStateEntry.getValue();
>>>
>>>             if (bucketState.isWriterOpen) {
>>>                 bucketState.currentFileValidLength = bucketState.writer.flush();
>>>             }
>>>
>>>             synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>                 bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>>>             }
>>>             bucketState.pendingFiles = new ArrayList<>();
>>>         }
>>>         restoredBucketStates.add(state);
>>>
>>>         if (LOG.isDebugEnabled()) {
>>>             LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>>>         }
>>>     }
>>> }
>>>
>>>
>>>> On Jun 22, 2018, at 10:21 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>
>>>> Thanks for replying.
>>>>
>>>> Yes, I tried with different checkpoint intervals, e.g. 20, 100, 5000:
>>>>
>>>>     env.enableCheckpointing(100);
>>>>
>>>> But in all the cases, I still see the .pending state.
>>>>
>>>> Not sure if it's related to the flush() method from OrcFileWriter, which might not be getting called somehow ?
>>>>
>>>> Thanks,
>>>> Sagar
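One way to check whether flush() is actually invoked at checkpoint time is to wrap the writer in a small delegating Writer that logs each call. The sketch below is only an illustration and not part of the FLINK-9407 PR; the LoggingWriter name and the slf4j logging are assumptions, and it relies only on the Writer methods already shown in this thread.

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Delegates to any Writer and logs open() and flush() calls, so the
 * TaskManager log shows whether flush() fires on each checkpoint.
 */
public class LoggingWriter<T> implements Writer<T> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingWriter.class);

    private final Writer<T> delegate;

    public LoggingWriter(Writer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        LOG.info("open() called for {}", path);
        delegate.open(fs, path);
    }

    @Override
    public long flush() throws IOException {
        long validLength = delegate.flush();
        LOG.info("flush() called, valid length = {}", validLength);
        return validLength;
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }

    @Override
    public void write(T element) throws IOException {
        delegate.write(element);
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new LoggingWriter<>(delegate.duplicate());
    }
}

It could be plugged in with bucketingSink.setWriter(new LoggingWriter<>(new OrcFileWriter<>(orcSchemaString))); if no "flush() called" lines show up after checkpoints complete, the problem is upstream of the writer.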
>>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com> wrote:
>>>> Hi, Sagar
>>>>
>>>> Please take a look at BucketingSink. It says that a file will keep the .pending status if you DO NOT do a checkpoint. The doc says that when a checkpoint is successful, the currently pending files will be moved to {@code finished}.
>>>> Give it a try again. I think you should call the method below and see what happens. Anyway, I will also try it and see whether it works. Please let me know if you still hit the error.
>>>>
>>>>     env.enableCheckpointing(200);
>>>>
>>>> /**
>>>>  * The suffix for {@code pending} part files. These are closed files that we are
>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but which
>>>>  * were not yet confirmed by a checkpoint.
>>>>  */
>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>>
>>>>  * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
>>>>  * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
>>>>  * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
>>>>  * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
>>>>  * pending files will be moved to {@code finished}.
>>>>
>>>> Cheers
>>>> Zhangminglei
>>>>
>>>>
>>>>
>>>>> On Jun 22, 2018, at 4:46 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>>
>>>>> Thanks Zhangminglei for the quick response.
>>>>>
>>>>> I tried the above code and I am seeing another issue where the files created on HDFS are always in .pending state.
>>>>>
>>>>> Let me know if you can reproduce it ?
>>>>>
>>>>> Thanks,
>>>>> Sagar
>>>>>
>>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com> wrote:
>>>>> Hi, Sagar
>>>>>
>>>>> I did a local test for that and it seems to work fine for me. The PR for [FLINK-9407] will be updated.
>>>>>
>>>>> I will push the newest code to the PR soon, and below is the example I was using for my test. You can check it again. Hope you enjoy it!
>>>>>
>>>>> Cheers
>>>>> Zhangminglei.
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>> import org.apache.flink.types.Row;
>>>>>
>>>>> public class TestOrc {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         env.setParallelism(1);
>>>>>
>>>>>         String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>>         String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>>
>>>>>         BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>
>>>>>         bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>
>>>>>         DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>
>>>>>         dataStream.addSink(bucketingSink);
>>>>>
>>>>>         env.execute();
>>>>>     }
>>>>>
>>>>>     public static class ManGenerator implements SourceFunction<Row> {
>>>>>
>>>>>         @Override
>>>>>         public void run(SourceContext<Row> ctx) throws Exception {
>>>>>             for (int i = 0; i < 3; i++) {
>>>>>                 Row row = new Row(3);
>>>>>                 row.setField(0, "Sagar");
>>>>>                 row.setField(1, 26 + i);
>>>>>                 row.setField(2, false);
>>>>>                 ctx.collect(row);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void cancel() {
>>>>>
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>> On Jun 21, 2018, at 1:47 AM, sagar loke <sagar...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Zhangminglei,
>>>>>>
>>>>>> Question about https://issues.apache.org/jira/browse/FLINK-9407
>>>>>>
>>>>>> I tried to use the code from the PR and ran it on a local HDFS cluster to write some ORC data.
>>>>>>
>>>>>> But somehow this code is failing with the following error:
>>>>>>
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>> Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>>>
>>>>>> I understand that this error is related to Hadoop, but somehow I get this error only when executing the code from this PR.
>>>>>>
>>>>>> I had created a very crude way to write ORC files to HDFS, as follows. The code below works alright and does not throw the above error.
>>>>>>
>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>> import org.apache.hadoop.fs.Path;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>> import org.apache.orc.OrcFile;
>>>>>> import org.apache.orc.TypeDescription;
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>>
>>>>>> public class FlinkOrcWriterV1<T> implements org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>>>
>>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>>     String schema;
>>>>>>     TypeDescription typeDescriptionschema; // "struct<x:int,y:int>"
>>>>>>     String basePath;
>>>>>>
>>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>>         this.schema = schema;
>>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>         Configuration conf = new Configuration();
>>>>>>         orcWriter = OrcFile.createWriter(new Path("hdfs://localhost:9000/tmp/hivedata3/"),
>>>>>>                 OrcFile.writerOptions(conf)
>>>>>>                         .setSchema(typeDescriptionschema));
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long flush() throws IOException {
>>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long getPos() throws IOException {
>>>>>>         return orcWriter.getRawDataSize();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void close() throws IOException {
>>>>>>         orcWriter.close();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void write(T element) throws IOException {
>>>>>>         VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);
>>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>         for (int r = 0; r < 10; ++r) {
>>>>>>             int row = batch.size++;
>>>>>>             x.vector[row] = r;
>>>>>>             y.vector[row] = r * 3;
>>>>>>             // If the batch is full, write it out and start over.
>>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>>                 orcWriter.addRowBatch(batch);
>>>>>>                 batch.reset();
>>>>>>             }
>>>>>>         }
>>>>>>         if (batch.size != 0) {
>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>             batch.reset();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> Not sure if the error is related to any of the Hadoop dependencies or something else ?
>>>>>>
>>>>>> Can you please look into it and let me know if you can reproduce it on your end too ?
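For what it is worth, the crude writer above opens a hard-coded HDFS path in open() rather than the part file that BucketingSink passes in, and write() emits ten synthetic rows instead of the element it receives. A sketch of the same idea that writes to the supplied path and copies the fields of the incoming Row follows. FlinkOrcWriterV2 is a made-up name and the two-LongColumnVector mapping is an assumption based on the "struct<x:int,y:int>" comment, so treat it as an illustration rather than the FLINK-9407 implementation.

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;

import java.io.IOException;

public class FlinkOrcWriterV2 implements Writer<Row> {

    private final String schema; // e.g. "struct<x:int,y:int>"

    private transient TypeDescription typeDescription;
    private transient org.apache.orc.Writer orcWriter;
    private transient VectorizedRowBatch batch;

    public FlinkOrcWriterV2(String schema) {
        this.schema = schema;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // Write to the in-progress part file chosen by BucketingSink, not a fixed location.
        typeDescription = TypeDescription.fromString(schema);
        orcWriter = OrcFile.createWriter(path,
                OrcFile.writerOptions(new Configuration()).setSchema(typeDescription));
        batch = typeDescription.createRowBatch();
    }

    @Override
    public void write(Row element) throws IOException {
        // Copy the fields of the incoming Row into the current ORC row batch.
        int row = batch.size++;
        ((LongColumnVector) batch.cols[0]).vector[row] = ((Number) element.getField(0)).longValue();
        ((LongColumnVector) batch.cols[1]).vector[row] = ((Number) element.getField(1)).longValue();
        if (batch.size == batch.getMaxSize()) {
            orcWriter.addRowBatch(batch);
            batch.reset();
        }
    }

    @Override
    public long flush() throws IOException {
        if (batch.size != 0) {
            orcWriter.addRowBatch(batch);
            batch.reset();
        }
        // BucketingSink stores the value returned here as the valid length of the in-progress file.
        return orcWriter.writeIntermediateFooter();
    }

    @Override
    public long getPos() throws IOException {
        // Same crude approximation as the original; ORC does not expose a plain byte position.
        return orcWriter.getRawDataSize();
    }

    @Override
    public void close() throws IOException {
        if (orcWriter != null) {
            if (batch != null && batch.size != 0) {
                orcWriter.addRowBatch(batch);
                batch.reset();
            }
            orcWriter.close();
        }
    }

    @Override
    public Writer<Row> duplicate() {
        return new FlinkOrcWriterV2(schema);
    }
}

Whether any of this has a bearing on the AlreadyBeingCreatedException seen with the PR code is a separate question, since that error came from the PR's writer rather than this one.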
>>>>>>
>>>>>> By the way, following are my dependencies in my project:
>>>>>>
>>>>>> dependencies {
>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>> }
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Sagar.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> SAGAR.
>>>>
>>>> --
>>>> Cheers,
>>>> Sagar
>>>
>>> --
>>> Cheers,
>>> Sagar
>>
>>
>>
>>
>> --
>> Regards,
>> SAGAR.
>
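On the dependency question: one thing that might be worth checking is whether any of these artifacts (orc-core, for example) drags its own Hadoop client onto the classpath next to flink-shaded-hadoop2. If Gradle's dependency report shows that it does, one way to keep a single Hadoop on the classpath would be an exclusion like the sketch below; this is only a guess, not a confirmed cause of the lease error.

compile(group: 'org.apache.orc', name: 'orc-core', version: '1.5.1') {
    // Assumption: let flink-shaded-hadoop2 be the only provider of Hadoop classes at runtime.
    exclude group: 'org.apache.hadoop'
}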