Thanks for your reply. From the code I see that you log only entries with
non-null values. If you're absolutely sure that you never put null in the
cache, I will create a load test to reproduce it and open an issue for you.
But it would be great if you moved the logging before the
event.getValue() != null check.
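
To illustrate what I mean, here is a minimal sketch that needs no Ignite on the classpath. The Event and Trade classes and the in-memory LOG list are hypothetical stand-ins for the cache entry event, your Trade class, and your logger; only the ordering of the log call relative to the null check is the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterLoggingSketch {
    enum TradeStatus { NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS }

    // Hypothetical stand-in for the Trade class from the quoted service.
    static final class Trade {
        final long pkey;
        final TradeStatus status;
        Trade(long pkey, TradeStatus status) { this.pkey = pkey; this.status = status; }
    }

    // Hypothetical stand-in for a cache entry event: a key and a possibly-null value.
    static final class Event {
        final long key;
        final Trade value;
        Event(long key, Trade value) { this.key = key; this.value = value; }
    }

    // Stand-in for a real logger so the sketch stays self-contained.
    static final List<String> LOG = new ArrayList<>();

    static Predicate<Event> checkStatus(TradeStatus wanted) {
        return event -> {
            Trade v = event.value;
            // Log first, so events with a null value also leave a trace.
            LOG.add("key=" + event.key
                    + " status=" + (v == null ? "null" : v.status)
                    + " wanted=" + wanted);
            // Only then apply the null check and the status check.
            return v != null && v.status == wanted;
        };
    }

    public static void main(String[] args) {
        Predicate<Event> filter = checkStatus(TradeStatus.CHANGED);
        filter.test(new Event(1L, new Trade(1L, TradeStatus.CHANGED)));
        filter.test(new Event(2L, null));
        LOG.forEach(System.out::println);
    }
}
```

With the log call placed like this, every event that reaches the filter shows up in the log, including the ones the null check rejects, so we can tell whether the filter was never invoked or was invoked with a null value.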

On Wednesday, June 7, 2017, begineer wrote:

> Hi. Sorry for the late reply. The CQ is set up in the execute method of the
> service, not in init(), but we do have an initialQuery in the CQ to scan
> existing events that match the filter. Below is a snapshot of one of the many
> Ignite services set up to process a trade when it moves to a particular status.
>
> As you can see, I have added logs to the remote filter predicate. But these
> logs don't get printed when a trade gets stuck at a particular status. So I
> assume the remote filter does not pick up the events it is supposed to track.
>
> public enum TradeStatus {
>         NEW, CHANGED, EXPIRED, FAILED, UNCHANGED , SUCCESS
> }
>
>
> /**
>  * Ignite Service which picks up CHANGED trade delivery items
>  */
> public class ChangedTradeService implements Service{
>
>         @IgniteInstanceResource
>         private transient Ignite ignite;
>         private transient IgniteCache<Long, Trade> tradeCache;
>         private transient QueryCursor<Entry<Long, Trade>> cursor;
>
>         @Override
>         public void init(ServiceContext serviceContext) throws Exception {
>                 tradeCache = ignite.cache("tradeCache");
>         }
>
>         @Override
>         public void execute(ServiceContext serviceContext) throws Exception {
>                 ContinuousQuery<Long, Trade> query = new ContinuousQuery<>();
>                 query.setLocalListener((CacheEntryUpdatedListenerAsync<Long, Trade>)
>                         events -> events.forEach(event -> process(event.getValue())));
>                 query.setRemoteFilterFactory(factoryOf(checkStatus(status)));
>                 query.setInitialQuery(new ScanQuery<>(checkStatusPredicate(status)));
>                 QueryCursor<Cache.Entry<Long, Trade>> cursor = tradeCache.query(query);
>                 cursor.forEach(entry -> process(entry.getValue()));
>         }
>
>         private void process(Trade item) {
>                 log.info("transition started for trade id :" + item.getPkey());
>                 // move the trade to the next state (e.g. SUCCESS); the next service
>                 // (which contains a CQ looking for SUCCESS status) will pick it up
>                 // for further processing, and so on
>                 log.info("transition finished for trade id :" + item.getPkey());
>         }
>
>         @Override
>         public void cancel(ServiceContext serviceContext) {
>                 cursor.close();
>         }
>
>         static CacheEntryEventFilterAsync<Long, Trade> checkStatus(TradeStatus status) {
>                 return event -> event.getValue() != null
>                         && checkStatusPredicate(status).apply(event.getKey(), event.getValue());
>         }
>
>         static IgniteBiPredicate<Long, Trade> checkStatusPredicate(TradeStatus status) {
>                 return (k, v) -> {
>                         LOG.debug("Status checking for: {} Event value: {} isStatus: {}",
>                                 status, v, v.getStatus() == status);
>                         return v.getStatus() == status;
>                 };
>         }
> }
>
>
>
>
> --
> View this message in context:
> http://apache-ignite-users.70518.x6.nabble.com/Continuous-Query-remote-listener-misses-some-events-or-respond-really-late-tp12338p13476.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>
