Thanks for your reply. From the code I see that you log only entries with
non-null values. If you're absolutely sure that you never put null in the
cache, I will create a load test to reproduce it and file an issue for you.
But it would be great if you moved the logging before the
event.getValue() != null check.
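
To illustrate what I mean, here is a minimal sketch in plain Java so that it runs without Ignite. `BiPredicate` stands in for the remote filter, and the `FilterLoggingSketch` class, the simplified `Trade` class, and the in-memory `LOG` list are hypothetical stand-ins, not your actual classes. The only point is that the log statement runs before the null check, so every event reaching the filter leaves a trace:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class FilterLoggingSketch {
    enum TradeStatus { NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS }

    // Hypothetical, simplified stand-in for the Trade value class.
    static final class Trade {
        private final TradeStatus status;
        Trade(TradeStatus status) { this.status = status; }
        TradeStatus getStatus() { return status; }
    }

    // Collects log lines in memory so the sketch needs no logging framework.
    static final List<String> LOG = new ArrayList<>();

    // Stand-in for the remote filter: logs every event, then applies the checks.
    static BiPredicate<Long, Trade> checkStatus(TradeStatus status) {
        return (key, value) -> {
            // Log first, so events with a null value are visible as well.
            LOG.add("filter called: key=" + key
                + " status=" + (value == null ? "null" : value.getStatus())
                + " expected=" + status);
            return value != null && value.getStatus() == status;
        };
    }

    public static void main(String[] args) {
        BiPredicate<Long, Trade> filter = checkStatus(TradeStatus.CHANGED);
        filter.test(1L, new Trade(TradeStatus.CHANGED)); // accepted
        filter.test(2L, null);                           // rejected, but still logged
        LOG.forEach(System.out::println);
    }
}
```

With the logging placed like this, a missing log line would mean the filter was never invoked for that update, rather than that the value was null.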
On Wednesday, June 7, 2017, begineer wrote:
> Hi. Sorry, it's quite late to reply. The CQ is set up in the execute method
> of the service, not in init(), but we do have an initialQuery in the CQ to
> scan existing events matching the filter. Below is a snapshot of one of the
> many Ignite services set up to process a trade when it moves to a
> particular status.
>
> As you can see, I have added logs to the remote filter predicate. But these
> logs don't get printed when a trade gets stuck at a particular status. So I
> assume the remote filter does not pick up the events it is supposed to track.
>
> public enum TradeStatus {
> NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS
> }
>
>
> /**
> * Ignite Service which picks up CHANGED trade delivery items
> */
> public class ChangedTradeService implements Service{
>
> @IgniteInstanceResource
> private transient Ignite ignite;
> private transient IgniteCache<Long, Trade> tradeCache;
> private transient QueryCursor<Entry<Long, Trade>> cursor;
>
> @Override
> public void init(ServiceContext serviceContext) throws Exception {
> tradeCache = ignite.cache("tradeCache");
> }
>
> @Override
> public void execute(ServiceContext serviceContext) throws Exception {
> ContinuousQuery<Long, Trade> query = new ContinuousQuery<>();
> query.setLocalListener((CacheEntryUpdatedListenerAsync<Long, Trade>)
>     events -> events.forEach(event -> process(event.getValue())));
> query.setRemoteFilterFactory(factoryOf(checkStatus(status)));
> query.setInitialQuery(new ScanQuery<>(checkStatusPredicate(status)));
> // Assign the field (not a new local variable) so that cancel() can close it.
> cursor = tradeCache.query(query);
> cursor.forEach(entry -> process(entry.getValue()));
> }
>
> private void process(Trade item){
> log.info("transition started for trade id :"+item.getPkey());
> // move the trade to the next state (e.g. SUCCESS); the next Service
> // (containing a CQ that looks for SUCCESS status) will pick it up
> // for further processing, and so on
> log.info("transition finished for trade id :"+item.getPkey());
> }
>
> @Override
> public void cancel(ServiceContext serviceContext) {
> cursor.close();
> }
>
> static CacheEntryEventFilterAsync<Long, Trade> checkStatus(TradeStatus status) {
> return event -> event.getValue() != null &&
>     checkStatusPredicate(status).apply(event.getKey(), event.getValue());
> }
>
> // The value type is Trade (not TradeStatus), since v.getStatus() is called.
> static IgniteBiPredicate<Long, Trade> checkStatusPredicate(TradeStatus status) {
> return (k, v) -> {
> LOG.debug("Status checking for: {} Event value: {} isStatus: {}",
>     status, v, v.getStatus() == status);
> return v.getStatus() == status;
> };
> }
> }
>
>
>
>
> --
> View this message in context:
> http://apache-ignite-users.70518.x6.nabble.com/Continuous-Query-remote-listener-misses-some-events-or-respond-really-late-tp12338p13476.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>