Bhaskar,
> Glad to know some progress.
Yeah, some progress. Yet overnight run didn't look as good as I hoped.
The throttling required to not crash during snapshots seems to be quite
different from the throttling required to crash not during snapshots.
So the lowest common denominator is quite a large performance penalty.
What's worse, the rate of input that makes the snapshot performance go
from good to bad seems to change significantly as the state size grows.
Here is checkpoint history from an overnight run.
Parameters:
- 30 minutes minimum between snapshots
- incremental snapshot mode
- inputs throttled to 100 events per sec per input per slot,
which is around 1/4 of the unthrottled throughput
Checkpoint history:
ID Status Acknowledged Trigger Time Latest Acknowledgement End to End
Duration State Size Buffered During Alignment
12 COMPLETED 304/304 8:52:22 10:37:18 1h 44m 55s
60.5 GB 0 B
11 COMPLETED 304/304 6:47:03 8:22:19 1h 35m 16s 53.3 GB
0 B
10 COMPLETED 304/304 5:01:20 6:17:00 1h 15m 39s 41.0 GB
0 B
9 COMPLETED 304/304 3:47:43 4:31:19 43m 35s 34.1 GB 0 B
8 COMPLETED 304/304 2:40:58 3:17:42 36m 43s 27.8 GB 0 B
7 COMPLETED 304/304 1:39:15 2:10:57 31m 42s 23.1 GB 0 B
6 COMPLETED 304/304 0:58:02 1:09:13 11m 11s 17.4 GB 0 B
5 COMPLETED 304/304 0:23:27 0:28:01 4m 33s 14.3 GB 0 B
4 COMPLETED 304/304 23:52:29 23:53:26 56s
12.7 GB 0 B
3 COMPLETED 304/304 23:20:59 23:22:28 1m 29s
10.8 GB 0 B
2 COMPLETED 304/304 22:46:17 22:50:58 4m 40s
7.40 GB 0 B
As you can see, GB/minute varies drastically. GB/minute also varies
drastically with full checkpoint mode.
I'm pleased that it hasn't crashed yet. Yet I'm concerned that with the
checkpoint GB/minute getting so slow, it will crash soon.
I'm really wishing state.backend.async=false worked for RocksDbStateBackend.
I'm also wondering if my throttler would improve if I just connected to
the REST api to ask if any checkpoint is in progress, and then paused
inputs accordingly. Effectively state.backend.async=false via hacked
application code.
> Where are you updating your state here? I
> couldn't find any flink managed state here.
The only updates to state I make are through the built-in
DataStream.cogroup. A unit test (without RocksDB loaded) of the way I
use .cogroup shows exactly two ways that .cogroup calls an
implementation of AppendingState.add. I summarize those below.
The two AppendingState subclasses invoked are HeapListState and
HeapReducingState. Neither have a support attribute on them, such as
MapState's @PublicEvolving.
> I suggested updating the flink managed state using onTimer over an
> interval equal to the checkpoint interval.
So the onTimer method, with interval set to the checkpoint interval.
Interesting.
It looks like the closest subclass for my use case use would be either
KeyedCoProcessFunction. Let me see if I understand concretely the idea:
1) between checkpoints, read join input and write join output, by
loading any state reads from external state, but buffering all state
changes in memory in some kind of data structure.
2) whenever a checkpoint arrived or the memory consumed by buffered
writes gets too big, flush the writes to state.
Is that the gist of the idea about .onTimer?
Jeff
There are two paths from .coGroup to AppendingState.add
path 1 of 2: .coGroup to HeapListState
add:90, HeapListState {org.apache.flink.runtime.state.heap}
processElement:203, EvictingWindowOperator
{org.apache.flink.streaming.runtime.operators.windowing}
processElement:164, StreamOneInputProcessor
{org.apache.flink.streaming.runtime.io}
processInput:143, StreamOneInputProcessor
{org.apache.flink.streaming.runtime.io}
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement
(windowAssigner is an instance of GlobalWindows)
@Override
public void processElement(StreamRecord<IN> element)
throws Exception {
final Collection<W> elementWindows =
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(),
windowAssignerContext);
//if element is handled by none of assigned
elementWindows
boolean isSkippedElement = true;
final K key =
this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
. . .
} else {
for (W window : elementWindows) {
// check if the window is already
inactive
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
evictingWindowState.setCurrentNamespace(window);
evictingWindowState.add(element);
=>
org.apache.flink.runtime.state.heap.HeapListState#add:
@Override
public void add(V value) {
Preconditions.checkNotNull(value, "You cannot add
null to a ListState.");
final N namespace = currentNamespace;
final StateTable<K, N, List<V>> map =
stateTable;
List<V> list = map.get(namespace);
if (list == null) {
list = new ArrayList<>();
map.put(namespace, list);
}
list.add(value);
}
path 2 of 2: .coGroup to HeapReducingState
add:95, HeapReducingState {org.apache.flink.runtime.state.heap}
onElement:49, CountTrigger
{org.apache.flink.streaming.api.windowing.triggers}
onElement:898, WindowOperator$Context
{org.apache.flink.streaming.runtime.operators.windowing}
processElement:210, EvictingWindowOperator
{org.apache.flink.streaming.runtime.operators.windowing}
processElement:164, StreamOneInputProcessor
{org.apache.flink.streaming.runtime.io}
processInput:143, StreamOneInputProcessor
{org.apache.flink.streaming.runtime.io}
@Override
public void processElement(StreamRecord<IN> element) throws
Exception {
final Collection<W> elementWindows =
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(),
windowAssignerContext);
//if element is handled by none of assigned
elementWindows
boolean isSkippedElement = true;
final K key =
this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
. . .
} else {
for (W window : elementWindows) {
// check if the window is already
inactive
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
evictingWindowState.setCurrentNamespace(window);
evictingWindowState.add(element);
triggerContext.key = key;
triggerContext.window = window;
evictorContext.key = key;
evictorContext.window = window;
TriggerResult triggerResult =
triggerContext.onElement(element);
=>
public TriggerResult onElement(StreamRecord<IN> element)
throws Exception {
return trigger.onElement(element.getValue(),
element.getTimestamp(), window, this);
=>
@Override
public TriggerResult onElement(Object element, long timestamp,
W window, TriggerContext ctx) throws Exception {
ReducingState<Long> count =
ctx.getPartitionedState(stateDesc);
count.add(1L);
=>
org.apache.flink.runtime.state.heap.HeapReducingState#add
@Override
public void add(V value) throws IOException {
if (value == null) {
On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
Glad to know some progress. Where are you updating your state here? I
couldn't find any flink managed state here.
I suggested updating the flink managed state using onTimer over an
interval equal to the checkpoint interval.
In your case since you do throttling, it helped to maintain the fixed
rate per slot. Before the rate was sporadic.
It's definitely an IO bottleneck.
So now you can think of decoupling stateless scanning and stateful joins.
For example you can keep a stateless scan as separate flink job and keep
its output in some Kafka kind of store.
From there you start your stateful joins. This would help focussing on
your stateful job in much better fashion
Regards
Bhaskar
On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenri...@gmail.com
<mailto:jehenri...@gmail.com>> wrote:
Bhaskar,
Based on your idea of limiting input to get better checkpoint behavior,
I made a ProcessFunction that constraints to a number of events per
second per slot per input. I do need to do some stateless input
scanning before joins. The stateless part needs to be fast and does no
impact snapshots. So I inserted the throttling after the input
preprocessing but before the stateful transformations. There is a
significant difference of snapshot throughput (often 5x or larger) when
I change the throttle between 200 and 300 events per second (per slot
per input).
Hope the throttling keeps being effective as I keep the job running
longer.
Odd. But likely a very effective way out of my problem.
I wonder what drives it . . . Thread contention? IOPS contention?
See ProcessFunction code below.
Many thanks!
Jeff
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
// Set eventsPerSecMax to -1 to disable the throttle
// TODO: Actual number of events can be slightly larger
// TODO: Remove pause correlation with system clock
case class Throttler[T](eventsPerSecMax : Double) extends
ProcessFunction[T,T] {
var minutePrev = 0
var numEvents = 0
def minutes() = {
val ms = System.currentTimeMillis()
(ms / 1000 / 60).toInt
}
def increment() = {
val m = minutes()
if(m != minutePrev) {
numEvents = 0
}
numEvents += 1
}
def eps() = {
numEvents/60.0
}
override def processElement(x: T, ctx: ProcessFunction[T,
T]#Context,
out: Collector[T]): Unit = {
increment()
if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
Thread.sleep(1000L)
}
out.collect(x)
}
}
On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> Bhaskar,
>
> Thank you for your thoughtful points.
>
> > I want to discuss more on points (1) and (2)
> > If we take care of them rest will be good
> >
> > Coming to (1)
> >
> > Please try to give reasonable checkpoint interval time for
every job.
> > Minum checkpoint interval recommended by flink community is 3
minutes
> > I thin you should give minimum 3 minutes checkpoint interval
for all
>
> I have spent very little time testing with checkpoint intervals
of under
> 3 minutes. I frequently test with intervals of 5 minutes and of 30
> minutes. I also test with checkpoint intervals such as 60
minutes, and
> never (manual only). In terms of which exceptions get thrown, I
don't
> see much difference between 5/30/60, I don't see a lot of difference.
>
> Infinity (no checkpoint internal) seems to be an interesting value,
> because before crashing, it seems to process around twice as much
state
> as with any finite checkpoint interval. The largest savepoints I
have
> captured have been manually triggered using the /job/:jobid/stop
REST
> API. I think it helps for the snapshot to be synchronous.
>
> One curiosity about the /job/:jobid/stop command is that from
time of
> the command, it often takes many minutes for the internal
processing to
> stop.
>
> Another curiosity about /job/:jobid/stop command is that sometimes
> following a completed savepoint, the cluster goes back to running!
>
> > Coming to (2)
> >
> > What's your input data rate?
>
> My application involves what I will call "main" events that are
enriched
> by "secondary" events. While the secondary events have several
> different input streams, data types, and join keys, I will
estimate the
> secondary events all together. My estimate for input rate is as
follows:
>
> 50M "main" events
> 50 secondary events for each main event, for a
> total of around 2.5B input events
> 8 nodes
> 20 hours
>
> Combining these figures, we can estimate:
>
> 50000000*50/8/20/3600 = 4340 events/second/node
>
> I don't see how to act on your advice for (2). Maybe your idea
is that
> during backfill/bootstrap, I artificially throttle the inputs to my
> application?
>
> 100% of my application state is due to .cogroup, which manages a
> HeapListState on its own. I cannot think of any controls for
changing
> how .cogroup handles internal state per se. I will paste below the
> Flink code path that .cogroup uses to update its internal state
when it
> runs my application.
>
> The only control I can think of with .cogroup that indirectly
impacts
> internal state is delayed triggering.
>
> Currently I use a trigger on every event, which I understand
creates a
> suboptimal number of events. I previously experimented with delayed
> triggering, but I did not get good results.
>
> Just now I tried again ContinuousProcessingTimeTrigger of 30
seconds,
> with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint
> interval. The first checkpoint failed, which has been rare when
I use
> all the same parameters except for triggering on every event. So it
> looks worse not better.
>
> Thanks again,
>
>
> Jeff Henrikson
>
>
>
>
> On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
>> Thanks for the reply.
>> I want to discuss more on points (1) and (2)
>> If we take care of them rest will be good
>>
>> Coming to (1)
>>
>> Please try to give reasonable checkpoint interval time for every
job.
>> Minum checkpoint interval recommended by flink community is 3
minutes
>> I thin you should give minimum 3 minutes checkpoint interval for all
>>
>> Coming to (2)
>>
>> What's your input data rate?
>> For example you are seeing data at 100 msg/sec, For each message if
>> there is state changing and you are updating the state with
RocksDB,
>> it's going to
>> create 100 rows in 1 second at RocksDb end, On the average if 50
>> records have changed each second, even if you are using RocksDB
>> differentialstate = true,
>> there is no use. Because everytime 50% is new rows getting
added. So
>> the best bet is to update records with RocksDB only once in your
>> checkpoint interval.
>> Suppose your checkpoint interval is 5 minutes. If you update
RocksDB
>> state once in 5 minutes, then the rate at which new records
added to
>> RocksDB will be 1 record/5min.
>> Whereas in your original scenario, 30000 records added to
rocksDB in 5
>> min. You can save 1:30000 ratio of records in addition to RocksDB.
>> Which will save a huge
>> redundant size addition to RocksDB. Ultimately your state is
driven
>> by your checkpoint interval. From the input source you will go
back 5
>> min back and read the state, similarly from RocksDB side
>> also you can have a state update once in 5 min should work.
Otherwise
>> even if you add state there is no use.
>>
>> Regards
>> Bhaskar
>>
>> Try to update your RocksDB state in an interval equal to the
>> checkpoint interval. Otherwise in my case many times what's
observed is
>> state size grows unnecessarily.
>>
>> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson
<jehenri...@gmail.com <mailto:jehenri...@gmail.com>
>> <mailto:jehenri...@gmail.com <mailto:jehenri...@gmail.com>>> wrote:
>>
>> Vijay,
>>
>> Thanks for your thoughts. Below are answers to your questions.
>>
>> > 1. What's your checkpoint interval?
>>
>> I have used many different checkpoint intervals, ranging from 5
>> minutes
>> to never. I usually setMinPasueBetweenCheckpoints to the same
>> value as
>> the checkpoint interval.
>>
>> > 2. How frequently are you updating the state into RocksDB?
>>
>> My understanding is that for .cogroup:
>>
>> - Triggers control communication outside the operator
>> - Evictors control cleanup of internal state
>> - Configurations like write buffer size control the
frequency of
>> state change at the storage layer
>> - There is no control for how frequently the window state
>> updates at
>> the layer of the RocksDB api layer.
>>
>> Thus, the state update whenever data is ingested.
>>
>> > 3. How many task managers are you using?
>>
>> Usually I have been running with one slot per taskmanager.
28GB of
>> usable ram on each node.
>>
>> > 4. How much data each task manager handles while taking the
>> checkpoint?
>>
>> Funny you should ask. I would be okay with zero.
>>
>> The application I am replacing has a latency of 36-48 hours,
so if I
>> had
>> to fully stop processing to take every snapshot
synchronously, it
>> might
>> be seen as totally acceptable, especially for initial
bootstrap.
>> Also,
>> the velocity of running this backfill is approximately 115x real
>> time on
>> 8 nodes, so the steady-state run may not exhibit the failure
mode in
>> question at all.
>>
>> It has come as some frustration to me that, in the case of
>> RocksDBStateBackend, the configuration key state.backend.async
>> effectively has no meaningful way to be false.
>>
>> The only way I have found in the existing code to get a
behavior like
>> synchronous snapshot is to POST to /jobs/<jobID>/stop with
>> drain=false
>> and a URL. This method of failing fast is the way that I
discovered
>> that I needed to increase transfer threads from the default.
>>
>> The reason I don't just run the whole backfill and then take one
>> snapshot is that even in the absence of checkpoints, a very
similar
>> congestion seems to take the cluster down when I am say
20-30% of the
>> way through my backfill.
>>
>> Reloading from my largest feasible snapshot makes it
possible to make
>> another snapshot a bit larger before crash, but not by much.
>>
>> On first glance, the code change to allow
RocksDBStateBackend into a
>> synchronous snapshots mode looks pretty easy. Nevertheless,
I was
>> hoping to do the initial launch of my application without
needing to
>> modify the framework.
>>
>> Regards,
>>
>>
>> Jeff Henrikson
>>
>>
>> On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
>> > For me this seems to be an IO bottleneck at your task
manager.
>> > I have a couple of queries:
>> > 1. What's your checkpoint interval?
>> > 2. How frequently are you updating the state into RocksDB?
>> > 3. How many task managers are you using?
>> > 4. How much data each task manager handles while taking the
>> checkpoint?
>> >
>> > For points (3) and (4) , you should be very careful. I
feel you
>> are
>> > stuck at this.
>> > You try to scale vertically by increasing more CPU and
memory for
>> each
>> > task manager.
>> > If not, try to scale horizontally so that each task
manager IO
>> gets reduces
>> > Apart from that check is there any bottleneck with the file
>> system.
>> >
>> > Regards
>> > Bhaskar
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor
<vict...@gmail.com <mailto:vict...@gmail.com>
>> <mailto:vict...@gmail.com <mailto:vict...@gmail.com>>
>> > <mailto:vict...@gmail.com <mailto:vict...@gmail.com>
<mailto:vict...@gmail.com <mailto:vict...@gmail.com>>>> wrote:
>> >
>> > I had a similar problem. I ended up solving by not
>> relying on
>> > checkpoints for recovery and instead re-read my input
sources
>> (in my
>> > case a kafka topic) from the earliest offset and
rebuilding
>> only the
>> > state I need. I only need to care about the past 1 to 2
>> days of
>> > state so can afford to drop anything older. My recovery
>> time went
>> > from over an hour for just the first checkpoint to
under 10
>> minutes.
>> >
>> > Tim
>> >
>> > On Wed, Jun 17, 2020, 11:52 PM Yun Tang
<myas...@live.com <mailto:myas...@live.com>
>> <mailto:myas...@live.com <mailto:myas...@live.com>>
>> > <mailto:myas...@live.com <mailto:myas...@live.com>
<mailto:myas...@live.com <mailto:myas...@live.com>>>> wrote:
>> >
>> > Hi Jeff
>> >
>> > 1. "after around 50GB of state, I stop being able to
>> reliably
>> > take checkpoints or savepoints. "
>> > What is the exact reason that job cannot complete
>> > checkpoint? Expired before completing or
decline by
>> some
>> > tasks? The former one is manly caused by high
>> back-pressure
>> > and the later one is mainly due to some internal
>> error.
>> > 2. Have you checked what reason the remote task
manager
>> is lost?
>> > If the remote task manager is not crashed, it
might
>> be due
>> > to GC impact, I think you might need to check
>> task-manager
>> > logs and GC logs.
>> >
>> > Best
>> > Yun Tang
>> >
>>
------------------------------------------------------------------------
>> > *From:* Jeff Henrikson <jehenri...@gmail.com
<mailto:jehenri...@gmail.com>
>> <mailto:jehenri...@gmail.com <mailto:jehenri...@gmail.com>>
>> > <mailto:jehenri...@gmail.com
<mailto:jehenri...@gmail.com>
>> <mailto:jehenri...@gmail.com <mailto:jehenri...@gmail.com>>>>
>> > *Sent:* Thursday, June 18, 2020 1:46
>> > *To:* user <user@flink.apache.org
<mailto:user@flink.apache.org>
>> <mailto:user@flink.apache.org
<mailto:user@flink.apache.org>> <mailto:user@flink.apache.org
<mailto:user@flink.apache.org>
>> <mailto:user@flink.apache.org <mailto:user@flink.apache.org>>>>
>> > *Subject:* Trouble with large state
>> > Hello Flink users,
>> >
>> > I have an application of around 10 enrichment
joins. All
>> events
>> > are
>> > read from kafka and have event timestamps. The
joins are
>> built
>> > using
>> > .cogroup, with a global window, triggering on every 1
>> event, plus a
>> > custom evictor that drops records once a newer
record
>> for the
>> > same ID
>> > has been processed. Deletes are represented by empty
>> events with
>> > timestamp and ID (tombstones). That way, we can drop
>> records when
>> > business logic dictates, as opposed to when a maximum
>> retention
>> > has been
>> > attained. The application runs
RocksDBStateBackend, on
>> > Kubernetes on
>> > AWS with local SSDs.
>> >
>> > Unit tests show that the joins produce expected
>> results. On an
>> > 8 node
>> > cluster, watermark output progress seems to
indicate I
>> should be
>> > able to
>> > bootstrap my state of around 500GB in around 1
day. I am
>> able
>> > to save
>> > and restore savepoints for the first half an hour
of run
>> time.
>> >
>> > My current trouble is that after around 50GB of
state,
>> I stop
>> > being able
>> > to reliably take checkpoints or savepoints. Some
time
>> after
>> > that, I
>> > start getting a variety of failures where the first
>> suspicious
>> > log event
>> > is a generic cluster connectivity error, such as:
>> >
>> > 1) java.io.IOException: Connecting the channel
>> failed:
>> > Connecting
>> > to remote task manager +
'/10.67.7.101:38955 <http://10.67.7.101:38955>
>> <http://10.67.7.101:38955>
>> > <http://10.67.7.101:38955>' has failed. This
>> > might indicate that the remote task manager has
>> been lost.
>> >
>> > 2) org.apache.flink.runtime.io
<http://org.apache.flink.runtime.io>
>> <http://org.apache.flink.runtime.io>.network.netty.exception
>> > .RemoteTransportException: Connection
unexpectedly
>> closed
>> > by remote
>> > task manager 'null'. This might indicate
that the
>> remote task
>> > manager was lost.
>> >
>> > 3) Association with remote system
>> > [akka.tcp://flink@10.67.6.66:34987
<http://flink@10.67.6.66:34987>
>> <http://flink@10.67.6.66:34987>
>> > <http://flink@10.67.6.66:34987>] has failed,
address is
>> now
>> > gated for [50] ms. Reason: [Association
failed with
>> > [akka.tcp://flink@10.67.6.66:34987
<http://flink@10.67.6.66:34987>
>> <http://flink@10.67.6.66:34987>
>> > <http://flink@10.67.6.66:34987>]] Caused by:
>> > [java.net <http://java.net>
<http://java.net>.NoRouteToHostException:
>> No route to host]
>> >
>> > I don't see any obvious out of memory errors on the
>> TaskManager UI.
>> >
>> > Adding nodes to the cluster does not seem to
increase the
>> maximum
>> > savable state size.
>> >
>> > I could enable HA, but for the time being I have been
>> leaving it
>> > out to
>> > avoid the possibility of masking deterministic
faults.
>> >
>> > Below are my configurations.
>> >
>> > Thanks in advance for any advice.
>> >
>> > Regards,
>> >
>> >
>> > Jeff Henrikson
>> >
>> >
>> >
>> > Flink version: 1.10
>> >
>> > Configuration set via code:
>> > parallelism=8
>> > maxParallelism=64
>> > setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >
>> setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>> > setTolerableCheckpointFailureNumber(1000)
>> > setMaxConcurrentCheckpoints(1)
>> >
>> >
>>
enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>
>> > RocksDBStateBackend
>> >
setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>> > setNumberOfTransferThreads(25)
>> > setDbStoragePath points to a local nvme SSD
>> >
>> > Configuration in flink-conf.yaml:
>> >
>> > jobmanager.rpc.address: localhost
>> > jobmanager.rpc.port: 6123
>> > jobmanager.heap.size: 28000m
>> > taskmanager.memory.process.size: 28000m
>> > taskmanager.memory.jvm-metaspace.size: 512m
>> > taskmanager.numberOfTaskSlots: 1
>> > parallelism.default: 1
>> > jobmanager.execution.failover-strategy: full
>> >
>> > cluster.evenly-spread-out-slots: false
>> >
>> > taskmanager.memory.network.fraction:
0.2 #
>> > default 0.1
>> > taskmanager.memory.framework.off-heap.size: 2GB
>> > taskmanager.memory.task.off-heap.size: 2GB
>> >
taskmanager.network.memory.buffers-per-channel: 32
>> # default 2
>> > taskmanager.memory.managed.fraction: 0.4 #
>> docs say
>> > default 0.1, but something seems to set 0.4
>> > taskmanager.memory.task.off-heap.size:
2048MB #
>> > default 128M
>> >
>> > state.backend.fs.memory-threshold: 1048576
>> > state.backend.fs.write-buffer-size: 10240000
>> > state.backend.local-recovery: true
>> > state.backend.rocksdb.writebuffer.size: 64MB
>> > state.backend.rocksdb.writebuffer.count: 8
>> >
state.backend.rocksdb.writebuffer.number-to-merge: 4
>> >
state.backend.rocksdb.timer-service.factory: heap
>> > state.backend.rocksdb.block.cache-size:
64000000 #
>> default 8MB
>> > state.backend.rocksdb.write-batch-size:
16000000 #
>> default 2MB
>> >
>> > web.checkpoints.history: 250
>> >
>>