Hi Navneeth,

I reported a similar issue to yours before [1], but I took the broadcasting approach at first.
As you already anticipated, broadcasting is going to use more memory than your current approach based on a static object on each TM. The broadcasted data will also be treated as operator state and will be periodically checkpointed, with serialization overhead & garbage collections. These are not negligible at all if you're not carefully choosing a serialization strategy, as explained in [2].

Even with the proper one, I've experienced mild back pressure whenever
- a checkpoint is in progress (AFAIK, incremental checkpointing has nothing to do with operator state)
- the cache is being broadcasted

For that reason, I decided to populate the data on Redis, but that also calls for design decisions:
- which Java client to use? Jedis [3]? Lettuce [4]?
- how to invoke the API calls inside Flink? synchronously or asynchronously?

Currently I'm very satisfied with Lettuce combined with Flink's async I/O [5]: very small memory footprint, and no need to worry about serialization overhead and garbage collection. Lettuce supports asynchronous communication, so it works perfectly with Flink's async I/O. I bet you'll be very disappointed with invoking Jedis synchronously inside a ProcessFunction. (I've put a rough sketch of this wiring at the very bottom of this mail, below your quoted message.)

Best,

Dongwon

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
[2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
[3] https://github.com/redis/jedis
[4] https://lettuce.io/
[5] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> Hi All,
>
> We have a Flink streaming job processing around 200k events per second.
> The job requires a lot of less frequently changing data (sort of static, but
> there will be some changes over time, say a 5% change once per day or so).
> There are about 12 caches, some containing approximately 20k entries and a
> few with about 2 million entries.
>
> In the current implementation we are using an in-memory, lazily loaded static
> cache to populate the data, and the initialization happens in the open
> function. The reason for choosing this approach is that we have allocated
> around 4GB of extra memory per TM for these caches, and if a TM has 6 slots
> the cache can be shared.
>
> Now the issue we have with this approach is that every time a container is
> restarted or a new job is deployed it has to populate the cache again.
> Sometimes this lazy loading takes a while and it causes back pressure as
> well. We were thinking of moving this logic to a broadcast stream, but since
> the data has to be stored per slot it would increase the memory consumption
> by a lot.
>
> Another option we were considering is to replace the current near/far cache,
> which uses a REST API to load the data, with a Redis-based near/far cache.
> This would definitely reduce the overall loading time, but it's still not a
> perfect solution.
>
> Are there any recommendations on how this can be achieved effectively?
> Also, how is everyone else overcoming this problem?
>
> Thanks,
> Navneeth
>
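
P.S. In case it helps, here's the rough sketch I mentioned above of how Lettuce can be
wired into Flink's async I/O. It's only meant to show the shape of the solution, not a
drop-in implementation: the Event/EnrichedEvent types, the "cache:" key prefix, the
redis://localhost:6379 URI, and the timeout/capacity numbers are all placeholders you'd
replace with your own.

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

// Hypothetical input/output types - replace with your own POJOs.
class Event {
    public String key;
    public Event() {}
    public String getKey() { return key; }
}

class EnrichedEvent {
    public Event event;
    public String cachedValue;
    public EnrichedEvent() {}
    public EnrichedEvent(Event event, String cachedValue) {
        this.event = event;
        this.cachedValue = cachedValue;
    }
}

// Async enrichment: for each event, look up the cached value in Redis via Lettuce.
public class RedisLookupFunction extends RichAsyncFunction<Event, EnrichedEvent> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;
    private transient RedisAsyncCommands<String, String> commands;

    @Override
    public void open(Configuration parameters) {
        // One client/connection per parallel subtask; Lettuce connections are thread-safe.
        client = RedisClient.create("redis://localhost:6379");
        connection = client.connect();
        commands = connection.async();
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
        // RedisFuture implements CompletionStage, so it plugs straight into the async callback.
        commands.get("cache:" + event.getKey())
                .thenAccept(value -> resultFuture.complete(
                        Collections.singleton(new EnrichedEvent(event, value))))
                .exceptionally(t -> {
                    resultFuture.completeExceptionally(t);
                    return null;
                });
    }

    @Override
    public void close() {
        if (connection != null) connection.close();
        if (client != null) client.shutdown();
    }
}

// Wiring it into the job (timeout and capacity are just example values):
// DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
//         events, new RedisLookupFunction(), 1000, TimeUnit.MILLISECONDS, 100);

With unorderedWait and a reasonable capacity, the Redis round trips overlap instead of
blocking the operator, which is why this felt so much better than a synchronous Jedis
call inside a ProcessFunction.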