[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414828#comment-16414828 ]
Jamie Grier commented on FLINK-9061:
------------------------------------

[~ste...@apache.org] Here's the full stack trace:

java.lang.Exception: Could not perform checkpoint 465 for operator catch_all.reduce -> (catch_all.late, catch_all.metric) (113/256).
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 465 for operator catch_all.reduce -> (catch_all.late, catch_all.metric) (113/256).
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
	... 8 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system output stream to s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b in order to obtain the stream state handle
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
	at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
	at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
	at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
	at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359)
	... 13 common frames omitted
Caused by: java.io.IOException: org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again.
(Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended Request ID: BLAH
	at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036)
	at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987)
	at org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
	... 18 common frames omitted
Caused by: org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: BLAH)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:741)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:715)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:697)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:665)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:647)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:511)

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Jamie Grier
>            Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale
> jobs (those with many total tasks). The issue is that we are writing all the
> checkpoint data under a common key prefix. This is the worst-case scenario
> for S3 performance, since the key is used as a partition key.
>
> In the worst case, checkpoints fail with a 500 status code from S3 and an
> internal error type of TooBusyException.
>
> One possible solution would be to add a hook in the Flink filesystem code
> that allows me to "rewrite" paths.
> For example, say I have the checkpoint directory set to:
>
> s3://bucket/flink/checkpoints
>
> I would hook that and rewrite the path to:
>
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the
> original path.
>
> This would distribute the checkpoint write load evenly across the S3
> cluster.
>
> For reference:
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>
> Has anyone else hit this issue? Any other ideas for solutions? This is a
> pretty serious problem for people trying to checkpoint to S3.
>
> -Jamie
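
A minimal sketch of the hash-prefix rewrite proposed above. This is an illustration only, not an existing Flink API: EntropyPathRewriter is a hypothetical helper, and the two-hex-byte MD5 prefix merely stands in for "the hash of the original path".

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: prefix the object key with a short hash of the
// original path so checkpoint writes spread across S3's key-range
// partitions instead of piling onto one common prefix.
public final class EntropyPathRewriter {

    // Rewrites s3://bucket/flink/checkpoints into
    // s3://bucket/<hash>/flink/checkpoints.
    // Assumes the path has the form scheme://bucket/key.
    public static String rewrite(String checkpointPath) {
        int schemeEnd = checkpointPath.indexOf("://") + 3;
        int bucketEnd = checkpointPath.indexOf('/', schemeEnd);
        String bucketPart = checkpointPath.substring(0, bucketEnd); // s3://bucket
        String keyPart = checkpointPath.substring(bucketEnd + 1);   // flink/checkpoints
        return bucketPart + '/' + shortHash(keyPart) + '/' + keyPart;
    }

    private static String shortHash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            // Four hex characters give 65536 possible prefixes, which per
            // the AWS guidance linked above is ample entropy to spread load.
            return String.format("%02x%02x", d[0], d[1]);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
{code}

For illustration, rewrite("s3://bucket/flink/checkpoints") would return something like s3://bucket/55c1/flink/checkpoints. Because the prefix is a pure function of the original path, applying the same rewrite at restore time resolves to the same location, while different checkpoint directories land under different S3 partitions.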