Sure Bill, it is the same code for which I reported the suppress issue some months ago: https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
In fact, I reported at that time that, after restarting the app, suppress was sending already-processed records downstream again. Now, with version 2.2.1+, after restarting the app, the aggregation/suppress (I do not know exactly where) is missing some records that should be aggregated, even though they are in the input topic.

Kafka Version 2.3

public class OwnTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        // previousTimestamp is always == -1
    }
}

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
    .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(new OwnTimeExtractor()));

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, newValue) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <b...@confluent.io> wrote:

> Thanks for the reply Jonathan.
>
> Are you in a position to share your code so I can try to reproduce on my
> end?
>
> -Bill
>
>
> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
> jonathansanti...@gmail.com> wrote:
>
> > Hello Bill,
> >
> > am implementing the TimestampExtractor Interface, then using it to consume,
> > like:
> >
> > final KStream<..., ...> events = builder.stream(inputTopicList, Consumed.
> > with(keySerde, valueSerde).withTimestampExtractor(new OwnTimeExtractor(
> > ...)));
> >
> > Am not setting the default.timestamp.extractor config value.
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> > On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <b...@confluent.io> wrote:
> >
> > > Hi Jonathan,
> > >
> > > Thanks for reporting this. Which timestamp extractor are you using in the
> > > configs?
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
> > > jonathansanti...@gmail.com> wrote:
> > >
> > > > Hello, hope you all are doing well,
> > > >
> > > > am testing the new version 2.3 for Kafka Streams specifically. I have
> > > > noticed that now, the implementation of the method extract from the
> > > > interface org.apache.kafka.streams.processor.TimestampExtractor
> > > >
> > > > public long extract(ConsumerRecord<Object, Object> record, long
> > > > previousTimestamp)
> > > >
> > > > is always returning -1 as value.
> > > >
> > > > Previous version 2.2.1 was returning the correct value for the record
> > > > partition.
> > > >
> > > > Am aware the interface is marked as @InterfaceStability.Evolving and we
> > > > should not rely on the stability/compatibility. Am just wondering if that
> > > > new behavior is intentional or is a bug.
> > > >
> > > >
> > > > Cheers!
> > > > --
> > > > Santilli Jonathan
> > > >
> > >
> >
> >
> > --
> > Santilli Jonathan
> >
>
>
--
Santilli Jonathan
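
For readers following the thread: the topology above uses a tumbling window with a grace period, and suppress(untilWindowCloses) only emits once the window has closed. The sketch below is not taken from the thread and does not call the Kafka Streams API; it only illustrates the arithmetic, assuming epoch-aligned windows and non-negative millisecond timestamps. The class and method names (WindowCloseSketch, windowStart, closeTime, isLate) are hypothetical.

```java
public class WindowCloseSketch {

    // Start of the epoch-aligned tumbling window containing the timestamp
    // (assumes timestampMs >= 0 and sizeMs > 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Stream time at which the window counts as closed:
    // exclusive window end plus the grace period.
    static long closeTime(long timestampMs, long sizeMs, long graceMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs + graceMs;
    }

    // A record is treated as too late once the observed stream time
    // has reached the close time of the window the record belongs to.
    static boolean isLate(long recordTsMs, long streamTimeMs, long sizeMs, long graceMs) {
        return streamTimeMs >= closeTime(recordTsMs, sizeMs, graceMs);
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L;   // stands in for windowSizeInSecs
        long graceMs = 2_000L;   // stands in for windowSizeGraceInSecs
        long recordTs = 12_345L;

        System.out.println("window start: " + windowStart(recordTs, sizeMs));
        System.out.println("close time:   " + closeTime(recordTs, sizeMs, graceMs));
        System.out.println("late at 21999? " + isLate(recordTs, 21_999L, sizeMs, graceMs));
        System.out.println("late at 22000? " + isLate(recordTs, 22_000L, sizeMs, graceMs));
    }
}
```

Under these assumptions, a record whose extracted timestamp falls in a window that stream time has already moved past (end plus grace) would not be aggregated, so the timestamps returned by the extractor directly affect which records reach the reduce step.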