Jeff,

Glad to know that you are able to progress well and that the issue got resolved.
Regards,
Bhaskar

On Tue, Jun 23, 2020 at 12:24 AM Jeff Henrikson <jehenri...@gmail.com> wrote:
> Bhaskar,
>
> I think I am unstuck.  The performance numbers I sent after throttling were due to a one-character error in business logic.  I think I now have something good enough to work with for now.  I will repost if I encounter further unexpected issues.
>
> Adding application-level throttling ends up resolving both my symptom of slow/failing checkpoints and my symptom of crashes during long runs.
>
> Many thanks!
>
> Jeff
>
> On 6/20/20 11:46 AM, Jeff Henrikson wrote:
> > Bhaskar,
> >
> > > Glad to know some progress.
> >
> > Yeah, some progress.  Yet the overnight run didn't look as good as I hoped.
> >
> > The throttling required to not crash during snapshots seems to be quite different from the throttling required to not crash outside of snapshots, so the lowest common denominator is quite a large performance penalty.
> >
> > What's worse, the rate of input that makes the snapshot performance go from good to bad seems to change significantly as the state size grows.  Here is the checkpoint history from an overnight run.
> >
> > Parameters:
> >
> > - 30 minutes minimum between snapshots
> > - incremental snapshot mode
> > - inputs throttled to 100 events per sec per input per slot,
> >   which is around 1/4 of the unthrottled throughput
> >
> > Checkpoint history:
> >
> >   ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
> >   12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
> >   11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
> >   10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
> >    9  COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
> >    8  COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
> >    7  COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
> >    6  COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
> >    5  COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
> >    4  COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
> >    3  COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
> >    2  COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B
> >
> > As you can see, GB/minute varies drastically.  GB/minute also varies drastically with full checkpoint mode.
> >
> > I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the checkpoint GB/minute getting so slow, it will crash soon.
> >
> > I'm really wishing state.backend.async=false worked for RocksDBStateBackend.
> >
> > I'm also wondering if my throttler would improve if I just connected to the REST API to ask whether any checkpoint is in progress, and then paused inputs accordingly.  Effectively state.backend.async=false via hacked application code.
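A minimal sketch of the REST-polling idea mentioned above (an illustration, not code from this thread): it assumes the JobManager's monitoring REST endpoint is reachable at a hypothetical address on the default port 8081 and that the job id is supplied externally, and it keys off the "in_progress" counter that the /jobs/:jobid/checkpoints response carries.

    import scala.io.Source

    // Sketch: block while the Flink REST API reports a checkpoint in progress.
    // restBase and jobId are illustrative assumptions, not values from the thread.
    object CheckpointGate {
      val restBase = "http://jobmanager:8081"               // assumed REST address
      val jobId    = sys.env.getOrElse("FLINK_JOB_ID", "")  // assumed job id

      private def checkpointInProgress(): Boolean = {
        val body = Source.fromURL(s"$restBase/jobs/$jobId/checkpoints").mkString
        // crude string check so the sketch needs no JSON library; the response
        // contains a "counts" object with an "in_progress" field
        !body.contains("\"in_progress\":0")
      }

      // Sleep in small increments until no checkpoint is reported as in progress.
      def awaitNoCheckpoint(pollMillis: Long = 5000L): Unit =
        while (checkpointInProgress()) Thread.sleep(pollMillis)
    }

Calling something like CheckpointGate.awaitNoCheckpoint() from the throttler's processElement would stall the inputs for the duration of each snapshot, which is roughly the synchronous behavior being approximated.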
> > > Where are you updating your state here?  I
> > > couldn't find any flink managed state here.
> >
> > The only updates to state I make are through the built-in DataStream.coGroup.  A unit test (without RocksDB loaded) of the way I use .cogroup shows exactly two ways that .cogroup calls an implementation of AppendingState.add.  I summarize those below.
> >
> > The two AppendingState subclasses invoked are HeapListState and HeapReducingState.  Neither has a support attribute on them, such as MapState's @PublicEvolving.
> >
> > > I suggested updating the flink managed state using onTimer over an
> > > interval equal to the checkpoint interval.
> >
> > So the onTimer method, with interval set to the checkpoint interval.  Interesting.
> >
> > It looks like the closest base class for my use case would be KeyedCoProcessFunction.  Let me see if I understand the idea concretely:
> >
> > 1) Between checkpoints, read join input and write join output, loading any state reads from external state, but buffering all state changes in memory in some kind of data structure.
> >
> > 2) Whenever a checkpoint arrives or the memory consumed by buffered writes gets too big, flush the writes to state.
> >
> > Is that the gist of the idea about .onTimer?
> >
> > Jeff
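A minimal sketch of points (1) and (2) above (an illustration, not code from this thread), assuming the stream is keyed by a String id and that both inputs carry (id, payload) pairs: state writes are parked in a plain in-memory map and copied into Flink-managed state when a per-key processing-time timer aligned to the flush interval fires.  Anything still buffered at failure time is lost, which is exactly the trade-off point (1) describes.

    import java.util.{HashMap => JHashMap}

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.util.Collector

    // Hypothetical types for the sketch: both inputs are (id, payload) pairs keyed by id.
    class BufferedJoinFunction(flushIntervalMs: Long)
        extends KeyedCoProcessFunction[String, (String, String), (String, String), String] {

      type Ctx = KeyedCoProcessFunction[String, (String, String), (String, String), String]#Context

      @transient private var joined: ValueState[String] = _          // Flink-managed state, written rarely
      @transient private var pending: JHashMap[String, String] = _   // in-memory buffer, lost on failure

      override def open(parameters: Configuration): Unit = {
        joined  = getRuntimeContext.getState(new ValueStateDescriptor[String]("joined", classOf[String]))
        pending = new JHashMap[String, String]()
      }

      private def buffer(id: String, payload: String, ctx: Ctx): Unit = {
        pending.put(id, payload)
        // one timer per key per flush interval; re-registering the same timestamp is a no-op
        val now = ctx.timerService().currentProcessingTime()
        ctx.timerService().registerProcessingTimeTimer(now - (now % flushIntervalMs) + flushIntervalMs)
      }

      override def processElement1(in: (String, String), ctx: Ctx, out: Collector[String]): Unit = {
        buffer(in._1, in._2, ctx)
        out.collect(in._2)   // emit downstream immediately; only the state write is deferred
      }

      override def processElement2(in: (String, String), ctx: Ctx, out: Collector[String]): Unit = {
        buffer(in._1, in._2, ctx)
        out.collect(in._2)
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedCoProcessFunction[String, (String, String), (String, String), String]#OnTimerContext,
          out: Collector[String]): Unit = {
        // keyed state is only addressable for the firing key, so flush just that key
        val buffered = pending.remove(ctx.getCurrentKey)
        if (buffered != null) joined.update(buffered)
      }
    }

A size-based flush (point 2) could be approximated by checking pending.size() in buffer() and flushing the current key eagerly when the buffer grows too large; state for other keys cannot be written from there, since keyed state is scoped to the key being processed.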
> > There are two paths from .coGroup to AppendingState.add
> >
> > path 1 of 2: .coGroup to HeapListState
> >
> >     add:90, HeapListState {org.apache.flink.runtime.state.heap}
> >     processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
> >     processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >     processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >
> > org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement
> > (windowAssigner is an instance of GlobalWindows)
> >
> >     @Override
> >     public void processElement(StreamRecord<IN> element) throws Exception {
> >         final Collection<W> elementWindows = windowAssigner.assignWindows(
> >                 element.getValue(), element.getTimestamp(), windowAssignerContext);
> >
> >         // if element is handled by none of assigned elementWindows
> >         boolean isSkippedElement = true;
> >
> >         final K key = this.<K>getKeyedStateBackend().getCurrentKey();
> >
> >         if (windowAssigner instanceof MergingWindowAssigner) {
> >             . . .
> >         } else {
> >             for (W window : elementWindows) {
> >
> >                 // check if the window is already inactive
> >                 if (isWindowLate(window)) {
> >                     continue;
> >                 }
> >                 isSkippedElement = false;
> >
> >                 evictingWindowState.setCurrentNamespace(window);
> >                 evictingWindowState.add(element);
> >
> > =>
> >
> > org.apache.flink.runtime.state.heap.HeapListState#add:
> >
> >     @Override
> >     public void add(V value) {
> >         Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
> >
> >         final N namespace = currentNamespace;
> >
> >         final StateTable<K, N, List<V>> map = stateTable;
> >         List<V> list = map.get(namespace);
> >
> >         if (list == null) {
> >             list = new ArrayList<>();
> >             map.put(namespace, list);
> >         }
> >         list.add(value);
> >     }
> >
> > path 2 of 2: .coGroup to HeapReducingState
> >
> >     add:95, HeapReducingState {org.apache.flink.runtime.state.heap}
> >     onElement:49, CountTrigger {org.apache.flink.streaming.api.windowing.triggers}
> >     onElement:898, WindowOperator$Context {org.apache.flink.streaming.runtime.operators.windowing}
> >     processElement:210, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
> >     processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >     processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
> >
> >     @Override
> >     public void processElement(StreamRecord<IN> element) throws Exception {
> >         final Collection<W> elementWindows = windowAssigner.assignWindows(
> >                 element.getValue(), element.getTimestamp(), windowAssignerContext);
> >
> >         // if element is handled by none of assigned elementWindows
> >         boolean isSkippedElement = true;
> >
> >         final K key = this.<K>getKeyedStateBackend().getCurrentKey();
> >
> >         if (windowAssigner instanceof MergingWindowAssigner) {
> >             . . .
> >         } else {
> >             for (W window : elementWindows) {
> >
> >                 // check if the window is already inactive
> >                 if (isWindowLate(window)) {
> >                     continue;
> >                 }
> >                 isSkippedElement = false;
> >
> >                 evictingWindowState.setCurrentNamespace(window);
> >                 evictingWindowState.add(element);
> >
> >                 triggerContext.key = key;
> >                 triggerContext.window = window;
> >                 evictorContext.key = key;
> >                 evictorContext.window = window;
> >
> >                 TriggerResult triggerResult = triggerContext.onElement(element);
> >
> > =>
> >
> >     public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
> >         return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
> >     }
> >
> > =>
> >
> >     @Override
> >     public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
> >         ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
> >         count.add(1L);
> >
> > =>
> >
> > org.apache.flink.runtime.state.heap.HeapReducingState#add
> >
> >     @Override
> >     public void add(V value) throws IOException {
> >
> >         if (value == null) {
> >
> > On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
> >> Glad to know some progress.  Where are you updating your state here?  I couldn't find any flink managed state here.
> >> I suggested updating the flink managed state using onTimer over an interval equal to the checkpoint interval.
> >>
> >> In your case, since you do throttling, it helped to maintain the fixed rate per slot.  Before, the rate was sporadic.
> >> It's definitely an IO bottleneck.
> >>
> >> So now you can think of decoupling stateless scanning and stateful joins.
> >> For example, you can keep the stateless scan as a separate Flink job and keep its output in some Kafka kind of store.
> >>
> >> From there you start your stateful joins.  This would help you focus on your stateful job in a much better fashion.
> >>
> >> Regards
> >> Bhaskar
> >>
> >> On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenri...@gmail.com> wrote:
> >>
> >> Bhaskar,
> >>
> >> Based on your idea of limiting input to get better checkpoint behavior, I made a ProcessFunction that constrains each input to a number of events per second per slot.  I do need to do some stateless input scanning before joins.  The stateless part needs to be fast and does not impact snapshots.  So I inserted the throttling after the input preprocessing but before the stateful transformations.  There is a significant difference in snapshot throughput (often 5x or larger) when I change the throttle between 200 and 300 events per second (per slot per input).
> >>
> >> Hope the throttling keeps being effective as I keep the job running longer.
> >>
> >> Odd.  But likely a very effective way out of my problem.
> >>
> >> I wonder what drives it . . . Thread contention?  IOPS contention?
> >>
> >> See ProcessFunction code below.
> >>
> >> Many thanks!
> >>
> >> Jeff
> >>
> >>     import org.apache.flink.streaming.api.functions.ProcessFunction
> >>     import org.apache.flink.util.Collector
> >>
> >>     // Set eventsPerSecMax to -1 to disable the throttle
> >>     // TODO: Actual number of events can be slightly larger
> >>     // TODO: Remove pause correlation with system clock
> >>
> >>     case class Throttler[T](eventsPerSecMax: Double) extends ProcessFunction[T, T] {
> >>       var minutePrev = 0
> >>       var numEvents = 0
> >>       def minutes() = {
> >>         val ms = System.currentTimeMillis()
> >>         (ms / 1000 / 60).toInt
> >>       }
> >>       def increment() = {
> >>         val m = minutes()
> >>         if (m != minutePrev) {
> >>           // new minute: remember it and reset the counter
> >>           minutePrev = m
> >>           numEvents = 0
> >>         }
> >>         numEvents += 1
> >>       }
> >>       // average events per second over the current minute
> >>       def eps() = {
> >>         numEvents / 60.0
> >>       }
> >>       override def processElement(x: T, ctx: ProcessFunction[T, T]#Context,
> >>                                   out: Collector[T]): Unit = {
> >>         increment()
> >>         if (eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
> >>           Thread.sleep(1000L)
> >>         }
> >>         out.collect(x)
> >>       }
> >>     }
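For context, a throttler like this would be wired in per input with DataStream.process, so each parallel subtask enforces the limit independently (which is what makes the limit per slot).  A hypothetical usage line, assuming the standard org.apache.flink.streaming.api.scala._ import and a preprocessed stream named parsedEvents of some Event type (both names are placeholders):

    val throttled: DataStream[Event] =
      parsedEvents.process(Throttler[Event](eventsPerSecMax = 100.0))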
> >>
> >> On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> >> > Bhaskar,
> >> >
> >> > Thank you for your thoughtful points.
> >> >
> >> > > I want to discuss more on points (1) and (2)
> >> > > If we take care of them, the rest will be good
> >> > >
> >> > > Coming to (1)
> >> > >
> >> > > Please try to give a reasonable checkpoint interval time for every job.
> >> > > The minimum checkpoint interval recommended by the flink community is 3 minutes.
> >> > > I think you should give a minimum 3 minute checkpoint interval for all.
> >> >
> >> > I have spent very little time testing with checkpoint intervals of under 3 minutes.  I frequently test with intervals of 5 minutes and of 30 minutes.  I also test with checkpoint intervals such as 60 minutes, and never (manual only).  In terms of which exceptions get thrown, I don't see much difference between 5/30/60.
> >> >
> >> > Infinity (no checkpoint interval) seems to be an interesting value, because before crashing, it seems to process around twice as much state as with any finite checkpoint interval.  The largest savepoints I have captured have been manually triggered using the /jobs/:jobid/stop REST API.  I think it helps for the snapshot to be synchronous.
> >> >
> >> > One curiosity about the /jobs/:jobid/stop command is that, from the time of the command, it often takes many minutes for the internal processing to stop.
> >> >
> >> > Another curiosity about the /jobs/:jobid/stop command is that sometimes, following a completed savepoint, the cluster goes back to running!
> >> >
> >> > > Coming to (2)
> >> > >
> >> > > What's your input data rate?
> >> >
> >> > My application involves what I will call "main" events that are enriched by "secondary" events.  While the secondary events have several different input streams, data types, and join keys, I will estimate the secondary events all together.  My estimate for the input rate is as follows:
> >> >
> >> >     50M "main" events
> >> >     50 secondary events for each main event, for a
> >> >         total of around 2.5B input events
> >> >     8 nodes
> >> >     20 hours
> >> >
> >> > Combining these figures, we can estimate:
> >> >
> >> >     50000000*50/8/20/3600 = 4340 events/second/node
> >> >
> >> > I don't see how to act on your advice for (2).  Maybe your idea is that during backfill/bootstrap, I artificially throttle the inputs to my application?
> >> >
> >> > 100% of my application state is due to .cogroup, which manages a HeapListState on its own.  I cannot think of any controls for changing how .cogroup handles internal state per se.  I will paste below the Flink code path that .cogroup uses to update its internal state when it runs my application.
> >> >
> >> > The only control I can think of with .cogroup that indirectly impacts internal state is delayed triggering.
> >> >
> >> > Currently I use a trigger on every event, which I understand creates a suboptimal number of events.  I previously experimented with delayed triggering, but I did not get good results.
> >> >
> >> > Just now I tried again with a ContinuousProcessingTimeTrigger of 30 seconds, with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint interval.  The first checkpoint failed, which has been rare when I use all the same parameters except for triggering on every event.  So it looks worse, not better.
> >> >
> >> > Thanks again,
> >> >
> >> > Jeff Henrikson
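For reference, the trigger experiment described above amounts to swapping the trigger on the cogroup window.  A hedged sketch of that wiring, with hypothetical stream, key, and event-type names rather than the actual job code:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger}
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.util.Collector

    object EnrichmentJoinSketch {
      // hypothetical event type for illustration
      case class Event(id: String, payload: String)

      def enrich(main: DataStream[Event], secondary: DataStream[Event]): DataStream[Event] =
        main.coGroup(secondary)
          .where(_.id).equalTo(_.id)
          .window(GlobalWindows.create())
          // current setup: fire on every element
          .trigger(CountTrigger.of[GlobalWindow](1))
          // delayed-trigger variant tried above:
          // .trigger(ContinuousProcessingTimeTrigger.of[GlobalWindow](Time.seconds(30)))
          .apply { (mains, secondaries, out: Collector[Event]) =>
            // join logic goes here; the real job also installs a custom evictor
            mains.foreach(out.collect)
          }
    }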
So > >> >> the best bet is to update records with RocksDB only once in your > >> >> checkpoint interval. > >> >> Suppose your checkpoint interval is 5 minutes. If you update > >> RocksDB > >> >> state once in 5 minutes, then the rate at which new records > >> added to > >> >> RocksDB will be 1 record/5min. > >> >> Whereas in your original scenario, 30000 records added to > >> rocksDB in 5 > >> >> min. You can save 1:30000 ratio of records in addition to > >> RocksDB. > >> >> Which will save a huge > >> >> redundant size addition to RocksDB. Ultimately your state is > >> driven > >> >> by your checkpoint interval. From the input source you will go > >> back 5 > >> >> min back and read the state, similarly from RocksDB side > >> >> also you can have a state update once in 5 min should work. > >> Otherwise > >> >> even if you add state there is no use. > >> >> > >> >> Regards > >> >> Bhaskar > >> >> > >> >> Try to update your RocksDB state in an interval equal to the > >> >> checkpoint interval. Otherwise in my case many times what's > >> observed is > >> >> state size grows unnecessarily. > >> >> > >> >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson > >> <jehenri...@gmail.com <mailto:jehenri...@gmail.com> > >> >> <mailto:jehenri...@gmail.com <mailto:jehenri...@gmail.com>>> > >> wrote: > >> >> > >> >> Vijay, > >> >> > >> >> Thanks for your thoughts. Below are answers to your > >> questions. > >> >> > >> >> > 1. What's your checkpoint interval? > >> >> > >> >> I have used many different checkpoint intervals, ranging > >> from 5 > >> >> minutes > >> >> to never. I usually setMinPasueBetweenCheckpoints to the > >> same > >> >> value as > >> >> the checkpoint interval. > >> >> > >> >> > 2. How frequently are you updating the state into > >> RocksDB? > >> >> > >> >> My understanding is that for .cogroup: > >> >> > >> >> - Triggers control communication outside the operator > >> >> - Evictors control cleanup of internal state > >> >> - Configurations like write buffer size control the > >> frequency of > >> >> state change at the storage layer > >> >> - There is no control for how frequently the window > state > >> >> updates at > >> >> the layer of the RocksDB api layer. > >> >> > >> >> Thus, the state update whenever data is ingested. > >> >> > >> >> > 3. How many task managers are you using? > >> >> > >> >> Usually I have been running with one slot per taskmanager. > >> 28GB of > >> >> usable ram on each node. > >> >> > >> >> > 4. How much data each task manager handles while > >> taking the > >> >> checkpoint? > >> >> > >> >> Funny you should ask. I would be okay with zero. > >> >> > >> >> The application I am replacing has a latency of 36-48 hours, > >> so if I > >> >> had > >> >> to fully stop processing to take every snapshot > >> synchronously, it > >> >> might > >> >> be seen as totally acceptable, especially for initial > >> bootstrap. > >> >> Also, > >> >> the velocity of running this backfill is approximately > >> 115x real > >> >> time on > >> >> 8 nodes, so the steady-state run may not exhibit the failure > >> mode in > >> >> question at all. > >> >> > >> >> It has come as some frustration to me that, in the case of > >> >> RocksDBStateBackend, the configuration key > >> state.backend.async > >> >> effectively has no meaningful way to be false. > >> >> > >> >> The only way I have found in the existing code to get a > >> behavior like > >> >> synchronous snapshot is to POST to /jobs/<jobID>/stop with > >> >> drain=false > >> >> and a URL. 
> >> >>
> >> >> The reason I don't just run the whole backfill and then take one snapshot is that, even in the absence of checkpoints, a very similar congestion seems to take the cluster down when I am, say, 20-30% of the way through my backfill.
> >> >>
> >> >> Reloading from my largest feasible snapshot makes it possible to make another snapshot a bit larger before the crash, but not by much.
> >> >>
> >> >> At first glance, the code change to allow RocksDBStateBackend a synchronous snapshot mode looks pretty easy.  Nevertheless, I was hoping to do the initial launch of my application without needing to modify the framework.
> >> >>
> >> >> Regards,
> >> >>
> >> >> Jeff Henrikson
> >> >>
> >> >> On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> >> >> > For me this seems to be an IO bottleneck at your task manager.
> >> >> > I have a couple of queries:
> >> >> > 1. What's your checkpoint interval?
> >> >> > 2. How frequently are you updating the state into RocksDB?
> >> >> > 3. How many task managers are you using?
> >> >> > 4. How much data does each task manager handle while taking the checkpoint?
> >> >> >
> >> >> > For points (3) and (4), you should be very careful.  I feel you are stuck at this.
> >> >> > You can try to scale vertically by giving each task manager more CPU and memory.
> >> >> > If not, try to scale horizontally so that each task manager's IO load is reduced.
> >> >> > Apart from that, check whether there is any bottleneck with the file system.
> >> >> >
> >> >> > Regards
> >> >> > Bhaskar
> >> >> >
> >> >> > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <vict...@gmail.com> wrote:
> >> >> >
> >> >> > I had a similar problem.  I ended up solving it by not relying on checkpoints for recovery and instead re-reading my input sources (in my case a kafka topic) from the earliest offset and rebuilding only the state I need.  I only need to care about the past 1 to 2 days of state, so I can afford to drop anything older.  My recovery time went from over an hour for just the first checkpoint to under 10 minutes.
> >> >> >
> >> >> > Tim
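A small sketch of the recovery approach Tim describes, assuming the universal Kafka connector and placeholder broker/topic names: ignore checkpoints for recovery, start the consumer from the earliest offset, and rebuild only the recent state the job still cares about.

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    // placeholder broker, group, and topic names
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")
    props.setProperty("group.id", "enrichment-backfill")

    val consumer = new FlinkKafkaConsumer[String]("events", new SimpleStringSchema(), props)
    consumer.setStartFromEarliest()   // re-read the topic instead of restoring offsets from a checkpoint
    // the source is then added with env.addSource(consumer), and older records are dropped in application logic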
> >> >> >
> >> >> > On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myas...@live.com> wrote:
> >> >> >
> >> >> > Hi Jeff
> >> >> >
> >> >> > 1. "after around 50GB of state, I stop being able to reliably take checkpoints or savepoints."  What is the exact reason that the job cannot complete a checkpoint?  Expired before completing, or declined by some tasks?  The former is mainly caused by high back-pressure, and the latter is mainly due to some internal error.
> >> >> > 2. Have you checked why the remote task manager is lost?  If the remote task manager has not crashed, it might be due to GC impact; I think you might need to check the task-manager logs and GC logs.
> >> >> >
> >> >> > Best
> >> >> > Yun Tang
> >> >> >
> >> >> > ------------------------------------------------------------------------
> >> >> > *From:* Jeff Henrikson <jehenri...@gmail.com>
> >> >> > *Sent:* Thursday, June 18, 2020 1:46
> >> >> > *To:* user <user@flink.apache.org>
> >> >> > *Subject:* Trouble with large state
> >> >> >
> >> >> > Hello Flink users,
> >> >> >
> >> >> > I have an application of around 10 enrichment joins.  All events are read from kafka and have event timestamps.  The joins are built using .cogroup, with a global window, triggering on every 1 event, plus a custom evictor that drops records once a newer record for the same ID has been processed.  Deletes are represented by empty events with timestamp and ID (tombstones).  That way, we can drop records when business logic dictates, as opposed to when a maximum retention has been attained.  The application runs RocksDBStateBackend, on Kubernetes on AWS with local SSDs.
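A hedged sketch of the kind of evictor described above (an illustration, not the actual job code): with the stream keyed by record ID, it keeps only the element carrying the newest timestamp in the global window and evicts everything older after the window function has fired; tombstone handling is omitted.

    import java.lang.{Iterable => JIterable}

    import org.apache.flink.streaming.api.windowing.evictors.Evictor
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue

    class LatestOnlyEvictor[T] extends Evictor[T, GlobalWindow] {

      // nothing to do before the window function runs
      override def evictBefore(elements: JIterable[TimestampedValue[T]], size: Int,
                               window: GlobalWindow, ctx: Evictor.EvictorContext): Unit = ()

      // after firing, drop everything but one element carrying the newest timestamp
      override def evictAfter(elements: JIterable[TimestampedValue[T]], size: Int,
                              window: GlobalWindow, ctx: Evictor.EvictorContext): Unit = {
        if (size > 1) {
          var newest = Long.MinValue
          val scan = elements.iterator()
          while (scan.hasNext) newest = math.max(newest, scan.next().getTimestamp)

          val it = elements.iterator()
          var kept = false
          while (it.hasNext) {
            val e = it.next()
            if (e.getTimestamp < newest || kept) it.remove() else kept = true
          }
        }
      }
    }

It would be installed with an .evictor(new LatestOnlyEvictor[Event]) call on the same cogroup window wiring sketched earlier.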
> >> >> > Unit tests show that the joins produce expected results.  On an 8 node cluster, watermark output progress seems to indicate I should be able to bootstrap my state of around 500GB in around 1 day.  I am able to save and restore savepoints for the first half an hour of run time.
> >> >> >
> >> >> > My current trouble is that after around 50GB of state, I stop being able to reliably take checkpoints or savepoints.  Some time after that, I start getting a variety of failures where the first suspicious log event is a generic cluster connectivity error, such as:
> >> >> >
> >> >> >     1) java.io.IOException: Connecting the channel failed: Connecting
> >> >> >     to remote task manager + '/10.67.7.101:38955' has failed.  This
> >> >> >     might indicate that the remote task manager has been lost.
> >> >> >
> >> >> >     2) org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> >> >> >     Connection unexpectedly closed by remote task manager 'null'.  This
> >> >> >     might indicate that the remote task manager was lost.
> >> >> >
> >> >> >     3) Association with remote system [akka.tcp://flink@10.67.6.66:34987]
> >> >> >     has failed, address is now gated for [50] ms.  Reason: [Association
> >> >> >     failed with [akka.tcp://flink@10.67.6.66:34987]]  Caused by:
> >> >> >     [java.net.NoRouteToHostException: No route to host]
> >> >> >
> >> >> > I don't see any obvious out of memory errors on the TaskManager UI.
> >> >> >
> >> >> > Adding nodes to the cluster does not seem to increase the maximum savable state size.
> >> >> >
> >> >> > I could enable HA, but for the time being I have been leaving it out to avoid the possibility of masking deterministic faults.
> >> >> >
> >> >> > Below are my configurations.
> >> >> >
> >> >> > Thanks in advance for any advice.
> >> >> >
> >> >> > Regards,
> >> >> >
> >> >> > Jeff Henrikson
> >> >> >
> >> >> > Flink version: 1.10
> >> >> >
> >> >> > Configuration set via code:
> >> >> >     parallelism=8
> >> >> >     maxParallelism=64
> >> >> >     setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >> >> >     setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >> >> >     setTolerableCheckpointFailureNumber(1000)
> >> >> >     setMaxConcurrentCheckpoints(1)
> >> >> >     enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> >> >> >
> >> >> >     RocksDBStateBackend
> >> >> >         setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
> >> >> >         setNumberOfTransferThreads(25)
> >> >> >         setDbStoragePath points to a local nvme SSD
> >> >> >
> >> >> > Configuration in flink-conf.yaml:
> >> >> >
> >> >> >     jobmanager.rpc.address: localhost
> >> >> >     jobmanager.rpc.port: 6123
> >> >> >     jobmanager.heap.size: 28000m
> >> >> >     taskmanager.memory.process.size: 28000m
> >> >> >     taskmanager.memory.jvm-metaspace.size: 512m
> >> >> >     taskmanager.numberOfTaskSlots: 1
> >> >> >     parallelism.default: 1
> >> >> >     jobmanager.execution.failover-strategy: full
> >> >> >
> >> >> >     cluster.evenly-spread-out-slots: false
> >> >> >
> >> >> >     taskmanager.memory.network.fraction: 0.2             # default 0.1
> >> >> >     taskmanager.memory.framework.off-heap.size: 2GB
> >> >> >     taskmanager.memory.task.off-heap.size: 2GB
> >> >> >     taskmanager.network.memory.buffers-per-channel: 32   # default 2
> >> >> >     taskmanager.memory.managed.fraction: 0.4             # docs say default 0.1, but something seems to set 0.4
> >> >> >     taskmanager.memory.task.off-heap.size: 2048MB        # default 128M
> >> >> >
> >> >> >     state.backend.fs.memory-threshold: 1048576
> >> >> >     state.backend.fs.write-buffer-size: 10240000
> >> >> >     state.backend.local-recovery: true
> >> >> >     state.backend.rocksdb.writebuffer.size: 64MB
> >> >> >     state.backend.rocksdb.writebuffer.count: 8
> >> >> >     state.backend.rocksdb.writebuffer.number-to-merge: 4
> >> >> >     state.backend.rocksdb.timer-service.factory: heap
> >> >> >     state.backend.rocksdb.block.cache-size: 64000000     # default 8MB
> >> >> >     state.backend.rocksdb.write-batch-size: 16000000     # default 2MB
> >> >> >
> >> >> >     web.checkpoints.history: 250
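The "configuration set via code" list above corresponds roughly to the following environment setup (a hedged sketch: the checkpoint URI, local storage path, and checkpoint interval are placeholders, not values from the thread):

    import org.apache.flink.contrib.streaming.state.{PredefinedOptions, RocksDBStateBackend}
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object JobSetupSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(8)
        env.setMaxParallelism(64)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        env.enableCheckpointing(30 * 60 * 1000L, CheckpointingMode.AT_LEAST_ONCE)  // interval is illustrative
        env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1000)
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

        // placeholder checkpoint URI; 'true' enables incremental checkpoints
        val backend = new RocksDBStateBackend("s3://bucket/checkpoints", true)
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
        backend.setNumberOfTransferThreads(25)
        backend.setDbStoragePath("/local/nvme/rocksdb")  // placeholder local SSD path
        env.setStateBackend(backend)

        // source/join/sink wiring and env.execute(...) would follow here
      }
    }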
> >> >> > >> >