About

> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.

Please upgrade to Streams 0.10.2.1 -- we fixed a couple of bugs and I
would assume this issue is fixed, too. If not, please report back.

> Another question that I have is, is there a way for us detect how many
> messages have come out of order? And if possible, what is the delay?

There is no metric or API for this. What you could do, though, is use
#transform() with a transformer that only forwards each record and, as a
side task, extracts the timestamp via `context#timestamp()` and does some
bookkeeping to compute whether the record is out-of-order and what the
delay was.

>>> - same for .mapValues()
>>>
>>
>> I am not sure how to check this.

The same way as you do for filter()?


-Matthias

On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> Hi Matthias,
>
> Please find the answers below.
>
> I would recommend to double check the following:
>>
>> - can you confirm that the filter does not remove all data for those
>> time periods?
>>
>
> Filter does not remove all data. There is a lot of data coming in even
> after the filter stage.
>
>
>> - I would also check input for your AggregatorFunction() -- does it
>> receive everything?
>>
>
> Yes. Aggregate function seems to be receiving everything.
>
>
>> - same for .mapValues()
>>
>
> I am not sure how to check this.
>
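
To illustrate the bookkeeping idea from the #transform() suggestion above,
here is a minimal sketch in plain Java. It is not part of Kafka Streams:
the class name OutOfOrderTracker and all of its methods are hypothetical.
It assumes "out of order" means a record whose timestamp is smaller than
the largest timestamp seen so far by the same tracker instance, and that
the delay is the difference between the two. Inside your transformer you
would call observe(context.timestamp()) for every record and then forward
the record unchanged.

```java
// Hypothetical helper (not a Kafka Streams API). One instance per stream
// task is assumed, so the statistics are per task, not global.
final class OutOfOrderTracker {
    private long maxTimestampMs = Long.MIN_VALUE; // largest timestamp seen so far
    private long recordCount = 0;
    private long outOfOrderCount = 0;
    private long maxDelayMs = 0;
    private long totalDelayMs = 0;

    /**
     * Records one timestamp and returns how far it lags behind the largest
     * timestamp seen so far, in milliseconds (0 if the record is in order).
     */
    long observe(long timestampMs) {
        recordCount++;
        if (timestampMs >= maxTimestampMs) {
            // In order (or the very first record): advance the high-water mark.
            maxTimestampMs = timestampMs;
            return 0L;
        }
        // Out of order: the record is older than something already seen.
        long delayMs = maxTimestampMs - timestampMs;
        outOfOrderCount++;
        totalDelayMs += delayMs;
        maxDelayMs = Math.max(maxDelayMs, delayMs);
        return delayMs;
    }

    long recordCount() { return recordCount; }

    long outOfOrderCount() { return outOfOrderCount; }

    long maxDelayMs() { return maxDelayMs; }

    /** Average delay over out-of-order records only; 0.0 if there were none. */
    double averageDelayMs() {
        return outOfOrderCount == 0 ? 0.0 : (double) totalDelayMs / outOfOrderCount;
    }
}
```

You could log these counters periodically or expose them however suits
your monitoring. Note that this only compares timestamps within what a
single task sees; ordering across partitions is not considered.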