If you run multiple nodes in the cluster, the receiver may be invoked on another node, so the breakpoint is not reached. I've simplified yor code a bit and it works as expected: https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
On Mon, May 30, 2022 at 11:22 AM Charlin S <charli...@hotelhub.com> wrote: > Hi, > Thanks for the reply, > First option working for me by creating a cache instance with expiry > policy just before datastreamer. > My curiosity with datastreamer and receiver also. > > no build error with new changes, but application not working as expected. > added breakpoint in MyStreamReceiver but not reached > > using (var cacheDataStreamer = > DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string, > T>(cacheName)) > { > cacheDataStreamer.AllowOverwrite = true; > cacheDataStreamer.Receiver = new > MyStreamReceiver<T>(); > foreach (var item in data) > { > string cacheKey = item.Key; > int index = cacheKey.IndexOf("Model:"); > if (index > 0) > cacheKey = cacheKey.Insert(index + > "Model:".Length, CacheKeyDefault); > else > cacheKey = CacheKeyDefault + cacheKey; > cacheDataStreamer.AddData(cacheName + ":" + > cacheKey, item.Value); > > > } > cacheDataStreamer.Flush(); > } > > public class MyStreamReceiver<T> : IStreamReceiver<string, T> > { > public void Receive(ICache<string, T> cache, > ICollection<ICacheEntry<string, T>> entries) > { > foreach (var entry in entries) > { > cache.WithExpiryPolicy(new > ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key, > entry.Value); > } > } > } > > Regards, > Charlin > > > On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <ptupit...@apache.org> wrote: > >> 1. You can set expiry policy in CacheConfiguration so that entries >> inserted with DataStreamer are also affected, >> see >> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy >> >> 2. Compiler error says it all. Generic arguments don't match. >> Try changing >> MyStreamReceiver : IStreamReceiver<string, object> >> to >> MyStreamReceiver<T> : IStreamReceiver<string, T> >> >> On Thu, May 26, 2022 at 5:24 PM Charlin S <charli...@hotelhub.com> wrote: >> >>> We have a requirement to set data to expire after some time. >>> I set the WithExpiryPolicy for cache instance, but the data added by >>> GetDataStreamer does not expire, due to it returning a new instance with >>> default policies. >>> So I am trying to use IStreamReceiver but not able to build the solution. >>> >>> IStreamReceiver Code: >>> public class MyStreamReceiver : IStreamReceiver<string, object> >>> { >>> public void Receive(ICache<string, object> cache, >>> ICollection<ICacheEntry<string, object>> entries) >>> { >>> foreach (var entry in entries) >>> { >>> cache.WithExpiryPolicy(new >>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key, >>> entry.Value); >>> } >>> } >>> } >>> >>> Datastreamer code error >>> [image: image.png] >>> >>> How to implement IStreamReceiver. Please help me on this. >>> Regards, >>> Charlin >>> >>> >>> >>>