To me this looks like an I/O bottleneck at your task managers.
I have a few questions:
1. What is your checkpoint interval?
2. How frequently are you updating state in RocksDB?
3. How many task managers are you using?
4. How much data does each task manager handle while taking a checkpoint?

Pay close attention to points (3) and (4); I suspect that is where you are
stuck.
First, try to scale vertically by giving each task manager more CPU and memory.
If that does not help, scale horizontally so that the checkpoint I/O on each
task manager is reduced; a rough sketch of the relevant knobs follows below.
Apart from that, check whether there is any bottleneck in the file system.
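
As a rough illustration (Java, written against the Flink 1.10 APIs; the
interval, pause, and parallelism values are placeholders I made up, not
recommendations for your job), these are the knobs I mean:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuningSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint interval (question 1): every 5 minutes here, purely illustrative.
            env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.AT_LEAST_ONCE);

            // Give normal processing room to breathe between checkpoints so that
            // checkpoint I/O does not monopolize the task managers.
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);

            // Scaling horizontally (points 3 and 4): a higher parallelism spread over
            // more task managers reduces the checkpoint I/O each one has to do.
            env.setParallelism(16);
        }
    }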

Regards
Bhaskar

On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <vict...@gmail.com> wrote:

> I had a similar problem.  I ended up solving it by not relying on
> checkpoints for recovery and instead re-reading my input sources (in my
> case a Kafka topic) from the earliest offset, rebuilding only the state I
> need.  I only need to care about the past 1 to 2 days of state, so I can
> afford to drop anything older.  My recovery time went from over an hour
> for just the first checkpoint to under 10 minutes.
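>
> For reference, a rough sketch of that pattern against the Flink 1.10 Kafka
> connector (the topic name, broker address, class name, and the
> extractEventTime helper are all made-up placeholders; the 2-day cutoff is
> just what my case needed):
>
>     import java.time.Duration;
>     import java.time.Instant;
>     import java.util.Properties;
>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>
>     public class RebuildFromEarliest {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>             Properties props = new Properties();
>             props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
>
>             FlinkKafkaConsumer<String> source =
>                 new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
>             // Ignore committed offsets and checkpoints; rebuild state from scratch.
>             source.setStartFromEarliest();
>
>             long cutoff = Instant.now().minus(Duration.ofDays(2)).toEpochMilli();
>             env.addSource(source)
>                // Drop anything older than the state we actually need to rebuild.
>                .filter(record -> extractEventTime(record) >= cutoff)
>                .print();
>
>             env.execute("rebuild-from-earliest");
>         }
>
>         // Hypothetical helper: parse the event timestamp out of the record.
>         private static long extractEventTime(String record) {
>             return Long.parseLong(record.split(",")[0]);
>         }
>     }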
>
> Tim
>
> On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myas...@live.com> wrote:
>
>> Hi Jeff
>>
>>
>>    1. " after around 50GB of state, I stop being able to reliably take
>>    checkpoints or savepoints. "
>>    What is the exact reason the job cannot complete checkpoints? Expired
>>    before completing, or declined by some tasks? The former is mainly
>>    caused by high back-pressure, while the latter is mainly due to some
>>    internal error (see the small sketch after this list).
>>    2. Have you checked why the remote task manager was lost?
>>    If the remote task manager did not crash, the loss might be due to GC
>>    pauses; I suggest checking the task manager logs and GC logs.
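>>
>>    (As a side note on point 1: whether a slow checkpoint shows up as
>>    "expired" or as a tolerated failure is decided by two settings. A
>>    minimal sketch, using the default timeout and the failure number
>>    already mentioned in this thread:)
>>
>>        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>>        public class CheckpointFailureSketch {
>>            public static void main(String[] args) {
>>                StreamExecutionEnvironment env =
>>                    StreamExecutionEnvironment.getExecutionEnvironment();
>>                env.enableCheckpointing(60 * 1000L); // 1 min interval, illustrative
>>
>>                // A checkpoint that does not finish within this timeout is reported
>>                // as expired (10 minutes is the default).
>>                env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
>>
>>                // Number of failed or declined checkpoints the job tolerates
>>                // before failing over.
>>                env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);
>>            }
>>        }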
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Jeff Henrikson <jehenri...@gmail.com>
>> *Sent:* Thursday, June 18, 2020 1:46
>> *To:* user <user@flink.apache.org>
>> *Subject:* Trouble with large state
>>
>> Hello Flink users,
>>
>> I have an application of around 10 enrichment joins.  All events are
>> read from Kafka and have event timestamps.  The joins are built using
>> .cogroup, with a global window triggering on every event, plus a
>> custom evictor that drops records once a newer record for the same ID
>> has been processed.  Deletes are represented by empty events with a
>> timestamp and ID (tombstones).  That way, we can drop records when
>> business logic dictates, rather than when a maximum retention period
>> has been reached.  The application uses the RocksDBStateBackend and
>> runs on Kubernetes on AWS with local SSDs.
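>>
>> For concreteness, a stripped-down sketch of the shape of one of these
>> joins (the Event type, the keys, the inline test data, and the
>> commented-out evictor are placeholders, not the real implementation):
>>
>>     import org.apache.flink.api.common.functions.CoGroupFunction;
>>     import org.apache.flink.api.java.functions.KeySelector;
>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>     import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>     import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
>>     import org.apache.flink.util.Collector;
>>
>>     public class CoGroupJoinSketch {
>>
>>         // Minimal stand-in for the real event type.
>>         public static class Event {
>>             public String id;
>>             public String payload;
>>             public Event() {}
>>             public Event(String id, String payload) { this.id = id; this.payload = payload; }
>>         }
>>
>>         public static void main(String[] args) throws Exception {
>>             StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>             DataStream<Event> left =
>>                 env.fromElements(new Event("1", "left-a"), new Event("1", "left-b"));
>>             DataStream<Event> right =
>>                 env.fromElements(new Event("1", "right-a"));
>>
>>             left.coGroup(right)
>>                 .where(new KeySelector<Event, String>() {
>>                     public String getKey(Event e) { return e.id; } })
>>                 .equalTo(new KeySelector<Event, String>() {
>>                     public String getKey(Event e) { return e.id; } })
>>                 .window(GlobalWindows.create())
>>                 .trigger(CountTrigger.of(1))  // fire on every incoming element
>>                 // .evictor(new DropSupersededEvictor())  // the custom evictor that
>>                 //     drops a record once a newer one with the same ID has been
>>                 //     processed is omitted from this sketch
>>                 .apply(new CoGroupFunction<Event, Event, String>() {
>>                     public void coGroup(Iterable<Event> l, Iterable<Event> r,
>>                                         Collector<String> out) {
>>                         for (Event a : l) {
>>                             for (Event b : r) {
>>                                 out.collect(a.id + ": " + a.payload + " + " + b.payload);
>>                             }
>>                         }
>>                     } })
>>                 .print();
>>
>>             env.execute("cogroup-join-sketch");
>>         }
>>     }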
>>
>> Unit tests show that the joins produce expected results.  On an 8 node
>> cluster, watermark output progress seems to indicate I should be able to
>> bootstrap my state of around 500GB in around 1 day.  I am able to save
>> and restore savepoints for the first half an hour of run time.
>>
>> My current trouble is that after around 50GB of state, I stop being able
>> to reliably take checkpoints or savepoints.  Some time after that, I
>> start getting a variety of failures where the first suspicious log event
>> is a generic cluster connectivity error, such as:
>>
>>      1) java.io.IOException: Connecting the channel failed: Connecting
>>      to remote task manager + '/10.67.7.101:38955' has failed. This
>>      might indicate that the remote task manager has been lost.
>>
>>      2) org.apache.flink.runtime.io.network.netty.exception
>>      .RemoteTransportException: Connection unexpectedly closed by remote
>>      task manager 'null'. This might indicate that the remote task
>>      manager was lost.
>>
>>      3) Association with remote system
>>      [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
>>      gated for [50] ms. Reason: [Association failed with
>>      [akka.tcp://flink@10.67.6.66:34987]] Caused by:
>>      [java.net.NoRouteToHostException: No route to host]
>>
>> I don't see any obvious out of memory errors on the TaskManager UI.
>>
>> Adding nodes to the cluster does not seem to increase the maximum
>> savable state size.
>>
>> I could enable HA, but for the time being I have been leaving it out to
>> avoid the possibility of masking deterministic faults.
>>
>> Below are my configurations.
>>
>> Thanks in advance for any advice.
>>
>> Regards,
>>
>>
>> Jeff Henrikson
>>
>>
>>
>> Flink version: 1.10
>>
>> Configuration set via code:
>>      parallelism=8
>>      maxParallelism=64
>>      setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>      setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>      setTolerableCheckpointFailureNumber(1000)
>>      setMaxConcurrentCheckpoints(1)
>>      enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>      RocksDBStateBackend
>>      setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>      setNumberOfTransferThreads(25)
>>      setDbStoragePath points to a local nvme SSD
>>
>> Configuration in flink-conf.yaml:
>>
>>      jobmanager.rpc.address: localhost
>>      jobmanager.rpc.port: 6123
>>      jobmanager.heap.size: 28000m
>>      taskmanager.memory.process.size: 28000m
>>      taskmanager.memory.jvm-metaspace.size: 512m
>>      taskmanager.numberOfTaskSlots: 1
>>      parallelism.default: 1
>>      jobmanager.execution.failover-strategy: full
>>
>>      cluster.evenly-spread-out-slots: false
>>
>>      taskmanager.memory.network.fraction: 0.2           # default 0.1
>>      taskmanager.memory.framework.off-heap.size: 2GB
>>      taskmanager.memory.task.off-heap.size: 2GB
>>      taskmanager.network.memory.buffers-per-channel: 32 # default 2
>>      taskmanager.memory.managed.fraction: 0.4           # docs say default 0.1, but something seems to set 0.4
>>      taskmanager.memory.task.off-heap.size: 2048MB      # default 128M
>>
>>      state.backend.fs.memory-threshold: 1048576
>>      state.backend.fs.write-buffer-size: 10240000
>>      state.backend.local-recovery: true
>>      state.backend.rocksdb.writebuffer.size: 64MB
>>      state.backend.rocksdb.writebuffer.count: 8
>>      state.backend.rocksdb.writebuffer.number-to-merge: 4
>>      state.backend.rocksdb.timer-service.factory: heap
>>      state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
>>      state.backend.rocksdb.write-batch-size: 16000000 # default 2MB
>>
>>      web.checkpoints.history: 250
>>
>
