Hi David,

Thanks for your response.

What's the DefaultScheduler you're referring to? Is that available in Flink
1.13.1 (the version we are using)?

> How large is the state you're restoring from / how many TMs does the job
> consume / what is the parallelism?


Our checkpoint is about 900GB, and we have 256 TaskManagers with a
parallelism of 512.
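Back of the envelope, assuming the state is spread roughly evenly, that's
about 900GB / 256 ≈ 3.5GB of state files for each TaskManager to pull from
GCS on restore.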

> Also things could get even worse if the parallelism that has been used for
> taking the checkpoint is different from the one you're trying to restore
> with (especially with RocksDB).
>

Can you elaborate on this a bit? We aren't changing the parallelism when
restoring.
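
In case it helps to see which knobs I think are in play, here's a minimal
sketch of the checkpoint-related setup (illustrative only, not copied from
our actual job; the bucket path is redacted). If I understand your point
about the DefaultScheduler reusing local files, state.backend.local-recovery
is the option involved:

```
// Minimal sketch, not our actual job code: incremental RocksDB checkpoints
// plus local recovery, so that recovery from failure can reuse state files
// already on the TaskManagers instead of re-downloading everything from GCS.
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // RocksDB state backend, uploading only incremental diffs per checkpoint.
        conf.setString("state.backend", "rocksdb");
        conf.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        // Keep a local copy of state so failover doesn't have to go back to GCS.
        conf.set(CheckpointingOptions.LOCAL_RECOVERY, true);
        // Retained checkpoints live in GCS (path redacted).
        conf.setString("state.checkpoints.dir", "gs://<redacted>/checkpoints");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000L); // illustrative interval
        // ... sources/sinks and env.execute() would go here ...
    }
}
```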

On Thu, Dec 2, 2021 at 10:48 AM David Morávek <d...@apache.org> wrote:

> Hi Kevin,
>
> this happens only when the pipeline is started up from savepoint /
> retained checkpoint right? Guessing from the "path" you've shared it seems
> like a RocksDB-based retained checkpoint. In this case all task managers
> need to pull state files from the object storage in order to restore. This
> can indeed be a heavy operation, especially when restoring a large state with
> high parallelism.
>
> Recovery from failure should be faster (with DefaultScheduler) as we can
> re-use the local files that are already present on TaskManagers.
>
> How large is the state you're restoring from / how many TMs does the job
> consume / what is the parallelism?
>
> Also things could get even worse if the parallelism that has been used for
> taking the checkpoint is different from the one you're trying to restore
> with (especially with RocksDB).
>
> Best,
> D.
>
> On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hi all,
>>
>> We're running a large (256 task managers with 4 task slots each) Flink
>> cluster with High Availability enabled on Kubernetes, and we use Google Cloud
>> Storage (GCS) as our object storage for the HA metadata. In addition, our
>> Flink application writes out to GCS from one of its sinks via streaming
>> file sink + GCS connector.
>>
>> We observed the following types of errors when running our application:
>>
>> ```
>>
>> INFO: Encountered status code 429 when sending GET request to URL '
>> https://storage.googleapis.com/download/storage/v1/b/<redacted>/o/<redacted>checkpoints%2F00000000000000000000000000000000%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media&generation=1638448883568946'.
>> Delegating to response handler for possible retry. [CONTEXT
>> ratelimit_period="10 SECONDS [skipped: 8]" ]
>>
>> ```
>>
>> ```
>>  INFO: Encountered status code 503 when sending POST request to URL '
>> https://storage.googleapis.com/upload/storage/v1/b/<redacted>/o?uploadType=multipart'.
>> Delegating to response handler for possible retry.
>> ```
>>
>> They typically happen upon cluster start-up, when all the task managers
>> are registering with the jobmanager. We've also seen them occur as a
>> result of output from our sink operator.
>>
>> Has anyone else encountered similar issues? Any practices you can
>> suggest?
>>
>> Advice appreciated!
>>
>> Thanks
>>
>
