Hi, Thanks for the reply, First option working for me by creating a cache instance with expiry policy just before datastreamer. My curiosity with datastreamer and receiver also.
no build error with new changes, but application not working as expected. added breakpoint in MyStreamReceiver but not reached using (var cacheDataStreamer = DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string, T>(cacheName)) { cacheDataStreamer.AllowOverwrite = true; cacheDataStreamer.Receiver = new MyStreamReceiver<T>(); foreach (var item in data) { string cacheKey = item.Key; int index = cacheKey.IndexOf("Model:"); if (index > 0) cacheKey = cacheKey.Insert(index + "Model:".Length, CacheKeyDefault); else cacheKey = CacheKeyDefault + cacheKey; cacheDataStreamer.AddData(cacheName + ":" + cacheKey, item.Value); } cacheDataStreamer.Flush(); } public class MyStreamReceiver<T> : IStreamReceiver<string, T> { public void Receive(ICache<string, T> cache, ICollection<ICacheEntry<string, T>> entries) { foreach (var entry in entries) { cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key, entry.Value); } } } Regards, Charlin On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <ptupit...@apache.org> wrote: > 1. You can set expiry policy in CacheConfiguration so that entries > inserted with DataStreamer are also affected, > see > https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy > > 2. Compiler error says it all. Generic arguments don't match. > Try changing > MyStreamReceiver : IStreamReceiver<string, object> > to > MyStreamReceiver<T> : IStreamReceiver<string, T> > > On Thu, May 26, 2022 at 5:24 PM Charlin S <charli...@hotelhub.com> wrote: > >> We have a requirement to set data to expire after some time. >> I set the WithExpiryPolicy for cache instance, but the data added by >> GetDataStreamer does not expire, due to it returning a new instance with >> default policies. >> So I am trying to use IStreamReceiver but not able to build the solution. >> >> IStreamReceiver Code: >> public class MyStreamReceiver : IStreamReceiver<string, object> >> { >> public void Receive(ICache<string, object> cache, >> ICollection<ICacheEntry<string, object>> entries) >> { >> foreach (var entry in entries) >> { >> cache.WithExpiryPolicy(new >> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key, >> entry.Value); >> } >> } >> } >> >> Datastreamer code error >> [image: image.png] >> >> How to implement IStreamReceiver. Please help me on this. >> Regards, >> Charlin >> >> >> >>