[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414085#comment-16414085 ]
Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 11:49 PM:
-----------------------------------------------------------------

[~StephanEwen] [~jgrier] We run into the S3 throttling issue in two kinds of scenarios.

* Thousands of massively parallel/stateless routing jobs for the Keystone data pipeline. For these, we configure a *static* 4-char random hex in the checkpoint path of each routing job during deployment, so that S3 writes *from many jobs* are spread across different S3 partitions. That requires no change in Flink, just some deployment tooling automation. Also for this case, we set `state.backend.fs.memory-threshold` so that task managers don't write to S3 at all; only the job manager writes one uber metadata file (with the state embedded).
* A single large-state job with TBs of state written from thousands of operators. To avoid throttling, we need S3 writes *from many operators* to spread across different S3 partitions. That's where we want to inject a *dynamic* 4-char random hex into each S3 write, which was my earlier proposal of an `_ENTROPY_KEY_` placeholder: when each operator creates its FsCheckpointStreamFactory object, the `_ENTROPY_KEY_` token in the configured path is substituted with a 4-char random hex (sketched below).

I can see that the hash/random-prefix solution can work for both scenarios, but the hash needs to be applied to the complete path (including the last random UUID part, not just the prefix configured in flink-conf.yaml). But I do think we need the _hash_ / _entropy_ for savepoints as well. Otherwise, we are likely to get throttled when taking a savepoint of a single large-state job.
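To make the dynamic variant concrete, here is a minimal sketch in plain Java. It is illustrative only: the class and method names are invented for this example and are not Flink API. It just shows the substitution step described above, where the `_ENTROPY_KEY_` token in a configured checkpoint path is replaced with a fresh 4-char random hex each time a stream is opened. The static per-job variant is the same idea with the hex generated once at deployment time instead of once per write.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Illustrative sketch only (not Flink API): replace an entropy placeholder in a
 * configured checkpoint path with a fresh 4-char random hex, so that writes from
 * many operators land under different S3 key prefixes.
 */
public class EntropyPathSketch {

    // Placeholder token, as proposed in the comment above.
    static final String ENTROPY_KEY = "_ENTROPY_KEY_";

    /** Returns a random 4-character lowercase hex string, e.g. "a3f7". */
    static String randomHex4() {
        return String.format("%04x", ThreadLocalRandom.current().nextInt(0x10000));
    }

    /** Substitutes the placeholder with per-write entropy; paths without the token are left untouched. */
    static String injectEntropy(String configuredPath) {
        return configuredPath.replace(ENTROPY_KEY, randomHex4());
    }

    public static void main(String[] args) {
        String configured = "s3://bucket/" + ENTROPY_KEY + "/flink/checkpoints";
        // Prints e.g. s3://bucket/9c2e/flink/checkpoints, with a different hex on every call.
        System.out.println(injectEntropy(configured));
    }
}
```

Generating the hex once per write (rather than once per job) is what spreads a single large-state job's files across S3 partitions, per the second scenario above.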
> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
> Issue Type: Bug
> Components: FileSystem, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Jamie Grier
> Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale jobs (those with many total tasks). The issue is that we are writing all the checkpoint data under a common key prefix. This is the worst case scenario for S3 performance since the key is used as a partition key.
>
> In the worst case checkpoints fail with a 500 status code coming back from S3 and an internal error type of TooBusyException.
>
> One possible solution would be to add a hook in the Flink filesystem code that allows me to "rewrite" paths. For example say I have the checkpoint directory set to:
>
> s3://bucket/flink/checkpoints
>
> I would hook that and rewrite that path to:
>
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original path
>
> This would distribute the checkpoint write load around the S3 cluster evenly.
>
> For reference:
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>
> Any other people hit this issue? Any other ideas for solutions? This is a pretty serious problem for people trying to checkpoint to S3.
>
> -Jamie
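For reference, here is a similarly minimal sketch of the path-rewrite idea described in the issue above, again in plain Java rather than actual Flink filesystem code; the class name, the helper names, and the choice of hash (two bytes of an MD5 digest) are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Illustrative sketch only (not Flink code): derive a short hash from the configured
 * checkpoint path and prepend it to the key, so that
 * s3://bucket/flink/checkpoints becomes s3://bucket/<hash>/flink/checkpoints.
 */
public class HashedCheckpointPathSketch {

    /** First two bytes of an MD5 digest of the input, as 4 lowercase hex chars. */
    static String shortHash(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            return String.format("%02x%02x", digest[0] & 0xff, digest[1] & 0xff);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Rewrites s3://bucket/some/key into s3://bucket/<hash>/some/key; assumes a non-empty key. */
    static String rewrite(String s3Path) {
        int schemeEnd = s3Path.indexOf("://") + 3;
        int keyStart = s3Path.indexOf('/', schemeEnd);   // first '/' after the bucket name
        String bucketPart = s3Path.substring(0, keyStart);
        String key = s3Path.substring(keyStart + 1);
        return bucketPart + "/" + shortHash(s3Path) + "/" + key;
    }

    public static void main(String[] args) {
        // Prints s3://bucket/<4-hex-chars>/flink/checkpoints
        System.out.println(rewrite("s3://bucket/flink/checkpoints"));
    }
}
```

Because the hash here is derived only from the configured directory, all files of a given job still share one stable prefix; the comment above argues the entropy needs to reach the complete per-file path (including the trailing random UUID part) for a single large-state job.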