Hi Averell, Did you have other failures before (from which you managed to resume successfully)? Can you share a bit more details about your job and potentially the TM/JM logs?
The only thing I found about this is here https://forums.aws.amazon.com/thread.jspa?threadID=130172 but Flink does not directly call getObject(). We rely on Hadoop code to read objects. In addition, we do not interact directly with the connection pool. Cheers, Kostas On Mon, Mar 4, 2019 at 1:53 PM Averell <lvhu...@gmail.com> wrote: > Hello everyone, > > I have a job which is writing some streams into parquet files in S3. I use > Flink 1.7.2 on EMR 5.21. > My job had been running well, but suddenly it failed to make a checkpoint > with the full stack trace mentioned below. After that failure, the job > restarted from the last successful checkpoint, but it could not make any > further checkpoint - all subsequent checkpoints failed with the same > reason. > Searching on Internet I could only find one explanation: S3Object has not > been closed properly. > > Could someone please help? > > Thanks and regards, > Averell > > > /The program finished with the following exception: > > org.apache.flink.util.FlinkException: Triggering a savepoint for the job > 8cf68432acb3b4ced2cbc97bc23b4af5 failed. > at > > org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723) > at > > org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701) > at > > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985) > at > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) > at > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint > failed: Could not perform checkpoint 33 for operator Sink: S3 - Instant > (1/4). > at > > org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972) > at > > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at > > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > at > > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) > at > > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.concurrent.CompletionException: java.lang.Exception: > Checkpoint failed: Could not perform checkpoint 33 for operator Sink: S3 - > Instant (1/4). > at > > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > > org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortWithCause(PendingCheckpoint.java:452) > at > > org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:447) > at > > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1258) > at > > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failUnacknowledgedPendingCheckpointsFor(CheckpointCoordinator.java:918) > at > > org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyExecutionChange(ExecutionGraph.java:1779) > at > > org.apache.flink.runtime.executiongraph.ExecutionVertex.notifyStateTransition(ExecutionVertex.java:756) > at > > org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1353) > at > > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1113) > at > > org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:945) > at > > org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1576) > at > > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:542) > at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > ... 15 more > Caused by: java.lang.Exception: Checkpoint failed: Could not perform > checkpoint 33 for operator Sink: S3 - Instant (1/4). > ... 30 more > Caused by: java.lang.Exception: Could not perform checkpoint 33 for > operator > Sink: S3 - Instant (1/4). > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595) > at > > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396) > at > > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292) > at > > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200) > at > org.apache.flink.streaming.runtime.io > .StreamInputProcessor.processInput(StreamInputProcessor.java:209) > at > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not complete snapshot 33 for operator > Sink: S3 - Instant (1/4). > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586) > ... 8 more > Caused by: java.io.IOException: Uploading parts failed > at > > org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231) > at > > org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215) > at > > org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151) > at > > org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123) > at > > org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56) > at > > org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235) > at > > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395) > ... 13 more > Caused by: java.io.InterruptedIOException: upload part on > output/instant/dt=2019-01-09/part-0-24: > org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable > to execute HTTP request: Timeout waiting for connection from pool > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471) > at > > org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73) > at > > org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318) > at > > org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: > org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable > to execute HTTP request: Timeout waiting for connection from pool > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474) > at > > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) > ... 12 more > Caused by: > > org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: > Timeout waiting for connection from pool > at > > org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292) > at > > org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269) > at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy29.get(Unknown > Source) > at > > org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191) > at > > org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) > at > > org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) > at > > org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) > at > > org.apache.flink.fs.s3base.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236) > at > > org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) > ... 25 more > / > > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >