Hi Navneeth

You cannot easily create single specific instance per TM since Flink would not 
allow user defined object binned with the life cycle of task manager. However, 
you can ensure all the operators of the same class could share some single 
object when initializing operators. You could use static variable with atomic 
reference or synchronization when calling RichFunction#open to initialize and 
remember to release resources when calling RichFunction#close .

Best
Yun Tang
________________________________
From: Navneeth Krishnan <reachnavnee...@gmail.com>
Sent: Monday, January 13, 2020 11:22
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: Using redis cache in flink

Hi Yun,

Thanks for the update. I can definitely use a redis cluster but what I don't 
understand is if I use a custom operator then redis cache will instantiated per 
operator instance. What I would like to ideally have is one redis cache 
instance per TM JVM. Since there isn't anyway to share data between task slots 
today in flink, I would like to use this approach to basically share common 
data. What I'm not sure is how can I ensure just one cache instance per TM JVM 
is created?

Regards

On Wed, Jan 8, 2020 at 12:46 AM Yun Tang 
<myas...@live.com<mailto:myas...@live.com>> wrote:
Hi Navneeth

If you need the redis cache to be fault tolerant, I am afraid you have to 
choose redis cluster since Flink might deploy task on another node which is 
different from previous node after job failover.

If you don't care about the fault tolerance, you could implement a customized 
operator which launch redis.

By the way, there existed a way to combine objects on heap in memory with 
checkpoint mechanism to ensure fault tolerance, you could refer to [1] and [2]. 
The basic idea is to cac

[1] 
https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java#L147
[2] 
https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchLocalGroupAggFunction.java#L89


________________________________
From: Navneeth Krishnan 
<reachnavnee...@gmail.com<mailto:reachnavnee...@gmail.com>>
Sent: Wednesday, January 8, 2020 15:36
To: Yun Tang <myas...@live.com<mailto:myas...@live.com>>
Cc: user <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Using redis cache in flink

Hi Yun,

Thanks, the way I want to use redis is like a cache not as state backend. I 
would still have rocksdb state backend for other states. The reason to use 
cache instead of managed state is because I’d get around 10k msgs per task slot 
and I don’t have to get the state from rocksdb for each lookup. In memory cache 
would be fine but to rebuild the state I want to use redis.

Regards

On Tue, Jan 7, 2020 at 11:21 PM Yun Tang 
<myas...@live.com<mailto:myas...@live.com>> wrote:
Hi Navneeth

If you wrap redis as a state backend, you cannot easily share data across slots 
as Flink construct state backend per operator with local thread only.

If you use a redis cluster as a externalized service to store your data, you 
can share data across slots easily. However, compared with the reduced cost of 
serialization, the introduce of network communicate cannot be ignored. There 
exists trade-off here, and we cannot ensure there would be a performance gain. 
Actually, I prefer the time used in CPU serialization is much less than the 
time consumed through the network.

Best
Yun Tang
________________________________
From: Navneeth Krishnan 
<reachnavnee...@gmail.com<mailto:reachnavnee...@gmail.com>>
Sent: Wednesday, January 8, 2020 12:33
To: user <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Using redis cache in flink

Hi All,

I want to use redis as near far cache to store data which are common across 
slots i.e. share data across slots. This data is required for processing every 
single message and it's better to store in a in memory cache backed by redis 
rather than rocksdb since it has to be serialized for every single get call. Do 
you guys think this is good solution or is there any other better solution? 
Also, Is there any reference on how I can create a centralized near far cache 
since the job and operators are distributed by the job manager.

Thanks

Reply via email to