Hi Navneeth,

I reported an issue similar to yours before [1], but at first I took the
broadcasting approach.

As you already anticipated, broadcasting will use more memory than your
current approach based on a static object on each TM.

Also, the broadcast data will be treated as operator state and will be
periodically checkpointed, which brings serialization overhead and garbage
collection with it. Neither is negligible if you don't choose your
serialization strategy carefully, as explained in [2] (a small sketch of the
broadcast state wiring follows the list below).
Even with a proper strategy, I've experienced mild backpressure whenever
- a checkpoint is in progress (AFAIK, incremental checkpointing does nothing
for operator state)
- the cache is being broadcast
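
For illustration, this is roughly what the broadcast state wiring looks like.
It's only a minimal sketch: the variable and class names (cacheStream,
mainStream, CacheBroadcastFunction) and the String types are made up, not
taken from your job.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;

// Explicit TypeInformation keeps Flink on its built-in String serializer
// instead of falling back to Kryo (see [2]).
MapStateDescriptor<String, String> cacheDescriptor =
    new MapStateDescriptor<>(
        "referenceCache",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);

// Every parallel instance keeps its own copy of the broadcast state,
// and that state is written out with each checkpoint.
BroadcastStream<String> broadcastCache = cacheStream.broadcast(cacheDescriptor);

mainStream
    .connect(broadcastCache)
    .process(new CacheBroadcastFunction()); // extends BroadcastProcessFunction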

For that reason, I decided to put the data on Redis, but that also calls for
design decisions:
- which Java client to use: Jedis [3] or Lettuce [4]?
- how to invoke the API calls inside Flink: synchronously or asynchronously?

Currently I'm very satisfied with Lettuce combined with Flink's async I/O [5]:
the memory footprint is very small, and there's no need to worry about
serialization overhead or garbage collection.
Lettuce supports asynchronous communication, so it works perfectly with
Flink's async I/O (see the sketch below).
I bet you'll be very disappointed by invoking Jedis synchronously inside a
ProcessFunction.
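
To give a rough idea, here is a minimal sketch of that combination. The class
name (CacheLookup), the Redis URI, the String/Tuple2 types, and the
timeout/capacity values are all illustrative assumptions, not from an actual
job:

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class CacheLookup extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;

    @Override
    public void open(Configuration parameters) {
        // One connection per parallel instance; Lettuce connections are thread-safe.
        client = RedisClient.create("redis://localhost:6379");
        connection = client.connect();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
        // RedisFuture implements CompletionStage, so nothing blocks here.
        connection.async().get(key)
            .thenAccept(value -> resultFuture.complete(
                Collections.singleton(Tuple2.of(key, value))))
            .exceptionally(t -> {
                resultFuture.completeExceptionally(t);
                return null;
            });
    }

    @Override
    public void close() {
        if (connection != null) connection.close();
        if (client != null) client.shutdown();
    }
}

// Wiring it into the pipeline (timeout and capacity are arbitrary here):
DataStream<Tuple2<String, String>> enriched =
    AsyncDataStream.unorderedWait(input, new CacheLookup(), 1, TimeUnit.SECONDS, 100);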

Best,

Dongwon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
[2]
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
[3] https://github.com/redis/jedis
[4] https://lettuce.io/
[5]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Hi All,
>
> We have a Flink streaming job processing around 200k events per second.
> The job requires a lot of less frequently changing data (sort of static, but
> there will be some changes over time, say a 5% change once per day or so).
> There are about 12 caches, some containing approximately 20k entries while a
> few have about 2 million entries.
>
> In the current implementation we are using an in-memory, lazily loaded
> static cache to hold the data, and the initialization happens in the open
> function. The reason for choosing this approach is that we have allocated
> around 4GB of extra memory per TM for these caches, and if a TM has 6 slots
> the cache can be shared.
>
> Now the issue we have with this approach is that every time a container is
> restarted or a new job is deployed, it has to populate the cache again.
> Sometimes this lazy loading takes a while and causes backpressure as well.
> We were thinking of moving this logic to a broadcast stream, but since the
> data has to be stored per slot it would increase memory consumption by a
> lot.
>
> Another option we were considering is to replace the current near/far
> cache, which uses a REST API to load the data, with a Redis-based near/far
> cache. This would definitely reduce the overall loading time, but it is
> still not a perfect solution.
>
> Are there any recommendations on how this can be achieved effectively?
> Also, how is everyone else overcoming this problem?
>
> Thanks,
> Navneeth
>
>
