Hi Ashish,

great to hear that things work better with the RocksDB state backend. I
would only start playing with the containerized.heap-cutoff-ratio if you
see TMs failing due to exceeding the direct memory limit. Currently, not
all of the cutoff memory is set as the direct memory limit. We have a
pending fix for that.

Apart from that, it is indeed a good idea to test your application and
monitor how it behaves when increasing the workload.

Cheers,
Till
​

On Mon, Oct 30, 2017 at 1:34 AM, ashish pok <ashish...@yahoo.com> wrote:

> Jorn, correct and I suppose that's where we are at this point. RocksDB
> based backend is definitely looking promising for our use case. Since I
> haven't gotten a definite no-no on using 30% for YARN cut-off ratio (about
> 1.8GB from 6GB memory) and off-heap flag turned on, we will continue on
> that path. Current plan is to increase throughput on input streams - state
> streams are pretty much processing already and preserved in RocksDB and we
> can control streams for joining with those states and monitor resource
> utilizations + join performance. We are seeing 200-500ms processing times
> with pretty decent amount of logging, which is pretty good for our needs.
>
> Agree about the way to estimate the size of state and hence one of the
> reasons of my original question on what others have done. Our states are
> essentially tuples (few primitive values like string, long and a Map of
> string and string, which hold about 10-12 keys, values are small - not more
> than 128 bytes tops). We created a savepoint after processing about 500k
> records and that's where my estimate came from. I'd be the first one to
> admit it is not accurate but that's the best we could think of.
>
> Thanks, Ashish
>
> ------------------------------
> *From:* Jörn Franke <jornfra...@gmail.com>
> *To:* Ashish Pokharel <ashish...@yahoo.com>
> *Cc:* Till Rohrmann <trohrm...@apache.org>; user <user@flink.apache.org>
> *Sent:* Sunday, October 29, 2017 6:05 PM
> *Subject:* Re: Capacity Planning For Large State in YARN Cluster
>
> Well you can only performance test it beforehand in different scenarios
> with different configurations.
>
> I am not sure what exactly your state holds (eg how many objects etc), but
> if it is Java objects then 3 times might be a little bit low (depends also
> how you initially tested state size) - however Flink optimizes this as
> well. Nevertheless, something like Rocksdb is probably a better solution
> for larger states.
>
> On 29. Oct 2017, at 21:15, Ashish Pokharel <ashish...@yahoo.com> wrote:
>
> Hi Till,
>
> I got the same feedback from Robert Metzger over in Stackflow. I have
> switched my app to use RocksDB and as yes, it did stabilize the app :)
>
> However, I am still struggling with how to map out my TMs and JMs memory,
> number of slots per TMs etc. Currently I am using 60 slots with 10 TMs and
> 60 GB of total cluster memory. Idea was to make the states distributed and
> approx. 1 GB of memory per slot. I have also changed containerized.heap-
> cutoff-ratio config to 0.3 to allow for a little room for RocksDB
> (RocksDB is using basic spinning disk optimized pre-defined configs but we
> do have SSDs on our Prod machines that we can leverage in future too) and
> set taskmanager.memory.off-heap to true.It feels more experimental at
> this point than an exact science :) If there are any further guidelines on
> how we can plan for this as we open up the flood gates to stream heavy
> continuous streams, that will be great.
>
> Thanks again,
>
> Ashish
>
> On Oct 27, 2017, at 8:45 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Ashish,
>
> what you are describing should be a good use case for Flink and it should
> be able to run your program.
>
> When you are seeing a GC overhead limit exceeded error, then it means that
> Flink or your program are creating too many/too large objects filling up
> the memory in a short time. I would recommend checking your user program to
> see whether you can avoid unnecessary object instantiations and whether it
> is possible to reuse created objects.
>
> Concerning Flink's state backends, the memory state backend is currently
> not able to spill to disk. Also the managed memory is only relevant for
> DataSet/batch programs and not streaming programs. Therefore, I would
> recommend you to try out the RocksDB state backend which is able to
> gracefully spill to disk if the state size should grow too large.
> Consequently, you don't have to adjust the managed memory settings because
> they currently don't have an effect on streaming programs.
>
> My gut feeling is that switching to the RocksDBStateBackend could already
> solve your problems. If this should not be the case, then please let me
> know again.
>
> Cheers,
> Till
>
> On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel <ashish...@yahoo.com>
> wrote:
>
> Hi Everyone,
>
> We have hit a roadblock moving an app at Production scale and was hoping
> to get some guidance. Application is pretty common use case in stream
> processing but does require maintaining large number of keyed states. We
> are processing 2 streams - one of which is a daily burst of stream
> (normally around 50 mil but could go upto 100 mil in one hour burst) and
> other is constant stream of around 70-80 mil per hour. We are doing a low
> level join using CoProcess function between the two keyed streams.
> CoProcess function needs to refresh (upsert) state from the daily burst
> stream and decorate constantly streaming data with values from state built
> using bursty stream. All of the logic is working pretty well in a
> standalone Dev environment. We are throwing about 500k events of bursty
> traffic for state and about 2-3 mil of data stream. We have 1 TM with 16GB
> memory, 1 JM with 8 GB memory and 16 slots (1 per core on the server) on
> the server. We have been taking savepoints in case we need to restart app
> for with code changes etc. App does seem to recover from state very well as
> well. Based on the savepoints, total volume of state in production flow
> should be around 25-30GB.
>
> At this point, however, we are trying deploy the app at production scale.
> App also has a flag that can be set at startup time to ignore data stream
> so we can simply initialize state. So basically we are trying to see if we
> can initialize the state first and take a savepoint as test. At this point
> we are using 10 TM with 4 slots and 8GB memory each (idea was to allocate
> around 3 times estimated state size to start with) but TMs keep getting
> killed by YARN with a GC Overhead Limit Exceeded error. We have gone
> through quite a few blogs/docs on Flink Management Memory, off-heap vs heap
> memory, Disk Spill over, State Backend etc. We did try to tweak
> managed-memory configs in multiple ways (off/on heap, fraction, network
> buffers etc) but can’t seem to figure out good way to fine tune the app to
> avoid issues. Ideally, we would hold state in memory (we do have enough
> capacity in Production environment for it) for performance reasons and
> spill over to disk (which I believe Flink should provide out of the box?).
> It feels like 3x anticipated state volume in cluster memory should have
> been enough to just initialize state. So instead of just continuing to
> increase memory (which may or may not help as error is regarding GC
> overhead) we wanted to get some input from experts on best practices and
> approach to plan this application better.
>
> Appreciate your input in advance!
>
>
>
>
>
>

Reply via email to