Vijay,
Thanks for your thoughts. Below are answers to your questions.
> 1. What's your checkpoint interval?
I have used many different checkpoint intervals, ranging from 5 minutes
to never.  I usually set setMinPauseBetweenCheckpoints to the same value
as the checkpoint interval.
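For reference, a minimal sketch of that wiring inside the job setup
(the 5-minute interval is only an example):

    // inside the job's main(), before building the topology
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    long intervalMs = 5 * 60 * 1000L;            // e.g. 5 minutes
    env.enableCheckpointing(intervalMs);
    // keep the minimum pause equal to the checkpoint interval
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(intervalMs);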
> 2. How frequently are you updating the state into RocksDB?
My understanding is that for .cogroup:
- Triggers control communication outside the operator
- Evictors control cleanup of internal state
- Configurations like write buffer size control the frequency of
state change at the storage layer
- There is no control over how frequently the window state updates at
the RocksDB API layer.
Thus, the state updates whenever data is ingested.
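To make the shape concrete, here is a rough sketch of one of the joins
(the Event type, key selector, evictor, and CoGroupFunction names are
illustrative, not the real ones):

    // left, right: DataStream<Event> (illustrative)
    // global window, fire on every element, custom evictor drops
    // superseded records for the same ID
    DataStream<Event> enriched = left
        .coGroup(right)
        .where(e -> e.id)
        .equalTo(e -> e.id)
        .window(GlobalWindows.create())
        .trigger(CountTrigger.of(1))
        .evictor(new DropSupersededByIdEvictor())  // hypothetical custom evictor
        .apply(new EnrichCoGroupFunction());       // hypothetical CoGroupFunction

So every ingested element both fires the trigger and mutates the window
contents; the only knobs I see for write frequency are the RocksDB
write buffer settings.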
> 3. How many task managers are you using?
Usually I have been running with one slot per taskmanager, with 28GB
of usable RAM on each node.
> 4. How much data each task manager handles while taking the checkpoint?
Funny you should ask. I would be okay with zero.
The application I am replacing has a latency of 36-48 hours, so if I had
to fully stop processing to take every snapshot synchronously, it might
be seen as totally acceptable, especially for initial bootstrap. Also,
the velocity of running this backfill is approximately 115x real time on
8 nodes, so the steady-state run may not exhibit the failure mode in
question at all.
It has been a source of some frustration to me that, in the case of
RocksDBStateBackend, the configuration key state.backend.async
effectively cannot be set to false in any meaningful way.
The only way I have found in the existing code to get behavior like a
synchronous snapshot is to POST to /jobs/<jobID>/stop with drain=false
and a target directory URL.  Failing fast this way is how I discovered
that I needed to increase the transfer threads from the default.
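Concretely, the request I mean looks roughly like this (the savepoint
target directory is illustrative):

    POST /jobs/<jobID>/stop
    { "drain": false, "targetDirectory": "s3://my-bucket/savepoints" }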
The reason I don't just run the whole backfill and then take one
snapshot is that, even in the absence of checkpoints, a very similar
congestion seems to take the cluster down when I am, say, 20-30% of
the way through my backfill.
Reloading from my largest feasible snapshot makes it possible to take
another, slightly larger snapshot before the next crash, but not by
much.
At first glance, the code change to allow RocksDBStateBackend to take
synchronous snapshots looks pretty easy.  Nevertheless, I was
hoping to do the initial launch of my application without needing to
modify the framework.
Regards,
Jeff Henrikson
On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
For me this seems to be an IO bottleneck at your task manager.
I have a couple of queries:
1. What's your checkpoint interval?
2. How frequently are you updating the state into RocksDB?
3. How many task managers are you using?
4. How much data each task manager handles while taking the checkpoint?
For points (3) and (4), you should be very careful. I feel you are
stuck at this.
Try to scale vertically by adding more CPU and memory to each task
manager.
If not, try to scale horizontally so that the IO per task manager gets
reduced.
Apart from that, check whether there is any bottleneck with the file
system.
Regards
Bhaskar
On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <vict...@gmail.com> wrote:
I had a similar problem. I ended up solving it by not relying on
checkpoints for recovery and instead re-reading my input sources (in my
case a kafka topic) from the earliest offset and rebuilding only the
state I need. I only need to care about the past 1 to 2 days of
state so can afford to drop anything older. My recovery time went
from over an hour for just the first checkpoint to under 10 minutes.
Tim
On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myas...@live.com> wrote:
Hi Jeff
1. "after around 50GB of state, I stop being able to reliably
take checkpoints or savepoints. "
What is the exact reason that the job cannot complete the
checkpoint? Expired before completing, or declined by some
tasks? The former is mainly caused by high back-pressure
and the latter is mainly due to some internal error.
2. Have you checked why the remote task manager was lost?
If the remote task manager did not crash, it might be due
to GC impact; I think you need to check the task manager
logs and GC logs.
Best
Yun Tang
------------------------------------------------------------------------
*From:* Jeff Henrikson <jehenri...@gmail.com>
*Sent:* Thursday, June 18, 2020 1:46
*To:* user <user@flink.apache.org>
*Subject:* Trouble with large state
Hello Flink users,
I have an application of around 10 enrichment joins.  All events are
read from kafka and have event timestamps.  The joins are built using
.cogroup, with a global window, triggering on every 1 event, plus a
custom evictor that drops records once a newer record for the same ID
has been processed.  Deletes are represented by empty events with
timestamp and ID (tombstones).  That way, we can drop records when
business logic dictates, as opposed to when a maximum retention has
been attained.  The application runs RocksDBStateBackend, on
Kubernetes on AWS with local SSDs.
Unit tests show that the joins produce expected results.  On an 8 node
cluster, watermark output progress seems to indicate I should be able
to bootstrap my state of around 500GB in around 1 day.  I am able to
save and restore savepoints for the first half an hour of run time.
My current trouble is that after around 50GB of state, I stop being
able to reliably take checkpoints or savepoints.  Some time after
that, I start getting a variety of failures where the first suspicious
log event is a generic cluster connectivity error, such as:
1) java.io.IOException: Connecting the channel failed: Connecting
   to remote task manager + '/10.67.7.101:38955' has failed. This
   might indicate that the remote task manager has been lost.
2) org.apache.flink.runtime.io.network.netty.exception
   .RemoteTransportException: Connection unexpectedly closed by remote
   task manager 'null'. This might indicate that the remote task
   manager was lost.
3) Association with remote system
   [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
   gated for [50] ms. Reason: [Association failed with
   [akka.tcp://flink@10.67.6.66:34987]] Caused by:
   [java.net.NoRouteToHostException: No route to host]
I don't see any obvious out of memory errors on the TaskManager UI.
Adding nodes to the cluster does not seem to increase the maximum
savable state size.
I could enable HA, but for the time being I have been leaving it out
to avoid the possibility of masking deterministic faults.
Below are my configurations.
Thanks in advance for any advice.
Regards,
Jeff Henrikson
Flink version: 1.10
Configuration set via code:
parallelism=8
maxParallelism=64
setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
setTolerableCheckpointFailureNumber(1000)
setMaxConcurrentCheckpoints(1)
enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
RocksDBStateBackend
setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
setNumberOfTransferThreads(25)
setDbStoragePath points to a local nvme SSD
Configuration in flink-conf.yaml:
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 28000m
taskmanager.memory.process.size: 28000m
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: full
cluster.evenly-spread-out-slots: false
taskmanager.memory.network.fraction: 0.2  # default 0.1
taskmanager.memory.framework.off-heap.size: 2GB
taskmanager.memory.task.off-heap.size: 2GB
taskmanager.network.memory.buffers-per-channel: 32  # default 2
taskmanager.memory.managed.fraction: 0.4  # docs say default 0.1, but something seems to set 0.4
taskmanager.memory.task.off-heap.size: 2048MB  # default 128M
state.backend.fs.memory-threshold: 1048576
state.backend.fs.write-buffer-size: 10240000
state.backend.local-recovery: true
state.backend.rocksdb.writebuffer.size: 64MB
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.number-to-merge: 4
state.backend.rocksdb.timer-service.factory: heap
state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
state.backend.rocksdb.write-batch-size: 16000000 # default 2MB
web.checkpoints.history: 250