Jeff
Glad to know that you are able to progress well and issue got resolved

Regards
Bhaskar

On Tue, Jun 23, 2020 at 12:24 AM Jeff Henrikson <jehenri...@gmail.com>
wrote:

> Bhaskar,
>
> I think I am unstuck.  The performance numbers I sent after throttling
> were due to a one character error in business logic.  I think I now have
> something good enough to work with for now.  I will repost if I
> encounter further unexpected issues.
>
> Adding application-level throttling ends up resolving both my symptom of
> slow/failing checkpoints, and also my symptom of crashes during long runs.
>
> Many thanks!
>
>
> Jeff
>
>
> On 6/20/20 11:46 AM, Jeff Henrikson wrote:
> > Bhaskar,
> >
> >  > Glad to know some progress.
> >
> > Yeah, some progress.  Yet overnight run didn't look as good as I hoped.
> >
> > The throttling required to not crash during snapshots seems to be quite
> > different from the throttling required to crash not during snapshots. So
> > the lowest common denominator is quite a large performance penalty.
> >
> > What's worse, the rate of input that makes the snapshot performance go
> > from good to bad seems to change significantly as the state size grows.
> > Here is checkpoint history from an overnight run.
> >
> > Parameters:
> >
> >      - 30 minutes minimum between snapshots
> >      - incremental snapshot mode
> >      - inputs throttled to 100 events per sec per input per slot,
> >        which is around 1/4 of the unthrottled throughput
> >
> > Checkpoint history:
> >
> >      ID    Status    Acknowledged    Trigger Time    Latest
> > Acknowledgement    End to End Duration    State Size    Buffered During
> > Alignment
> >      12    COMPLETED    304/304    8:52:22    10:37:18    1h 44m 55s
> > 60.5 GB    0 B
> >      11    COMPLETED    304/304    6:47:03    8:22:19    1h 35m 16s
> > 53.3 GB    0 B
> >      10    COMPLETED    304/304    5:01:20    6:17:00    1h 15m 39s
> > 41.0 GB    0 B
> >      9    COMPLETED    304/304    3:47:43    4:31:19    43m 35s    34.1
> > GB    0 B
> >      8    COMPLETED    304/304    2:40:58    3:17:42    36m 43s    27.8
> > GB    0 B
> >      7    COMPLETED    304/304    1:39:15    2:10:57    31m 42s    23.1
> > GB    0 B
> >      6    COMPLETED    304/304    0:58:02    1:09:13    11m 11s    17.4
> > GB    0 B
> >      5    COMPLETED    304/304    0:23:27    0:28:01    4m 33s    14.3
> > GB    0 B
> >      4    COMPLETED    304/304    23:52:29    23:53:26    56s    12.7
> > GB    0 B
> >      3    COMPLETED    304/304    23:20:59    23:22:28    1m 29s    10.8
> > GB    0 B
> >      2    COMPLETED    304/304    22:46:17    22:50:58    4m 40s    7.40
> > GB    0 B
> >
> > As you can see, GB/minute varies drastically.  GB/minute also varies
> > drastically with full checkpoint mode.
> >
> > I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the
> > checkpoint GB/minute getting so slow, it will crash soon.
> >
> > I'm really wishing state.backend.async=false worked for
> > RocksDbStateBackend.
> >
> > I'm also wondering if my throttler would improve if I just connected to
> > the REST api to ask if any checkpoint is in progress, and then paused
> > inputs accordingly.  Effectively state.backend.async=false via hacked
> > application code.
> >
> >  > Where are you updating your state here? I
> >  > couldn't find any flink managed state here.
> >
> > The only updates to state I make are through the built-in
> > DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I
> > use .cogroup shows exactly two ways that .cogroup calls an
> > implementation of AppendingState.add.  I summarize those below.
> >
> > The two AppendingState subclasses invoked are HeapListState and
> > HeapReducingState.  Neither have a support attribute on them, such as
> > MapState's @PublicEvolving.
> >
> >  > I suggested updating the flink managed state using onTimer over an
> >  > interval equal to the checkpoint interval.
> >
> > So the onTimer method, with interval set to the checkpoint interval.
> > Interesting.
> >
> > It looks like the closest subclass for my use case use would be either
> > KeyedCoProcessFunction.  Let me see if I understand concretely the idea:
> >
> > 1) between checkpoints, read join input and write join output, by
> > loading any state reads from external state, but buffering all state
> > changes in memory in some kind of data structure.
> >
> > 2) whenever a checkpoint arrived or the memory consumed by buffered
> > writes gets too big, flush the writes to state.
> >
> > Is that the gist of the idea about .onTimer?
> >
> >
> > Jeff
> >
> >
> >
> > There are two paths from .coGroup to AppendingState.add
> >
> >      path 1 of 2: .coGroup to HeapListState
> >
> >          add:90, HeapListState {org.apache.flink.runtime.state.heap}
> >          processElement:203, EvictingWindowOperator
> > {org.apache.flink.streaming.runtime.operators.windowing}
> >          processElement:164, StreamOneInputProcessor
> > {org.apache.flink.streaming.runtime.io}
> >          processInput:143, StreamOneInputProcessor
> > {org.apache.flink.streaming.runtime.io}
> >
> >
> >
> org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement
>
> >
> >
> >                (windowAssigner is an instance of GlobalWindows)
> >
> >                  @Override
> >                  public void processElement(StreamRecord<IN> element)
> > throws Exception {
> >                      final Collection<W> elementWindows =
> > windowAssigner.assignWindows(
> >                              element.getValue(), element.getTimestamp(),
> > windowAssignerContext);
> >
> >                      //if element is handled by none of assigned
> > elementWindows
> >                      boolean isSkippedElement = true;
> >
> >                      final K key =
> > this.<K>getKeyedStateBackend().getCurrentKey();
> >
> >                      if (windowAssigner instanceof
> MergingWindowAssigner) {
> >                  . . .
> >                      } else {
> >                          for (W window : elementWindows) {
> >
> >                              // check if the window is already inactive
> >                              if (isWindowLate(window)) {
> >                                  continue;
> >                              }
> >                              isSkippedElement = false;
> >
> >
> > evictingWindowState.setCurrentNamespace(window);
> >                              evictingWindowState.add(element);
> >
> >          =>
> >
> >              org.apache.flink.runtime.state.heap.HeapListState#add:
> >                      @Override
> >                      public void add(V value) {
> >                          Preconditions.checkNotNull(value, "You cannot
> > add null to a ListState.");
> >
> >                          final N namespace = currentNamespace;
> >
> >                          final StateTable<K, N, List<V>> map =
> stateTable;
> >                          List<V> list = map.get(namespace);
> >
> >                          if (list == null) {
> >                              list = new ArrayList<>();
> >                              map.put(namespace, list);
> >                          }
> >                          list.add(value);
> >                      }
> >
> >      path 2 of 2: .coGroup to HeapReducingState
> >
> >              add:95, HeapReducingState
> > {org.apache.flink.runtime.state.heap}
> >              onElement:49, CountTrigger
> > {org.apache.flink.streaming.api.windowing.triggers}
> >              onElement:898, WindowOperator$Context
> > {org.apache.flink.streaming.runtime.operators.windowing}
> >              processElement:210, EvictingWindowOperator
> > {org.apache.flink.streaming.runtime.operators.windowing}
> >              processElement:164, StreamOneInputProcessor
> > {org.apache.flink.streaming.runtime.io}
> >              processInput:143, StreamOneInputProcessor
> > {org.apache.flink.streaming.runtime.io}
> >
> >              @Override
> >              public void processElement(StreamRecord<IN> element) throws
> > Exception {
> >                  final Collection<W> elementWindows =
> > windowAssigner.assignWindows(
> >                          element.getValue(), element.getTimestamp(),
> > windowAssignerContext);
> >
> >                  //if element is handled by none of assigned
> elementWindows
> >                  boolean isSkippedElement = true;
> >
> >                  final K key =
> > this.<K>getKeyedStateBackend().getCurrentKey();
> >
> >                  if (windowAssigner instanceof MergingWindowAssigner) {
> >              . . .
> >                  } else {
> >                      for (W window : elementWindows) {
> >
> >                          // check if the window is already inactive
> >                          if (isWindowLate(window)) {
> >                              continue;
> >                          }
> >                          isSkippedElement = false;
> >
> >                          evictingWindowState.setCurrentNamespace(window);
> >                          evictingWindowState.add(element);
> >
> >                          triggerContext.key = key;
> >                          triggerContext.window = window;
> >                          evictorContext.key = key;
> >                          evictorContext.window = window;
> >
> >                          TriggerResult triggerResult =
> > triggerContext.onElement(element);
> >
> >          =>
> >                  public TriggerResult onElement(StreamRecord<IN>
> > element) throws Exception {
> >                      return trigger.onElement(element.getValue(),
> > element.getTimestamp(), window, this);
> >
> >          =>
> >
> >              @Override
> >              public TriggerResult onElement(Object element, long
> > timestamp, W window, TriggerContext ctx) throws Exception {
> >                  ReducingState<Long> count =
> > ctx.getPartitionedState(stateDesc);
> >                  count.add(1L);
> >
> >          =>
> >
> >              org.apache.flink.runtime.state.heap.HeapReducingState#add
> >                    @Override
> >                    public void add(V value) throws IOException {
> >
> >                        if (value == null) {
> >
> >
> >
> > On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
> >> Glad to know some progress. Where are you updating your state here? I
> >> couldn't find any flink managed state here.
> >> I suggested updating the flink managed state using onTimer over an
> >> interval equal to the checkpoint interval.
> >>
> >> In your case since you do throttling, it helped to maintain the fixed
> >> rate per slot. Before the rate was sporadic.
> >> It's definitely an IO bottleneck.
> >>
> >> So now you can think of decoupling stateless scanning and stateful
> joins.
> >> For example you can keep a stateless scan as separate flink job and
> >> keep its output in some Kafka kind of store.
> >>
> >>  From there you start your stateful joins. This would help focussing
> >> on your stateful job in much better fashion
> >>
> >> Regards
> >> Bhaskar
> >>
> >>
> >>
> >>
> >> On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenri...@gmail.com
> >> <mailto:jehenri...@gmail.com>> wrote:
> >>
> >>     Bhaskar,
> >>
> >>     Based on your idea of limiting input to get better checkpoint
> >> behavior,
> >>     I made a ProcessFunction that constraints to a number of events per
> >>     second per slot per input.  I do need to do some stateless input
> >>     scanning before joins.  The stateless part needs to be fast and
> >> does no
> >>     impact snapshots.  So I inserted the throttling after the input
> >>     preprocessing but before the stateful transformations.  There is a
> >>     significant difference of snapshot throughput (often 5x or larger)
> >> when
> >>     I change the throttle between 200 and 300 events per second (per
> slot
> >>     per input).
> >>
> >>     Hope the throttling keeps being effective as I keep the job running
> >>     longer.
> >>
> >>     Odd.  But likely a very effective way out of my problem.
> >>
> >>     I wonder what drives it . . .  Thread contention?  IOPS contention?
> >>
> >>     See ProcessFunction code below.
> >>
> >>     Many thanks!
> >>
> >>
> >>     Jeff
> >>
> >>
> >>
> >>     import org.apache.flink.streaming.api.functions.ProcessFunction
> >>     import org.apache.flink.util.Collector
> >>
> >>     // Set eventsPerSecMax to -1 to disable the throttle
> >>     // TODO: Actual number of events can be slightly larger
> >>     // TODO: Remove pause correlation with system clock
> >>
> >>     case class Throttler[T](eventsPerSecMax : Double) extends
> >>     ProcessFunction[T,T] {
> >>         var minutePrev = 0
> >>         var numEvents = 0
> >>         def minutes() = {
> >>           val ms = System.currentTimeMillis()
> >>           (ms / 1000 / 60).toInt
> >>         }
> >>         def increment() = {
> >>           val m = minutes()
> >>           if(m != minutePrev) {
> >>             numEvents = 0
> >>           }
> >>           numEvents += 1
> >>         }
> >>         def eps() = {
> >>           numEvents/60.0
> >>         }
> >>         override def processElement(x: T, ctx: ProcessFunction[T,
> >>     T]#Context,
> >>     out: Collector[T]): Unit = {
> >>           increment()
> >>           if(eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
> >>             Thread.sleep(1000L)
> >>           }
> >>           out.collect(x)
> >>         }
> >>     }
> >>
> >>     On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> >>      > Bhaskar,
> >>      >
> >>      > Thank you for your thoughtful points.
> >>      >
> >>      >  > I want to discuss more on points (1) and (2)
> >>      >  > If we take care of them  rest will be good
> >>      >  >
> >>      >  > Coming to (1)
> >>      >  >
> >>      >  > Please try to give reasonable checkpoint interval time for
> >>     every job.
> >>      >  > Minum checkpoint interval recommended by flink community is 3
> >>     minutes
> >>      >  > I thin you should give minimum 3 minutes checkpoint interval
> >>     for all
> >>      >
> >>      > I have spent very little time testing with checkpoint intervals
> >>     of under
> >>      > 3 minutes.  I frequently test with intervals of 5 minutes and
> >> of 30
> >>      > minutes.  I also test with checkpoint intervals such as 60
> >>     minutes, and
> >>      > never (manual only).  In terms of which exceptions get thrown, I
> >>     don't
> >>      > see much difference between 5/30/60, I don't see a lot of
> >> difference.
> >>      >
> >>      > Infinity (no checkpoint internal) seems to be an interesting
> >> value,
> >>      > because before crashing, it seems to process around twice as much
> >>     state
> >>      > as with any finite checkpoint interval.  The largest savepoints I
> >>     have
> >>      > captured have been manually triggered using the /job/:jobid/stop
> >>     REST
> >>      > API.  I think it helps for the snapshot to be synchronous.
> >>      >
> >>      > One curiosity about the /job/:jobid/stop command is that from
> >>     time of
> >>      > the command, it often takes many minutes for the internal
> >>     processing to
> >>      > stop.
> >>      >
> >>      > Another curiosity about /job/:jobid/stop command is that
> sometimes
> >>      > following a completed savepoint, the cluster goes back to
> running!
> >>      >
> >>      >  > Coming to (2)
> >>      >  >
> >>      >  > What's your input data rate?
> >>      >
> >>      > My application involves what I will call "main" events that are
> >>     enriched
> >>      > by "secondary" events.  While the secondary events have several
> >>      > different input streams, data types, and join keys, I will
> >>     estimate the
> >>      > secondary events all together.  My estimate for input rate is as
> >>     follows:
> >>      >
> >>      >      50M "main" events
> >>      >      50 secondary events for each main event, for a
> >>      >          total of around 2.5B input events
> >>      >      8 nodes
> >>      >      20 hours
> >>      >
> >>      > Combining these figures, we can estimate:
> >>      >
> >>      >      50000000*50/8/20/3600 = 4340 events/second/node
> >>      >
> >>      > I don't see how to act on your advice for (2).  Maybe your idea
> >>     is that
> >>      > during backfill/bootstrap, I artificially throttle the inputs
> >> to my
> >>      > application?
> >>      >
> >>      > 100% of my application state is due to .cogroup, which manages a
> >>      > HeapListState on its own.  I cannot think of any controls for
> >>     changing
> >>      > how .cogroup handles internal state per se.  I will paste below
> >> the
> >>      > Flink code path that .cogroup uses to update its internal state
> >>     when it
> >>      > runs my application.
> >>      >
> >>      > The only control I can think of with .cogroup that indirectly
> >>     impacts
> >>      > internal state is delayed triggering.
> >>      >
> >>      > Currently I use a trigger on every event, which I understand
> >>     creates a
> >>      > suboptimal number of events.  I previously experimented with
> >> delayed
> >>      > triggering, but I did not get good results.
> >>      >
> >>      > Just now I tried again ContinuousProcessingTimeTrigger of 30
> >>     seconds,
> >>      > with rocksdb.timer-service.factory: heap, and a 5 minute
> >> checkpoint
> >>      > interval.  The first checkpoint failed, which has been rare when
> >>     I use
> >>      > all the same parameters except for triggering on every event.
> >> So it
> >>      > looks worse not better.
> >>      >
> >>      > Thanks again,
> >>      >
> >>      >
> >>      > Jeff Henrikson
> >>      >
> >>      >
> >>      >
> >>      >
> >>      > On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
> >>      >> Thanks for the reply.
> >>      >> I want to discuss more on points (1) and (2)
> >>      >> If we take care of them  rest will be good
> >>      >>
> >>      >> Coming to (1)
> >>      >>
> >>      >> Please try to give reasonable checkpoint interval time for every
> >>     job.
> >>      >> Minum checkpoint interval recommended by flink community is 3
> >>     minutes
> >>      >> I thin you should give minimum 3 minutes checkpoint interval
> >> for all
> >>      >>
> >>      >> Coming to (2)
> >>      >>
> >>      >> What's your input data rate?
> >>      >> For example you are seeing data at 100 msg/sec, For each
> >> message if
> >>      >> there is state changing and you are updating the state with
> >>     RocksDB,
> >>      >> it's going to
> >>      >> create 100 rows in 1 second at RocksDb end, On the average if 50
> >>      >> records have changed each second, even if you are using RocksDB
> >>      >> differentialstate = true,
> >>      >> there is no use. Because everytime 50% is new rows getting
> >>     added. So
> >>      >> the best bet is to update records with RocksDB only once in your
> >>      >> checkpoint interval.
> >>      >> Suppose your checkpoint interval is 5 minutes. If you update
> >>     RocksDB
> >>      >> state once in 5 minutes, then the rate at which new records
> >>     added to
> >>      >> RocksDB  will be 1 record/5min.
> >>      >> Whereas in your original scenario, 30000 records added to
> >>     rocksDB in 5
> >>      >> min. You can save 1:30000 ratio of records in addition to
> >> RocksDB.
> >>      >> Which will save a huge
> >>      >> redundant size addition to RocksDB. Ultimately your  state is
> >>     driven
> >>      >> by your checkpoint interval. From the input source you will go
> >>     back 5
> >>      >> min back and read the state, similarly from RocksDB side
> >>      >> also you can have a state update once in 5 min should work.
> >>     Otherwise
> >>      >> even if you add state there is no use.
> >>      >>
> >>      >> Regards
> >>      >> Bhaskar
> >>      >>
> >>      >> Try to update your RocksDB state in an interval equal to the
> >>      >> checkpoint interval. Otherwise in my case many times what's
> >>     observed is
> >>      >> state size grows unnecessarily.
> >>      >>
> >>      >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson
> >>     <jehenri...@gmail.com <mailto:jehenri...@gmail.com>
> >>      >> <mailto:jehenri...@gmail.com <mailto:jehenri...@gmail.com>>>
> >> wrote:
> >>      >>
> >>      >>     Vijay,
> >>      >>
> >>      >>     Thanks for your thoughts.  Below are answers to your
> >> questions.
> >>      >>
> >>      >>       > 1. What's your checkpoint interval?
> >>      >>
> >>      >>     I have used many different checkpoint intervals, ranging
> >> from 5
> >>      >> minutes
> >>      >>     to never.  I usually setMinPasueBetweenCheckpoints to the
> >> same
> >>      >> value as
> >>      >>     the checkpoint interval.
> >>      >>
> >>      >>       > 2. How frequently are you updating the state into
> >> RocksDB?
> >>      >>
> >>      >>     My understanding is that for .cogroup:
> >>      >>
> >>      >>         - Triggers control communication outside the operator
> >>      >>         - Evictors control cleanup of internal state
> >>      >>         - Configurations like write buffer size control the
> >>     frequency of
> >>      >>     state change at the storage layer
> >>      >>         - There is no control for how frequently the window
> state
> >>      >>     updates at
> >>      >>     the layer of the RocksDB api layer.
> >>      >>
> >>      >>     Thus, the state update whenever data is ingested.
> >>      >>
> >>      >>       > 3. How many task managers are you using?
> >>      >>
> >>      >>     Usually I have been running with one slot per taskmanager.
> >>     28GB of
> >>      >>     usable ram on each node.
> >>      >>
> >>      >>       > 4. How much data each task manager handles while
> >> taking the
> >>      >>     checkpoint?
> >>      >>
> >>      >>     Funny you should ask.  I would be okay with zero.
> >>      >>
> >>      >>     The application I am replacing has a latency of 36-48 hours,
> >>     so if I
> >>      >>     had
> >>      >>     to fully stop processing to take every snapshot
> >>     synchronously, it
> >>      >> might
> >>      >>     be seen as totally acceptable, especially for initial
> >>     bootstrap.
> >>      >> Also,
> >>      >>     the velocity of running this backfill is approximately
> >> 115x real
> >>      >>     time on
> >>      >>     8 nodes, so the steady-state run may not exhibit the failure
> >>     mode in
> >>      >>     question at all.
> >>      >>
> >>      >>     It has come as some frustration to me that, in the case of
> >>      >>     RocksDBStateBackend, the configuration key
> >> state.backend.async
> >>      >>     effectively has no meaningful way to be false.
> >>      >>
> >>      >>     The only way I have found in the existing code to get a
> >>     behavior like
> >>      >>     synchronous snapshot is to POST to /jobs/<jobID>/stop with
> >>      >> drain=false
> >>      >>     and a URL.  This method of failing fast is the way that I
> >>     discovered
> >>      >>     that I needed to increase transfer threads from the default.
> >>      >>
> >>      >>     The reason I don't just run the whole backfill and then
> >> take one
> >>      >>     snapshot is that even in the absence of checkpoints, a very
> >>     similar
> >>      >>     congestion seems to take the cluster down when I am say
> >>     20-30% of the
> >>      >>     way through my backfill.
> >>      >>
> >>      >>     Reloading from my largest feasible snapshot makes it
> >>     possible to make
> >>      >>     another snapshot a bit larger before crash, but not by much.
> >>      >>
> >>      >>     On first glance, the code change to allow
> >>     RocksDBStateBackend into a
> >>      >>     synchronous snapshots mode looks pretty easy.  Nevertheless,
> >>     I was
> >>      >>     hoping to do the initial launch of my application without
> >>     needing to
> >>      >>     modify the framework.
> >>      >>
> >>      >>     Regards,
> >>      >>
> >>      >>
> >>      >>     Jeff Henrikson
> >>      >>
> >>      >>
> >>      >>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> >>      >>      > For me this seems to be an IO bottleneck at your task
> >>     manager.
> >>      >>      > I have a couple of queries:
> >>      >>      > 1. What's your checkpoint interval?
> >>      >>      > 2. How frequently are you updating the state into
> RocksDB?
> >>      >>      > 3. How many task managers are you using?
> >>      >>      > 4. How much data each task manager handles while taking
> >> the
> >>      >>     checkpoint?
> >>      >>      >
> >>      >>      > For points (3) and (4) , you should be very careful. I
> >>     feel you
> >>      >> are
> >>      >>      > stuck at this.
> >>      >>      > You try to scale vertically by increasing more CPU and
> >>     memory for
> >>      >>     each
> >>      >>      > task manager.
> >>      >>      > If not, try to scale horizontally so that each task
> >>     manager IO
> >>      >>     gets reduces
> >>      >>      > Apart from that check is there any bottleneck with the
> >> file
> >>      >> system.
> >>      >>      >
> >>      >>      > Regards
> >>      >>      > Bhaskar
> >>      >>      >
> >>      >>      >
> >>      >>      >
> >>      >>      >
> >>      >>      >
> >>      >>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor
> >>     <vict...@gmail.com <mailto:vict...@gmail.com>
> >>      >>     <mailto:vict...@gmail.com <mailto:vict...@gmail.com>>
> >>      >>      > <mailto:vict...@gmail.com <mailto:vict...@gmail.com>
> >>     <mailto:vict...@gmail.com <mailto:vict...@gmail.com>>>> wrote:
> >>      >>      >
> >>      >>      >     I had a similar problem.   I ended up solving by not
> >>      >> relying on
> >>      >>      >     checkpoints for recovery and instead re-read my input
> >>     sources
> >>      >>     (in my
> >>      >>      >     case a kafka topic) from the earliest offset and
> >>     rebuilding
> >>      >>     only the
> >>      >>      >     state I need.  I only need to care about the past 1
> >> to 2
> >>      >> days of
> >>      >>      >     state so can afford to drop anything older.   My
> >> recovery
> >>      >>     time went
> >>      >>      >     from over an hour for just the first checkpoint to
> >>     under 10
> >>      >>     minutes.
> >>      >>      >
> >>      >>      >     Tim
> >>      >>      >
> >>      >>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang
> >>     <myas...@live.com <mailto:myas...@live.com>
> >>      >>     <mailto:myas...@live.com <mailto:myas...@live.com>>
> >>      >>      >     <mailto:myas...@live.com <mailto:myas...@live.com>
> >>     <mailto:myas...@live.com <mailto:myas...@live.com>>>> wrote:
> >>      >>      >
> >>      >>      >         Hi Jeff
> >>      >>      >
> >>      >>      >          1. "after around 50GB of state, I stop being
> >> able to
> >>      >>     reliably
> >>      >>      >             take checkpoints or savepoints. "
> >>      >>      >             What is the exact reason that job cannot
> >> complete
> >>      >>      >             checkpoint? Expired before completing or
> >>     decline by
> >>      >> some
> >>      >>      >             tasks? The former one is manly caused by high
> >>      >>     back-pressure
> >>      >>      >             and the later one is mainly due to some
> >> internal
> >>      >> error.
> >>      >>      >          2. Have you checked what reason the remote task
> >>     manager
> >>      >>     is lost?
> >>      >>      >             If the remote task manager is not crashed, it
> >>     might
> >>      >>     be due
> >>      >>      >             to GC impact, I think you might need to check
> >>      >>     task-manager
> >>      >>      >             logs and GC logs.
> >>      >>      >
> >>      >>      >         Best
> >>      >>      >         Yun Tang
> >>      >>      >
> >>      >>
> >>
> >>
>  ------------------------------------------------------------------------
> >>      >>      >         *From:* Jeff Henrikson <jehenri...@gmail.com
> >>     <mailto:jehenri...@gmail.com>
> >>      >>     <mailto:jehenri...@gmail.com <mailto:jehenri...@gmail.com>>
> >>      >>      >         <mailto:jehenri...@gmail.com
> >>     <mailto:jehenri...@gmail.com>
> >>      >> <mailto:jehenri...@gmail.com <mailto:jehenri...@gmail.com>>>>
> >>      >>      >         *Sent:* Thursday, June 18, 2020 1:46
> >>      >>      >         *To:* user <user@flink.apache.org
> >>     <mailto:user@flink.apache.org>
> >>      >>     <mailto:user@flink.apache.org
> >>     <mailto:user@flink.apache.org>> <mailto:user@flink.apache.org
> >>     <mailto:user@flink.apache.org>
> >>      >>     <mailto:user@flink.apache.org
> >> <mailto:user@flink.apache.org>>>>
> >>      >>      >         *Subject:* Trouble with large state
> >>      >>      >         Hello Flink users,
> >>      >>      >
> >>      >>      >         I have an application of around 10 enrichment
> >>     joins.  All
> >>      >>     events
> >>      >>      >         are
> >>      >>      >         read from kafka and have event timestamps.  The
> >>     joins are
> >>      >>     built
> >>      >>      >         using
> >>      >>      >         .cogroup, with a global window, triggering on
> >> every 1
> >>      >>     event, plus a
> >>      >>      >         custom evictor that drops records once a newer
> >>     record
> >>      >> for the
> >>      >>      >         same ID
> >>      >>      >         has been processed.  Deletes are represented by
> >> empty
> >>      >>     events with
> >>      >>      >         timestamp and ID (tombstones). That way, we can
> >> drop
> >>      >>     records when
> >>      >>      >         business logic dictates, as opposed to when a
> >> maximum
> >>      >>     retention
> >>      >>      >         has been
> >>      >>      >         attained.  The application runs
> >>     RocksDBStateBackend, on
> >>      >>      >         Kubernetes on
> >>      >>      >         AWS with local SSDs.
> >>      >>      >
> >>      >>      >         Unit tests show that the joins produce expected
> >>      >> results.     On an
> >>      >>      >         8 node
> >>      >>      >         cluster, watermark output progress seems to
> >>     indicate I
> >>      >>     should be
> >>      >>      >         able to
> >>      >>      >         bootstrap my state of around 500GB in around 1
> >>     day.  I am
> >>      >>     able
> >>      >>      >         to save
> >>      >>      >         and restore savepoints for the first half an hour
> >>     of run
> >>      >>     time.
> >>      >>      >
> >>      >>      >         My current trouble is that after around 50GB of
> >>     state,
> >>      >> I stop
> >>      >>      >         being able
> >>      >>      >         to reliably take checkpoints or savepoints.  Some
> >>     time
> >>      >> after
> >>      >>      >         that, I
> >>      >>      >         start getting a variety of failures where the
> >> first
> >>      >>     suspicious
> >>      >>      >         log event
> >>      >>      >         is a generic cluster connectivity error, such as:
> >>      >>      >
> >>      >>      >               1) java.io.IOException: Connecting the
> >> channel
> >>      >> failed:
> >>      >>      >         Connecting
> >>      >>      >               to remote task manager +
> >>     '/10.67.7.101:38955 <http://10.67.7.101:38955>
> >>      >>     <http://10.67.7.101:38955>
> >>      >>      >         <http://10.67.7.101:38955>' has failed. This
> >>      >>      >               might indicate that the remote task
> >> manager has
> >>      >>     been lost.
> >>      >>      >
> >>      >>      >               2) org.apache.flink.runtime.io
> >>     <http://org.apache.flink.runtime.io>
> >>      >>     <http://org.apache.flink.runtime.io
> >.network.netty.exception
> >>      >>      >               .RemoteTransportException: Connection
> >>     unexpectedly
> >>      >>     closed
> >>      >>      >         by remote
> >>      >>      >               task manager 'null'. This might indicate
> >>     that the
> >>      >>     remote task
> >>      >>      >               manager was lost.
> >>      >>      >
> >>      >>      >               3) Association with remote system
> >>      >>      >               [akka.tcp://flink@10.67.6.66:34987
> >>     <http://flink@10.67.6.66:34987>
> >>      >>     <http://flink@10.67.6.66:34987>
> >>      >>      >         <http://flink@10.67.6.66:34987>] has failed,
> >>     address is
> >>      >> now
> >>      >>      >               gated for [50] ms. Reason: [Association
> >>     failed with
> >>      >>      >               [akka.tcp://flink@10.67.6.66:34987
> >>     <http://flink@10.67.6.66:34987>
> >>      >>     <http://flink@10.67.6.66:34987>
> >>      >>      >         <http://flink@10.67.6.66:34987>]] Caused by:
> >>      >>      >               [java.net <http://java.net>
> >>     <http://java.net>.NoRouteToHostException:
> >>      >>     No route to host]
> >>      >>      >
> >>      >>      >         I don't see any obvious out of memory errors on
> >> the
> >>      >>     TaskManager UI.
> >>      >>      >
> >>      >>      >         Adding nodes to the cluster does not seem to
> >>     increase the
> >>      >>     maximum
> >>      >>      >         savable state size.
> >>      >>      >
> >>      >>      >         I could enable HA, but for the time being I
> >> have been
> >>      >>     leaving it
> >>      >>      >         out to
> >>      >>      >         avoid the possibility of masking deterministic
> >>     faults.
> >>      >>      >
> >>      >>      >         Below are my configurations.
> >>      >>      >
> >>      >>      >         Thanks in advance for any advice.
> >>      >>      >
> >>      >>      >         Regards,
> >>      >>      >
> >>      >>      >
> >>      >>      >         Jeff Henrikson
> >>      >>      >
> >>      >>      >
> >>      >>      >
> >>      >>      >         Flink version: 1.10
> >>      >>      >
> >>      >>      >         Configuration set via code:
> >>      >>      >               parallelism=8
> >>      >>      >               maxParallelism=64
> >>      >>      >
> >> setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>      >>      >
> >>      >> setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >>      >>      >               setTolerableCheckpointFailureNumber(1000)
> >>      >>      >               setMaxConcurrentCheckpoints(1)
> >>      >>      >
> >>      >>      >
> >>      >>
> >>
> >>
>  
> enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>
> >>
> >>      >>
> >>      >>      >               RocksDBStateBackend
> >>      >>      >
> >> setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
> >>      >>      >               setNumberOfTransferThreads(25)
> >>      >>      >               setDbStoragePath points to a local nvme SSD
> >>      >>      >
> >>      >>      >         Configuration in flink-conf.yaml:
> >>      >>      >
> >>      >>      >               jobmanager.rpc.address: localhost
> >>      >>      >               jobmanager.rpc.port: 6123
> >>      >>      >               jobmanager.heap.size: 28000m
> >>      >>      >               taskmanager.memory.process.size: 28000m
> >>      >>      >               taskmanager.memory.jvm-metaspace.size: 512m
> >>      >>      >               taskmanager.numberOfTaskSlots: 1
> >>      >>      >               parallelism.default: 1
> >>      >>      >               jobmanager.execution.failover-strategy:
> full
> >>      >>      >
> >>      >>      >               cluster.evenly-spread-out-slots: false
> >>      >>      >
> >>      >>      >               taskmanager.memory.network.fraction:
> >>     0.2           #
> >>      >>      >         default 0.1
> >>      >>      >
> >> taskmanager.memory.framework.off-heap.size: 2GB
> >>      >>      >               taskmanager.memory.task.off-heap.size: 2GB
> >>      >>      >     taskmanager.network.memory.buffers-per-channel: 32
> >>      >>     # default 2
> >>      >>      >               taskmanager.memory.managed.fraction: 0.4
> >>     #
> >>      >> docs say
> >>      >>      >         default 0.1, but something seems to set 0.4
> >>      >>      >               taskmanager.memory.task.off-heap.size:
> >>     2048MB      #
> >>      >>      >         default 128M
> >>      >>      >
> >>      >>      >               state.backend.fs.memory-threshold: 1048576
> >>      >>      >               state.backend.fs.write-buffer-size:
> 10240000
> >>      >>      >               state.backend.local-recovery: true
> >>      >>      >               state.backend.rocksdb.writebuffer.size:
> 64MB
> >>      >>      >               state.backend.rocksdb.writebuffer.count: 8
> >>      >>      >     state.backend.rocksdb.writebuffer.number-to-merge: 4
> >>      >>      >     state.backend.rocksdb.timer-service.factory: heap
> >>      >>      >               state.backend.rocksdb.block.cache-size:
> >>     64000000 #
> >>      >>     default 8MB
> >>      >>      >               state.backend.rocksdb.write-batch-size:
> >>     16000000 #
> >>      >>     default 2MB
> >>      >>      >
> >>      >>      >               web.checkpoints.history: 250
> >>      >>      >
> >>      >>
> >>
>

Reply via email to