If you run multiple nodes in the cluster, the receiver may be invoked on
another node, so the breakpoint is not reached.
I've simplified your code a bit and it works as expected:
https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
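
To check which node actually runs the receiver, logging is usually more
reliable than a breakpoint. Below is a minimal sketch, not the code from the
gist: LoggingStreamReceiver is a hypothetical name, and the expiry settings
are copied from MyStreamReceiver<T> in the quoted message. It assumes the
receiver assembly is available on every server node.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Expiry;
using Apache.Ignite.Core.Datastream;

// Hypothetical variant of MyStreamReceiver<T> that reports where it runs.
public class LoggingStreamReceiver<T> : IStreamReceiver<string, T>
{
    public void Receive(ICache<string, T> cache,
        ICollection<ICacheEntry<string, T>> entries)
    {
        // cache.Ignite is the Ignite instance of the node executing this call,
        // which may differ from the node that created the data streamer.
        var nodeId = cache.Ignite.GetCluster().GetLocalNode().Id;
        Console.WriteLine($"Receive: {entries.Count} entries on node {nodeId}");

        // Create the expiry-aware cache view once per batch, not once per entry.
        var expiringCache = cache.WithExpiryPolicy(
            new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null));

        foreach (var entry in entries)
        {
            expiringCache.Put(entry.Key, entry.Value);
        }
    }
}
```

If the message shows up in the console of another node, the receiver is being
invoked there, and a breakpoint in the process that created the streamer will
not be hit for those batches.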

On Mon, May 30, 2022 at 11:22 AM Charlin S <charli...@hotelhub.com> wrote:

> Hi,
> Thanks for the reply,
> The first option works for me: I create a cache instance with the expiry
> policy just before creating the data streamer.
> I am still curious about using the data streamer with a receiver, though.
>
> There is no build error with the new changes, but the application does not
> work as expected. I added a breakpoint in MyStreamReceiver, but it is not
> reached.
>
> using (var cacheDataStreamer =
>     DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string, T>(cacheName))
> {
>     cacheDataStreamer.AllowOverwrite = true;
>     cacheDataStreamer.Receiver = new MyStreamReceiver<T>();
>     foreach (var item in data)
>     {
>         string cacheKey = item.Key;
>         int index = cacheKey.IndexOf("Model:");
>         if (index > 0)
>             cacheKey = cacheKey.Insert(index + "Model:".Length, CacheKeyDefault);
>         else
>             cacheKey = CacheKeyDefault + cacheKey;
>         cacheDataStreamer.AddData(cacheName + ":" + cacheKey, item.Value);
>     }
>     cacheDataStreamer.Flush();
> }
>
> public class MyStreamReceiver<T> : IStreamReceiver<string, T>
> {
>     public void Receive(ICache<string, T> cache,
>         ICollection<ICacheEntry<string, T>> entries)
>     {
>         foreach (var entry in entries)
>         {
>             cache.WithExpiryPolicy(
>                 new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>                 .Put(entry.Key, entry.Value);
>         }
>     }
> }
>
> Regards,
> Charlin
>
>
> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <ptupit...@apache.org> wrote:
>
>> 1. You can set expiry policy in CacheConfiguration so that entries
>> inserted with DataStreamer are also affected,
>> see
>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>
>> 2. Compiler error says it all. Generic arguments don't match.
>> Try changing
>> MyStreamReceiver : IStreamReceiver<string, object>
>> to
>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>
>> On Thu, May 26, 2022 at 5:24 PM Charlin S <charli...@hotelhub.com> wrote:
>>
>>> We have a requirement for data to expire after some time.
>>> I call WithExpiryPolicy on the cache instance, but the data added through
>>> GetDataStreamer does not expire, because WithExpiryPolicy returns a new
>>> cache instance and the streamer uses the default policies.
>>> So I am trying to use IStreamReceiver, but I am not able to build the
>>> solution.
>>>
>>> IStreamReceiver code:
>>> public class MyStreamReceiver : IStreamReceiver<string, object>
>>> {
>>>     public void Receive(ICache<string, object> cache,
>>>         ICollection<ICacheEntry<string, object>> entries)
>>>     {
>>>         foreach (var entry in entries)
>>>         {
>>>             cache.WithExpiryPolicy(
>>>                 new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>>>                 .Put(entry.Key, entry.Value);
>>>         }
>>>     }
>>> }
>>>
>>> Datastreamer code error
>>> [image: image.png]
>>>
>>> How to implement IStreamReceiver. Please help me on this.
>>> Regards,
>>> Charlin
>>>
>>>
>>>
>>>
