[
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414085#comment-16414085
]
Steven Zhen Wu edited comment on FLINK-9061 at 3/26/18 10:21 PM:
-----------------------------------------------------------------
[~StephanEwen] [~jgrier]
We run into S3 throttling issue for two kinds of scenarios.
* thousands of massively parallel/stateless routing jobs for Keystone data
pipeline. for that, we configure a *static* 4-char random hex for the
checkpoint path for each routing job during deployment so that S3 writes *from
many jobs* are spread different S3 partitions. that requires no change in
Flink, just some deployment tooling automation. Also for this case, we set
`state.backend.fs.memory-threshold` to disable S3 writes from task managers.
Only job manager writes one uber metadata file (with state embedded)
* single large-state job with TBs state written from thousands of operators.
To avoid throttling, we need S3 writes *from many operators* to spread to
different S3 partitions. that's where we want to inject *dynamic* 4-char random
hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__
I can see that hash approach can work for both scenario. but the hash need to
be applied to the complete path (not just the prefix configured in
flink-conf.yaml)
But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise,
we can never take a savepoint for case #2 (single large-state job).
was (Author: stevenz3wu):
[~StephanEwen] [~jgrier]
We run into S3 throttling issue for two kinds of scenarios.
* thousands of massively parallel/stateless routing jobs for Keystone data
pipeline. for that, we configure a *static* 4-char random hex for the
checkpoint path for each routing job during deployment so that S3 writes *from
many jobs* are spread different S3 partitions. that requires no change in
Flink, just some deployment tooling automation.
* single large-state job with TBs state written from thousands of operators.
To avoid throttling, we need S3 writes *from many operators* to spread to
different S3 partitions. that's where we want to inject *dynamic* 4-char random
hex for each S3 write. that was my earlier proposal of __ENTROPY___KEY__
I can see that hash approach can work for both scenario. but the hash need to
be applied to the complete path (not just the prefix configured in
flink-conf.yaml)
But I do think we need the _hash_ / _entropy_ for savepoint as well. Otherwise,
we can never take a savepoint for case #2 (single large-state job).
> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
> Issue Type: Bug
> Components: FileSystem, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Jamie Grier
> Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale
> jobs (those with many total tasks). The issue is that we are writing all the
> checkpoint data under a common key prefix. This is the worst case scenario
> for S3 performance since the key is used as a partition key.
>
> In the worst case checkpoints fail with a 500 status code coming back from S3
> and an internal error type of TooBusyException.
>
> One possible solution would be to add a hook in the Flink filesystem code
> that allows me to "rewrite" paths. For example say I have the checkpoint
> directory set to:
>
> s3://bucket/flink/checkpoints
>
> I would hook that and rewrite that path to:
>
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original
> path
>
> This would distribute the checkpoint write load around the S3 cluster evenly.
>
> For reference:
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>
> Any other people hit this issue? Any other ideas for solutions? This is a
> pretty serious problem for people trying to checkpoint to S3.
>
> -Jamie
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)