
Thank you for your thoughtful points.
> I want to discuss more on points (1) and (2)
> If we take care of them, the rest will be good
> Coming to (1)
> Please try to give reasonable checkpoint interval time for every job.
> Minimum checkpoint interval recommended by flink community is 3 minutes
> I think you should give minimum 3 minutes checkpoint interval for all

I have spent very little time testing with checkpoint intervals of under 3 minutes. I frequently test with intervals of 5 minutes and 30 minutes. I also test with intervals such as 60 minutes, and with no periodic checkpoints at all (manual savepoints only). In terms of which exceptions get thrown, I don't see much difference between 5, 30, and 60 minutes.
Infinity (no checkpoint interval) seems to be an interesting value,
because before crashing, it seems to process around twice as much state
as with any finite checkpoint interval.  The largest savepoints I have
captured have been manually triggered using the /jobs/:jobid/stop REST
API.  I think it helps for the snapshot to be synchronous.
One curiosity about the /jobs/:jobid/stop command is that from the time
of the command, it often takes many minutes for the internal processing to
Another curiosity about the /jobs/:jobid/stop command is that sometimes,
following a completed savepoint, the cluster goes back to running!
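For reference, a minimal sketch of issuing the stop-with-savepoint call (POST /jobs/:jobid/stop) from Java 11's built-in HttpClient. It assumes the JobManager's REST endpoint is reachable at a base URL such as http://localhost:8081 and that the request body accepts `targetDirectory` and `drain` fields; the class and method names are hypothetical, and the body format should be checked against the REST API documentation for the Flink version in use. The response should carry a trigger id that can then be polled at /jobs/:jobid/savepoints/:triggerid.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper; assumes the body fields "targetDirectory" and "drain"
// and a target directory that contains no characters needing JSON escaping.
class StopWithSavepoint {

    /** Builds (but does not send) the stop-with-savepoint request. */
    static HttpRequest buildStopRequest(String restBase, String jobId, String targetDirectory) {
        String body = String.format(
                "{\"targetDirectory\": \"%s\", \"drain\": false}", targetDirectory);
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** Sends the request and returns the raw response body (expected to hold a trigger id). */
    static String trigger(String restBase, String jobId, String targetDirectory) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildStopRequest(restBase, jobId, targetDirectory),
                HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```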
> Coming to (2)
> What's your input data rate?

My application involves what I will call "main" events that are enriched by "secondary" events. While the secondary events have several different input streams, data types, and join keys, I will estimate the secondary events all together. My estimate of the input rate is as follows:
    50M "main" events
    50 secondary events for each main event, for a
        total of around 2.5B input events
    8 nodes
    20 hours

Combining these figures, we can estimate:

    50000000*50/8/20/3600 = 4340 events/second/node
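The same arithmetic as a small Java sketch, using only the figures listed above (the class and method names are illustrative). As in the formula, the 50M main events themselves are left out, since they add only about 2% to the total.

```java
// Back-of-the-envelope input-rate estimate from the figures above.
class InputRateEstimate {
    static final long MAIN_EVENTS = 50_000_000L;   // "main" events
    static final long SECONDARY_PER_MAIN = 50L;    // secondary events per main event
    static final long NODES = 8L;
    static final long HOURS = 20L;

    /** Total secondary input events: 50M * 50 = 2.5B. */
    static long totalEvents() {
        return MAIN_EVENTS * SECONDARY_PER_MAIN;
    }

    /** Average events per second per node over the whole run. */
    static double eventsPerSecondPerNode() {
        return (double) totalEvents() / NODES / (HOURS * 3600);
    }

    public static void main(String[] args) {
        System.out.printf("total events: %d%n", totalEvents());
        System.out.printf("events/second/node: %.0f%n", eventsPerSecondPerNode());
    }
}
```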

I don't see how to act on your advice for (2). Maybe your idea is that during backfill/bootstrap, I artificially throttle the inputs to my application?
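If throttling is the idea, one way to do it is to pace the bootstrap input so that record k is not emitted before k / rate seconds have elapsed. The sketch below is a hypothetical helper, not a Flink API; the class name, the pacing rule, and the idea of calling acquire() once per record from a custom source's emit loop are all assumptions for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical pacing helper for a backfill/bootstrap source (not a Flink API).
class FixedRateThrottle {
    private final double eventsPerSecond;
    private final long startNanos;
    private long emitted;

    FixedRateThrottle(double eventsPerSecond) {
        if (eventsPerSecond <= 0) throw new IllegalArgumentException("rate must be positive");
        this.eventsPerSecond = eventsPerSecond;
        this.startNanos = System.nanoTime();
    }

    /**
     * Pure pacing rule: record number `emitted` (0-based) may go out no earlier
     * than emitted / rate seconds after start; returns the remaining wait in ns.
     */
    static long nanosToWait(long emitted, long elapsedNanos, double eventsPerSecond) {
        long earliestNanos = (long) (emitted / eventsPerSecond * 1e9);
        return Math.max(0L, earliestNanos - elapsedNanos);
    }

    /** Blocks until the next record may be emitted, then counts it. */
    void acquire() throws InterruptedException {
        long wait = nanosToWait(emitted, System.nanoTime() - startNanos, eventsPerSecond);
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
        emitted++;
    }
}
```

With a parallel source, the rate passed in would be each source instance's share of the target, for example the per-node figure above divided by the number of source subtasks per node.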
100% of my application state is due to .cogroup, which manages a 
HeapListState on its own.  I cannot think of any controls for changing 
how .cogroup handles internal state per se.  I will paste below the 
Flink code path that .cogroup uses to update its internal state when it 
runs my application.
The only control I can think of with .cogroup that indirectly impacts 
internal state is delayed triggering.
Currently I use a trigger that fires on every event, which I understand
creates a suboptimal number of events.  I previously experimented with
delayed triggering, but I did not get good results.
Just now I tried ContinuousProcessingTimeTrigger again with a 30 second
interval, rocksdb.timer-service.factory: heap, and a 5 minute checkpoint
interval.  The first checkpoint failed, which has been rare when I use
all the same parameters except for triggering on every event.  So it
looks worse, not better.
Thanks again,

Jeff Henrikson

On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
