Hi Kevin,

Flink comes with two schedulers for streaming:
- Default
- Adaptive (opt-in)

The adaptive scheduler is still experimental and doesn't support local recovery.
You're most likely using the first one, so you should be OK.
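
For reference, the scheduler is selected in flink-conf.yaml. A minimal sketch, assuming a 1.13-era setup (please double-check the keys against the docs for your version):

```
# flink-conf.yaml (sketch, not a drop-in config)

# The default scheduler is used unless you explicitly opt in to the
# adaptive one:
# jobmanager.scheduler: adaptive

# Local recovery lets the default scheduler re-use state files that are
# already on the TaskManagers when recovering from a failure:
state.backend.local-recovery: true
```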

> Can you elaborate on this a bit? We aren't changing the parallelism when
> restoring.

Splitting / merging of a RocksDB-based operator checkpoint is currently an
expensive operation. If the parallelism remains unchanged, you should be
OK; the majority of the operator state restore time will be spent
downloading the RocksDB snapshot.
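
As a rough back-of-the-envelope using the numbers you share below: ~900 GB
of checkpoint spread across a parallelism of 512 is on the order of 1-2 GB
of RocksDB files per subtask, so the restore time is essentially bounded by
how quickly each TaskManager can pull its share from GCS.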

> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
> parallelism of 512.

This could definitely generate a lot of concurrent requests when restoring
the state.
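
One knob that directly influences this is the number of RocksDB state
transfer threads. A hedged sketch (the key below is from the 1.13 docs;
the default may differ in your version):

```
# flink-conf.yaml (sketch; verify against your Flink version)

# Threads used per stateful operator to download / upload RocksDB
# snapshot files. With a parallelism of 512, every additional thread
# adds hundreds of concurrent GETs against GCS during restore, which is
# the kind of burst that tends to trigger 429 responses.
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
```

Lowering it trades a slower restore for fewer concurrent requests; raising
it does the opposite.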

Does the restore operation fail, or is the retry mechanism sufficient to
work around this?

D.

On Thu, Dec 2, 2021 at 7:54 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi David,
>
> Thanks for your response.
>
> What's the DefaultScheduler you're referring to? Is that available in
> Flink 1.13.1 (the version we are using)?
>
>> How large is the state you're restoring from / how many TMs does the job
>> consume / what is the parallelism?
>
>
> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
> parallelism of 512.
>
>> Also things could get even worse if the parallelism that has been used for
>> taking the checkpoint is different from the one you're trying to restore
>> with (especially with RocksDB).
>>
>
> Can you elaborate on this a bit? We aren't changing the parallelism when
> restoring.
>
> On Thu, Dec 2, 2021 at 10:48 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> this happens only when the pipeline is started up from a savepoint /
>> retained checkpoint, right? Guessing from the "path" you've shared, it seems
>> like a RocksDB-based retained checkpoint. In this case all task managers
>> need to pull state files from the object storage in order to restore. This
>> can indeed be a heavy operation, especially when restoring a large state
>> with high parallelism.
>>
>> Recovery from failure should be faster (with DefaultScheduler) as we can
>> re-use the local files that are already present on TaskManagers.
>>
>> How large is the state you're restoring from / how many TMs does the job
>> consume / what is the parallelism?
>>
>> Also things could get even worse if the parallelism that has been used
>> for taking the checkpoint is different from the one you're trying to
>> restore with (especially with RocksDB).
>>
>> Best,
>> D.
>>
>> On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hi all,
>>>
>>> We're running a large Flink cluster (256 task managers with 4 task slots
>>> each) with High Availability enabled, on Kubernetes, and we use Google
>>> Cloud Storage (GCS) as the object storage for the HA metadata. In addition,
>>> our Flink application writes out to GCS from one of its sinks via the
>>> streaming file sink + GCS connector.
>>>
>>> We observed the following types of errors when running our application:
>>>
>>> ```
>>>
>>> INFO: Encountered status code 429 when sending GET request to URL '
>>> https://storage.googleapis.com/download/storage/v1/b/<redacted>/o/<redacted>checkpoints%2F00000000000000000000000000000000%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media&generation=1638448883568946'.
>>> Delegating to response handler for possible retry. [CONTEXT
>>> ratelimit_period="10 SECONDS [skipped: 8]" ]
>>>
>>> ```
>>>
>>> ```
>>>  INFO: Encountered status code 503 when sending POST request to URL '
>>> https://storage.googleapis.com/upload/storage/v1/b/<redacted>/o?uploadType=multipart'.
>>> Delegating to response handler for possible retry.
>>> ```
>>>
>>> They typically happen upon cluster start-up, when all the task managers
>>> are registering with the jobmanager. We've also seen them occur as a result
>>> of output from our sink operator.
>>>
>>> Has anyone else encountered similar issues? Any practices you can
>>> suggest?
>>>
>>> Advice appreciated!
>>>
>>> Thanks
>>>
>>