Hi Gabor,

Thanks for the suggestions.

For the rocksDB config tweaks would you be able to make some concrete
suggestions? I have looked into the official Tuning Checkpoints and Large
State guide documentation, however tuning rocksDB does not seem
straightforward tbh.

My entire flinkConfiguration looks like this:

spec:
  flinkConfiguration:
    env.java.opts.all: --add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.util=ALL-UNNAMED
      --add-opens java.base/java.time=ALL-UNNAMED --add-opens
java.base/java.nio=ALL-UNNAMED
    execution.checkpointing.file-merging.enabled: "true"
    execution.checkpointing.interval: "120000"
    execution.checkpointing.min-pause: "60000"
    execution.checkpointing.timeout: "900000"
    execution.checkpointing.unaligned: "true"
    fs.default-scheme: abfss://<container-name>@<storage-account-name>.
dfs.core.windows.net
    high-availability.storageDir:
abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/ha
    high-availability.type: kubernetes
    job.autoscaler.enabled: "true"
    job.autoscaler.memory.tuning.enabled: "true"
    job.autoscaler.metrics.window: 10m
    job.autoscaler.stabilization.interval: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    pipeline.max-parallelism: "4"
    rest.profiling.enabled: "true"
    state.backend: rocksdb
    state.backend.incremental: "true"
    state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
    state.backend.rocksdb.compaction.style: UNIVERSAL
    state.backend.rocksdb.localdir: /mnt/flink
    state.backend.rocksdb.thread.num: "8"
    state.backend.rocksdb.writebuffer.size: 128MB
    state.checkpoints.dir: abfss://<container-name>@<storage-account-name>.
dfs.core.windows.net/test/checkpointing
    state.savepoints.dir: abfss://<container-name>@<storage-account-name>.
dfs.core.windows.net/test/savepoints
    taskmanager.network.memory.buffer-debloat.enabled: "true"
    taskmanager.numberOfTaskSlots: "1"

Best regards,
Bjarke


On Mon, Aug 19, 2024 at 1:18 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Please see my comments inline.
>
> G
>
>
> On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com>
> wrote:
>
>> Hi Gabor,
>>
>> What kind of development work - is it the Disaggregated State Storage and
>> Management FLIP (
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855)
>> that you are referring to?
>>
> Yes.
>
>>
>> Thanks for the suggestion. I am not familiar with the state processor api
>> but I will have a look.
>>
>> My state is not even that large(10-50 Gb). It seems like people are using
>> RocksDB with Tb of state but I am a bit puzzled as to how they handle
>> restarts from savepoint when it takes so long. Do you know?
>>
> Yeah, we know users with Tb of states and not easy to handle such jobs
> because of the long restore times.
> Practically the cluster must be strong enough to catch up the accumulated
> events after the long restore time + some config tweaks which I've listed
> in the next bullet point.
>
>>
>> Are there some flink configurations that I might be able to try out to
>> alleviate the slow restart that might be causing the exception?
>>
> What helps is "state.backend.local-recovery=true", some rocksDB tuning and
> reducing state size if possible (better eviction, etc...).
>
>>
>> Best regards,
>> Bjarke
>>
>> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Bjarke,
>>>
>>> It's normal to see longer recovery time as the state size grows. There
>>> are developments in progress to mitigate this problem.
>>>
>>> > Any ideas as to what could be causing this?
>>> I think there is no easy way to tell it, I can give just some pointers.
>>>
>>> First I would take a look at the state files with the State Processor
>>> API[1], I would be really surprised though since ADLS is consistent,
>>> only the rename with overwrite operation is not atomic what we don't do
>>> in Flink.
>>>
>>> When the checkpoint files are not corrupt then I would add some debug
>>> printouts to the exception catch area and try to find out the
>>> exact root cause (since several Gb state can fit into memory maybe local
>>> repro can be possible with debugger).
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am doing stateful streaming (keyed coprocessing) using rocksDB. When
>>>> my job restarts to adjust parallelism, and restores from savepoint, I quite
>>>> often get the following error:
>>>>
>>>> 2024-08-07 19:43:09
>>>> java.lang.Exception: Exception while creating
>>>> StreamOperatorStateContext.
>>>> at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.streamOperatorStateContext(
>>>> StreamTaskStateInitializerImpl.java:330)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>>>> .initializeState(AbstractStreamOperator.java:275)
>>>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
>>>> .initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .restoreStateAndGates(StreamTask.java:858)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .lambda$restoreInternal$5(StreamTask.java:812)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
>>>> .call(StreamTaskActionExecutor.java:55)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(
>>>> StreamTask.java:812)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(
>>>> StreamTask.java:771)
>>>> at org.apache.flink.runtime.taskmanager.Task
>>>> .runWithSystemExitMonitoring(Task.java:970)
>>>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task
>>>> .java:939)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>> keyed state backend for
>>>> KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any
>>>> of the 1 provided restore options.
>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>>> .createAndRestore(BackendRestorerProcedure.java:165)
>>>> at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.keyedStatedBackend(
>>>> StreamTaskStateInitializerImpl.java:457)
>>>> at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.streamOperatorStateContext(
>>>> StreamTaskStateInitializerImpl.java:203)
>>>> ... 12 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>> Caught unexpected exception.
>>>> at org.apache.flink.contrib.streaming.state.
>>>> RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder
>>>> .java:459)
>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
>>>> .createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
>>>> .createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>>> at org.apache.flink.streaming.api.operators.
>>>> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(
>>>> StreamTaskStateInitializerImpl.java:446)
>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>>> .attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>>> .createAndRestore(BackendRestorerProcedure.java:137)
>>>> ... 14 more
>>>> Caused by: java.io.EOFException
>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>> at org.apache.flink.api.common.typeutils.base.array.
>>>> BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer
>>>> .java:82)
>>>> at org.apache.flink.runtime.state.restore.
>>>> FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(
>>>> FullSnapshotRestoreOperation.java:298)
>>>> at org.apache.flink.runtime.state.restore.
>>>> FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(
>>>> FullSnapshotRestoreOperation.java:273)
>>>> at org.apache.flink.contrib.streaming.state.restore.
>>>> RocksDBFullRestoreOperation.restoreKVStateData(
>>>> RocksDBFullRestoreOperation.java:148)
>>>> at org.apache.flink.contrib.streaming.state.restore.
>>>> RocksDBFullRestoreOperation.applyRestoreResult(
>>>> RocksDBFullRestoreOperation.java:128)
>>>> at org.apache.flink.contrib.streaming.state.restore.
>>>> RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:
>>>> 102)
>>>> at org.apache.flink.contrib.streaming.state.
>>>> RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder
>>>> .java:376)
>>>> ... 19 more
>>>>
>>>> However, most often the job is able to start up after several retries.
>>>> It seems to happen more often with higher parallelism (and higher cpu) and
>>>> as such with more pods.
>>>> Additionally, as the state grows into the Gb size it takes longer for
>>>> the job to start up and as such my job has longer downtime due to the many
>>>> retries before it successfully restores state.
>>>>
>>>> Any ideas as to what could be causing this?
>>>>
>>>> I am using Apache Flink Operator 1.8, Flink 1.20 using the official
>>>> Apache Flink image, Java 17, and Adls Gen2 for durable storage.
>>>>
>>>> Best regards,
>>>> Bjarke
>>>>
>>>

Reply via email to