Hi,

I think there are two different things mixed up in your analysis. The stack 
trace that you provided is caused by a failing checkpoint - in writing, not in 
reading. It seems to fail because of a timeout of your HDFS connection. The 
close method in the trace also has nothing to do with the close method of your 
writer; it is the close method of the checkpoint's 
FsCheckpointStateOutputStream. Furthermore, "Could not materialize checkpoint" 
appears during cancellation, so if the checkpoint got canceled, this message is 
an effect and not the cause. There should be another exception further up in 
the logs that gives the real reason why the checkpoint was canceled.

Nevertheless, the timeout is strange, and you should check whether your DFS is 
properly configured and running as expected; the ConnectTimeoutException at the 
bottom of your trace is the HDFS client giving up after 60 seconds while trying 
to connect to a datanode. The reported exception should have no direct 
connection to your ParquetWriter. It is possible that the checkpoint was 
canceled because some problem happened in the ParquetWriter, but then we are 
looking at the wrong stack trace.
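
Regarding the DFS check: as a quick first step, you could verify from one of 
the task manager hosts that a write through the datanode pipeline succeeds at 
all. This is a throwaway sketch with a hypothetical namenode address and probe 
path (modeled on the redacted trace) - substitute your own values:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  object HdfsProbe {
    def main(args: Array[String]): Unit = {
      // Hypothetical URI and path - use your namenode and checkpoint dir.
      val fs = FileSystem.get(new java.net.URI("hdfs://XXXX:8020"), new Configuration())
      val probe = new Path("/checkpoint/data/_probe")
      val out = fs.create(probe, true)
      out.write("probe".getBytes("UTF-8"))
      out.hflush() // forces the bytes through the datanode pipeline
      out.close()
      fs.delete(probe, true)
      println("HDFS write path OK")
    }
  }

If this probe also hangs for 60 seconds, the problem sits between the task 
manager and the datanode, not in Flink.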

As for the pending files: different DFS implementations can have different 
points where flush() is called. Does your implementation also properly forward 
flush() to the writer? From the snippet below, your flush() only delegates to 
super.flush(), and since your open() builds a ParquetWriter on the path 
directly and never opens the base class's output stream, that delegation fails 
because the base class's stream was never created.
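
To make that concrete, here is a rough sketch of a flush() that does not 
depend on the base class's stream, assuming your writer keeps the ParquetWriter 
from your snippet (everything outside your snippet is my assumption):

  // Sketch only: super.flush() needs the output stream that
  // super.open(fs, path) would have created, but your open()
  // never calls it, so that stream does not exist on recovery.
  override def flush(): Long = {
    if (writer == null) {
      throw new IllegalStateException("Writer was not opened")
    }
    // ParquetWriter buffers row groups in memory and exposes no
    // public flush(); its buffered data size is the best position
    // estimate we can report here.
    writer.getDataSize
  }

Note that this only reports a position; anything the ParquetWriter has buffered 
but not yet written out would still be lost on a failure.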

Best,
Stefan

> On 23.08.2017, at 21:05, Biswajit Das <biswajit...@gmail.com> wrote:
> 
> Hi there,
> 
> I'm using a custom writer with an hourly rolling bucketing sink, and I'm 
> seeing two issues.
> 
> First issue: if I write the same files to S3, all the files get committed; 
> however, when I write the same files to HDFS, they remain in .pending state. 
> This could be related to the second problem below.
> 
> Second issue: my custom writer writes Avro to Parquet and looks something 
> like this, extended from BaseStreamWriter:
> 
> 
>   @transient private var writer: ParquetWriter[T] = _
> 
>   override def open(fs: FileSystem, path: Path): Unit = {
>     val conf = new Configuration()
>     conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
>     conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
>     // The ParquetWriter opens its own stream on `path`; the base
>     // class's stream is never opened here.
>     writer = AvroParquetWriter
>       .builder[T](path)
>       .withSchema(new Schema.Parser().parse(schema))
>       .withCompressionCodec(compressionCodecName)
>       .withConf(conf)
>       .build()
>   }
> 
>   override def write(element: T): Unit = writer.write(element)
> 
>   override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](schema)
> 
>   override def close(): Unit = writer.close()
> 
>   override def getPos: Long = writer.getDataSize
> 
>   // Delegates to the base class only; does not flush the ParquetWriter.
>   override def flush(): Long = super.flush()
> 
> 
> What I noticed is that during recovery from a checkpoint it fails to flush, 
> although I have overridden flush() above (^^). The issue seems to be that it 
> doesn't have a handle on the stream writer, which is why it fails when flush 
> is called for the stream writer. I'm not sure if the .pending state from the 
> first issue is related to this as well.
> 
> 
> --------------------------------------------------
> 11:52:04.082 [pool-13-thread-1] INFO  o.a.flink.runtime.taskmanager.Task - Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator Source:- kafka source (1/1).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_73]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_73]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
> Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Source: eo_open- kafka source (1/1).
>     ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_73]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_73]
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.2.jar:1.3.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>     ... 5 common frames omitted
>     Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>         ... 5 common frames omitted
>     Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xxx:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
>         ... 7 common frames omitted
>     Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>         at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
>         at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:288)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:521)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
>         at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1185)
>         ... 5 common frames omitted
>     Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
>         at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1341)
> 
> -------------------------------------
