Hi Stefan,

My bad, I'm really sorry. I had copied the wrong exception stack; during the recovery after the error I'm seeing the exception below:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): Cannot truncate to a larger file size. Current size: 31132385, truncate size: 35985787.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2089)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2027)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:1997)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:930)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:599)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor309.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:2016)
    at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:689)
    at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:685)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:696)
    ... 15 more

2017-08-24 20:22:44,005 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (9/16) (af635c8938168acfc85542c830d71002) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not invoke truncate.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:846)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:718)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:370)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:177)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:159)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:105)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor308.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:808)
    ... 11 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): Cannot truncate to a larger file size. Current size: 38029300, truncate size: 44601803.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2089)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2027)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:1997)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:930)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:599)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor309.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:2016)
    at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:689)
    at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:685)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:696)
    ... 15 more
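For reference, the restore path here ends up in DistributedFileSystem.truncate (visible in the trace above). Below is a minimal, untested sketch of what that call looks like through the plain Hadoop FileSystem API, with a guard for exactly this error; the path and the length are hypothetical placeholders, not values from my job:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TruncateProbe {
  def main(args: Array[String]): Unit = {
    // Hypothetical in-progress part file and checkpointed valid length.
    val path        = new Path("hdfs://namenode:8020/path/to/part-file")
    val validLength = 35985787L

    val fs            = FileSystem.get(path.toUri, new Configuration())
    val currentLength = fs.getFileStatus(path).getLen

    if (validLength > currentLength) {
      // This is the case the NameNode rejects with
      // "Cannot truncate to a larger file size".
      println(s"file is only $currentLength bytes, cannot truncate to $validLength")
    } else {
      // truncate() returns true when the new length takes effect immediately,
      // false when the last block still needs to be recovered first.
      val done = fs.truncate(path, validLength)
      println(s"truncate issued, completed immediately: $done")
    }
  }
}

So it looks like the valid length recorded at checkpoint time (the truncate size above) is larger than what actually made it into the file.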
On Thu, Aug 24, 2017 at 4:25 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi,
>
> I think there are two different things mixed up in your analysis. The stack trace that you provided is caused by a failing checkpoint - in writing, not in reading. It seems to fail due to a timeout of your HDFS connection. This close method also has nothing to do with the close method in the writer; it is the close method of the CheckpointOutputStream. Furthermore, "could not materialize checkpoint" seems to happen in cancel, so if the checkpoint got canceled that means this is an effect and not the cause. There should be another exception further up in the logs that gives the real reason why the checkpoint was canceled.
>
> Nevertheless, the timeout is strange and you should check if your DFS is properly configured and running as expected. The reported exception should have no direct connection with your ParquetWriter. It is possible that the checkpoint was canceled because some problem happened in the ParquetWriter, but then we are looking at the wrong stack trace.
>
> As for the pending files, different DFS implementations could have different points where flush() is called. I think your implementation also properly forwards to writer.flush?
>
> Best,
> Stefan
>
>
> Am 23.08.2017 um 21:05 schrieb Biswajit Das <biswajit...@gmail.com>:
> >
> > Hi There,
> >
> > I'm using a custom writer with an hourly rolling bucketing sink. I'm seeing two issues.
> >
> > First, if I write the same file on S3, all the files get committed; however, when I write the same on HDFS I see it remains in .pending state. This could be related to the second problem below.
> >
> > Second issue: my custom writer writes Avro to Parquet and looks something like this, extended from BaseStreamWriter:
> >
> > @transient private var writer: ParquetWriter[T] = _
> >
> > override def open(fs: FileSystem, path: Path): Unit = {
> >   val conf = new Configuration()
> >   conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
> >   conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
> >   writer = AvroParquetWriter
> >     .builder[T](path)
> >     .withSchema(new Schema.Parser().parse(schema))
> >     .withCompressionCodec(compressionCodecName)
> >     .withConf(conf)
> >     .build()
> > }
> >
> > override def write(element: T): Unit = writer.write(element)
> >
> > override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](schema)
> >
> > override def close(): Unit = writer.close()
> >
> > override def getPos: Long = writer.getDataSize
> >
> > override def flush(): Long = super.flush()
> >
> > What I noticed while recovering from a checkpoint is that it fails to flush, although I have overridden flush ^^ above. The issue seems to be that it doesn't have a handle on the stream writer, which is why it fails when flush is called for the stream writer; not sure if the first .pending issue is related to this as well.
> >
> > --------------------------------------------------
> > 11:52:04.082 [pool-13-thread-1] INFO o.a.flink.runtime.taskmanager.Task - Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6) switched from RUNNING to FAILED.
> > org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator Source:- kafka source (1/1).
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_73]
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_73]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
> > Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Source: eo_open- kafka source (1/1).
> >     ... 6 common frames omitted
> > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
> >     at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_73]
> >     at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_73]
> >     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.2.jar:1.3.2]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >     ... 5 common frames omitted
> >     Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
> >         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         ... 5 common frames omitted
> > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xxx:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
> >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> >     at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
> >     at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
> >     ... 7 common frames omitted
> > Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
> >     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
> >     at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
> >     at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:288)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:521)
> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
> >     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1185)
> >     ... 5 common frames omitted
> > Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
> >     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
> >     at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1341)
> >
> >
> > -------------------------------------
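P.S. On your question about forwarding flush: right now my writer only delegates to super.flush(), and since my open() never opens the base class's own stream, that delegate has no stream handle to work with. Below is a rough, untested sketch of one possible variant that implements Flink's Writer interface directly, so flush()/getPos() don't depend on a stream handle. The class name, the hard-coded Snappy codec, and dropping the list-structure settings are simplifications for the sketch, not my real code:

import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class SketchParquetWriter[T](schema: String) extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter
      .builder[T](path)
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .withConf(new Configuration())
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  // ParquetWriter has no flush(); getDataSize() is the only length estimate
  // available. Caveat: it also counts data still buffered in memory, so the
  // valid length recorded at checkpoint time could be larger than what is
  // really on HDFS, which would line up with the truncate error above.
  override def flush(): Long = writer.getDataSize

  override def getPos: Long = writer.getDataSize

  override def close(): Unit = if (writer != null) writer.close()

  override def duplicate(): Writer[T] = new SketchParquetWriter[T](schema)
}

No idea yet whether that is the right fix; just noting it here for discussion.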