Hi Navneeth,

> I didn't quite understand how async io can be used here. It would be
> great if you can share some info on it.
You need to add an async operator in the middle of your pipeline in
order to enrich your input data. [1] and [2] will help you.

> Also how are you propagating any changes to values?

I need to maintain a mapping of road ID to various attributes of each
road, and the mapping is updated every week. I use keys for versioning,
and each value is a Redis Hash [3] storing the mapping. When a new
mapping is prepared, I upload it under a fresh key while the previous
version is still being served to Flink (via async I/O). Such concurrent
reads and writes are possible in Redis if you turn off transactions when
creating the Redis client's pipeline [4]. Once the new mapping is
completely uploaded, I inform my Flink pipeline of the new mapping via
Kafka.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
[2] https://www.youtube.com/watch?v=UParyxe-2Wc
[3] https://redis.io/topics/data-types#hashes
[4] https://github.com/andymccurdy/redis-py#pipelines

Best,

Dongwon

On Fri, Nov 27, 2020 at 4:31 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> Thanks Dongwon. It was extremely helpful. I didn't quite understand how
> async io can be used here. It would be great if you can share some info
> on it.
>
> Also how are you propagating any changes to values?
>
> Regards,
> Navneeth
>
> On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Oops, I forgot to mention that when doing bulk inserts into Redis,
>> you'd better open a pipeline with the 'transaction' property set to
>> False [1]. Otherwise, API calls from your Flink job will time out.
>>
>> [1] https://github.com/andymccurdy/redis-py#pipelines
>>
>> On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Hi Navneeth,
>>>
>>> I reported a similar issue to yours before [1], but I took the
>>> broadcasting approach at first.
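[Editor's note: Dongwon's versioned-key upload scheme can be sketched
with redis-py [4]. This is a minimal illustration, not code from the
thread; the key pattern `roads:v{n}`, the JSON attribute encoding, and
the helper names are assumptions.]

```python
import json


def mapping_commands(version, road_attrs):
    """Build (key, field, value) HSET triples for one mapping version.

    Each version lives under a fresh key (pattern is illustrative), so
    readers of the previous version are never disturbed.
    """
    key = f"roads:v{version}"
    return key, [(key, rid, json.dumps(attrs))
                 for rid, attrs in road_attrs.items()]


def upload_mapping(version, road_attrs):
    import redis  # pip install redis

    r = redis.Redis()
    key, triples = mapping_commands(version, road_attrs)
    # transaction=False skips MULTI/EXEC, so the bulk insert does not
    # block concurrent reads of the old key and the Flink job's calls
    # do not time out.
    pipe = r.pipeline(transaction=False)
    for k, field, value in triples:
        pipe.hset(k, field, value)
    pipe.execute()
    return key  # announce this key to the Flink job, e.g. via Kafka
```

After `upload_mapping` returns, the new key name would be published (in
this thread's design, via a Kafka message) so the Flink job switches its
async lookups over and the old version's key can be expired.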
>>> As you already anticipated, broadcasting is going to use more memory
>>> than your current approach based on a static object on each TM.
>>>
>>> The broadcast data will be treated as operator state and will be
>>> periodically checkpointed, with serialization overhead and garbage
>>> collection. These costs are not negligible at all if you don't choose
>>> your serialization strategy carefully, as explained in [2].
>>> Even with a proper one, I've experienced mild back pressure whenever
>>> - a checkpoint is in progress (AFAIK, incremental checkpointing does
>>>   nothing for operator state)
>>> - the cache is being broadcast
>>>
>>> For that reason, I decided to keep the data in Redis, but that also
>>> calls for design decisions:
>>> - which Java client to use? Jedis [3]? Lettuce [4]?
>>> - how to invoke the API calls inside Flink? synchronously or
>>>   asynchronously?
>>>
>>> Currently I'm very satisfied with Lettuce plus Flink's async I/O [5]:
>>> the memory footprint is very small, and there's no need to worry
>>> about serialization overhead and garbage collection.
>>> Lettuce supports asynchronous communication, so it works perfectly
>>> with Flink's async I/O.
>>> I bet you'd be very disappointed invoking Jedis synchronously inside
>>> a ProcessFunction.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
>>> [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>> [3] https://github.com/redis/jedis
>>> [4] https://lettuce.io/
>>> [5] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>>
>>> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> We have a Flink streaming job processing around 200k events per
>>>> second.
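[Editor's note: Flink's async I/O and Lettuce are Java APIs, but the
non-blocking enrichment pattern Dongwon describes can be illustrated
with a small Python asyncio sketch. The stubbed `lookup` stands in for
an async Redis HGET; the event shape and attribute values are made up.]

```python
import asyncio


async def lookup(road_id):
    """Stand-in for a non-blocking Redis HGET (with Lettuce, Flink's
    AsyncFunction would complete the result future instead)."""
    await asyncio.sleep(0)  # pretend this is network I/O
    return {"speed_limit": 50}  # made-up attributes


async def enrich(events):
    # Many lookups are in flight at once instead of one blocking call
    # per event -- the same idea as Flink's orderedWait/unorderedWait.
    attrs = await asyncio.gather(*(lookup(e["road_id"]) for e in events))
    return [{**e, **a} for e, a in zip(events, attrs)]


if __name__ == "__main__":
    print(asyncio.run(enrich([{"road_id": "r1"}, {"road_id": "r2"}])))
```

The point of the pattern is that slot threads never sit idle waiting on
Redis round-trips, which is why the synchronous-Jedis-in-ProcessFunction
approach disappoints under load.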
>>>> The job requires a lot of infrequently changing data (sort of
>>>> static, but there will be some changes over time, say a 5% change
>>>> once per day or so). There are about 12 caches, some containing
>>>> approximately 20k entries and a few with about 2 million entries.
>>>>
>>>> In the current implementation we are using an in-memory, lazily
>>>> loaded static cache to populate the data, and the initialization
>>>> happens in the open function. We chose this approach because we have
>>>> allocated around 4 GB of extra memory per TM for these caches, and
>>>> if a TM has 6 slots the cache can be shared.
>>>>
>>>> The issue with this approach is that every time a container is
>>>> restarted or a new job is deployed, it has to populate the cache
>>>> again. Sometimes this lazy loading takes a while, and it causes back
>>>> pressure as well. We were thinking of moving this logic to a
>>>> broadcast stream, but since the data has to be stored per slot, that
>>>> would increase memory consumption by a lot.
>>>>
>>>> Another option we were considering is to replace the current near
>>>> cache, which uses a REST API to load the data, with a Redis-based
>>>> near cache. This would definitely reduce the overall loading time,
>>>> but it's still not a perfect solution.
>>>>
>>>> Are there any recommendations on how this can be achieved
>>>> effectively? Also, how is everyone else overcoming this problem?
>>>>
>>>> Thanks,
>>>> Navneeth
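[Editor's note: the TM-wide static cache Navneeth describes (one copy
shared by all slots, initialized lazily from open()) is Java in the real
job; as a rough language-agnostic sketch of the idea, a process-wide,
load-once lazy cache looks like this. All names are illustrative.]

```python
import threading


class SharedCache:
    """One lazily loaded copy per process, shared by every slot/thread.

    Double-checked locking ensures the expensive load runs exactly once,
    even if several slots hit the cache simultaneously at startup.
    """
    _lock = threading.Lock()
    _data = None

    @classmethod
    def get(cls, loader):
        if cls._data is None:            # fast path, no lock
            with cls._lock:
                if cls._data is None:    # re-check under the lock
                    cls._data = loader()  # e.g. fetch via REST, once
        return cls._data
```

This keeps one copy per TM regardless of slot count, which is the memory
advantage over broadcast state. The drawback from the thread remains: a
restarted container starts with an empty cache and must reload, which is
exactly what the Redis-backed approaches above avoid.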