For #1, the word exist should be exit, right ?Thanks -------- Original message --------From: zhangminglei <18717838...@163.com> Date: 6/23/18 10:12 AM (GMT+08:00) To: sagar loke <sagar...@gmail.com> Cc: dev <d...@flink.apache.org>, user <user@flink.apache.org> Subject: Re: [Flink-9407] Question about proposed ORC Sink ! Hi, Sagar. 1. It solves the issue partially meaning files which have finished checkpointing don't show .pending status but the files which were in progress when the program exists are still in .pending state. Ans: Yea, Make the program exists and in that time if a checkpoint does not finished will lead the status keeps in .pending state then. Under the normal circumstances, the programs that running in the production env will never be stoped or existed if everything is fine. 2. Ideally, writer should work with default settings correct ? Meaning we don't have to explicitly set these parameters to make it work. Is this assumption correct ? Ans: Yes. Writer should work with default settings correct.Yes. We do not have to explicitly set these parameters to make it work.Yes. Assumption correct indeed. However, you know, flink is a real time streaming framework, so under normal circumstances,you don't really go to use the default settings when it comes to a specific business. Especially together work with offline end(Like hadoop mapreduce). In this case, you need to tell the offline end when time a bucket is close and when time the data for the specify bucket is ready. So, you can take a look on https://issues.apache.org/jira/browse/FLINK-9609. CheersZhangminglei
在 2018年6月23日,上午8:23,sagar loke <sagar...@gmail.com> 写道: Hi Zhangminglei, Thanks for the reply. 1. It solves the issue partially meaning files which have finished checkpointing don't show .pending status but the files which were in progress when the program exists are still in .pending state. 2. Ideally, writer should work with default settings correct ? Meaning we don't have to explicitly set these parameters to make it work. Is this assumption correct ? Thanks,Sagar On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838...@163.com> wrote: Hi, Sagar. Please use the below code and you will find the part files status from _part-0-107.in-progress to _part-0-107.pending and finally to part-0-107. [For example], you need to run the program for a while. However, we need set some parameters, like the following. Moreover, enableCheckpointing IS also needed. I know why you always see the .pending file since the below parameters default value is 60 seconds even though you set the enableCheckpoint. So, that is why you can not see the finished file status until 60 seconds passed. Attached is the ending on my end, and you will see what you want! Please let me know if you still have the problem. CheersZhangminglei setInactiveBucketCheckInterval(2000) .setInactiveBucketThreshold(2000); public class TestOrc { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(1000); env.setStateBackend(new MemoryStateBackend()); String orcSchemaString = "struct<name:string,age:int,married:boolean>"; String path = "hdfs://10.199.196.0:9000/data/hive/man"; BucketingSink<Row> bucketingSink = new BucketingSink<>(path); bucketingSink .setWriter(new OrcFileWriter<>(orcSchemaString)) .setInactiveBucketCheckInterval(2000) .setInactiveBucketThreshold(2000); DataStream<Row> dataStream = env.addSource(new ManGenerator()); dataStream.addSink(bucketingSink); env.execute(); } public static class ManGenerator implements SourceFunction<Row> { @Override public void run(SourceContext<Row> ctx) throws Exception { for (int i = 0; i < 2147483000; i++) { Row row = new Row(3); row.setField(0, "Sagar"); row.setField(1, 26 + i); row.setField(2, false); ctx.collect(row); } } @Override public void cancel() { } } }<filestatus.jpg> 在 2018年6月22日,上午11:14,sagar loke <sagar...@gmail.com> 写道: Sure, we can solve it together :) Are you able to reproduce it ? Thanks,Sagar On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838...@163.com> wrote: Sagar, flush will be called when do a checkpoint. Please see bucketState.currentFileValidLength = bucketState.writer.flush(); @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized."); restoredBucketStates.clear(); synchronized (state.bucketStates) { int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) { BucketState<T> bucketState = bucketStateEntry.getValue(); if (bucketState.isWriterOpen) { bucketState.currentFileValidLength = bucketState.writer.flush(); } synchronized (bucketState.pendingFilesPerCheckpoint) { bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles); } bucketState.pendingFiles = new ArrayList<>(); } restoredBucketStates.add(state); if (LOG.isDebugEnabled()) { LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state); } } 在 2018年6月22日,上午10:21,sagar loke <sagar...@gmail.com> 写道: Thanks for replying. Yes, I tried with different values of checkpoint eg. 20, 100, 5000. env.enablecheckpointing(100); But in all the cases, I still see .pending state. Not sure if it’s related to flush() method from OrcFileWriter ? Which might not be getting called somehow ? Thanks,Sagar On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838...@163.com> wrote: Hi,Sagar Please take a look at BucketingSink, It says that a file would keep .pending status if you DO NOT do a checkpoint. Doc says, when a checkpoint is successful the currently pending file will be removed to {@code finished}. Take a try again. I think you should call the below method and see what would happen on it. Anyway, I will also try that and see whether it works. Please let me know if you still meet error. env.enableCheckpointing(200); /** * The suffix for {@code pending} part files. These are closed files that we are * not currently writing to (inactive or reached {@link #batchSize}), but which * were not yet confirmed by a checkpoint. */ private static final String DEFAULT_PENDING_SUFFIX = ".pending";<p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}. * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently * pending files will be moved to {@code finished}. CheersZhangminglei 在 2018年6月22日,上午4:46,sagar loke <sagar...@gmail.com> 写道: Thanks Zhangminglei for quick response. I tried the above code and I am seeing another issue where the files created on hdfs are always in .pending state. Let me know if you can reproduce it ? Thanks,Sagar On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838...@163.com> wrote: Hi, Sagar I did a local test for that and it seems works fine for me. PR will be updated for [FLINK-9407] I will update the newest code to PR soon and below is the example I was using for my test. You can check it again. Hopes you can enjoy it! CheersZhangminglei.import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; import org.apache.flink.types.Row; public class TestOrc { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); String orcSchemaString = "struct<name:string,age:int,married:boolean>"; String path = "hdfs://10.199.196.0:9000/data/hive/man"; BucketingSink<Row> bucketingSink = new BucketingSink<>(path); bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString)); DataStream<Row> dataStream = env.addSource(new ManGenerator()); dataStream.addSink(bucketingSink); env.execute(); } public static class ManGenerator implements SourceFunction<Row> { @Override public void run(SourceContext<Row> ctx) throws Exception { for (int i = 0; i < 3; i++) { Row row = new Row(3); row.setField(0, "Sagar"); row.setField(1, 26 + i); row.setField(2, false); ctx.collect(row); } } @Override public void cancel() { } } } 在 2018年6月21日,上午1:47,sagar loke <sagar...@gmail.com> 写道: Hi Zhangminglei, Question about https://issues.apache.org/jira/browse/FLINK-9407 I tried to use the code from PR and run it on local hdfs cluster to write some ORC data. But somehow this code is failing with following error: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500) I understand that this error is related to Hadoop but somehow I get this error only when executing the code from this PR. I had created very crude way to write ORC file to HDFS as per follows. Below code works alright and does not throw above error. import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.hadoop.conf.Configuration; import java.io.IOException; public class FlinkOrcWriterV1<T> implements org.apache.flink.streaming.connectors.fs.Writer<T> { private transient org.apache.orc.Writer orcWriter; String schema; TypeDescription typeDescriptionschema;//"struct<x:int,y:int>" String basePath; public FlinkOrcWriterV1(String schema) { this.schema = schema; this.typeDescriptionschema = TypeDescription.fromString(schema); } @Override public void open(FileSystem fs, Path path) throws IOException { Configuration conf = new Configuration(); orcWriter = OrcFile.createWriter(new Path("hdfs://localhost:9000/tmp/hivedata3/"), OrcFile.writerOptions(conf) .setSchema(typeDescriptionschema)); } @Override public long flush() throws IOException { return orcWriter.writeIntermediateFooter(); } @Override public long getPos() throws IOException { return orcWriter.getRawDataSize(); } @Override public void close() throws IOException { orcWriter.close(); } @Override public void write(T element) throws IOException { VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10); LongColumnVector x = (LongColumnVector) batch.cols[0]; LongColumnVector y = (LongColumnVector) batch.cols[1]; for(int r=0; r < 10; ++r) { int row = batch.size++; x.vector[row] = r; y.vector[row] = r * 3; // If the batch is full, write it out and start over. if (batch.size == batch.getMaxSize()) { orcWriter.addRowBatch(batch); batch.reset(); } } if (batch.size != 0) { orcWriter.addRowBatch(batch); batch.reset(); } } @Override public FlinkOrcWriterV1<T> duplicate() { return new FlinkOrcWriterV1<>(schema); } } Not sure, if the error is related to any of the hadoop dependencies or something else ? Can you please look into it and let me know if you can reproduce it on your end too ? By the way, following are my dependencies in my project: dependencies { compile 'org.apache.flink:flink-java:1.4.2' compile 'org.apache.flink:flink-runtime_2.11:1.4.2' compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2' compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2' compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2' compile 'io.confluent:kafka-avro-serializer:3.3.0' compile 'org.apache.flink:flink-avro:1.4.2' compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0' compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2' compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2' compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2' compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2' compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1' compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0' compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0' compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2' testCompile group: 'junit', name: 'junit', version: '4.12'} -- Thanks, Sagar. -- Regards, SAGAR. -- Cheers, Sagar -- Cheers, Sagar -- Regards, SAGAR.