Jonathan, Matthias I've created a Jira for this issue https://issues.apache.org/jira/browse/KAFKA-8615.
Jonathan, I plan to work on this when I get back from vacation on 7/8. If you would like to work in this yourself before that, feel free to do so and assign the ticket to yourself. Thanks, Bill On Thu, Jun 27, 2019 at 1:38 PM Matthias J. Sax <matth...@confluent.io> wrote: > Sounds like a regression to me. > > We did change some code to track partition time differently. Can you > open a Jira? > > > -Matthias > > On 6/26/19 7:58 AM, Jonathan Santilli wrote: > > Sure Bill, sure, is the same code I have reported the issue for the > > suppress some months ago: > > > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio > > > > In fact, I have reported at that moment, that after restarting the app, > the > > suppress was sending again downstream the already processed records. > > Now, with the version 2.2.1+ after restarting the app, the > > aggregation/suppress (do not know exactly where) is missing some records > to > > be aggregated, even though they are in the input topic. > > > > Kafka Version 2.3 > > > > *public* *class* OwnTimeExtractor *implements* TimestampExtractor { > > > > @Override > > > > *public* *long* extract(*final* ConsumerRecord<Object, Object> > record, > > *final* *long* previousTimestamp) { > > > > > > *// *previousTimestamp is always == -1 > > > > } > > } > > > > final StreamsBuilder builder = new StreamsBuilder(); > > final KStream<..., ...> events = builder > > .stream(inputTopicNames, Consumed.with(..., ...) > > .withTimestampExtractor(new OwnTimeExtractor()); > > > > events > > .filter((k, v) -> ...) > > .flatMapValues(v -> ...) > > .flatMapValues(v -> ...) > > .selectKey((k, v) -> v) > > .groupByKey(Grouped.with(..., ...)) > > .windowedBy( > > TimeWindows.of(Duration.ofSeconds(windowSizeInSecs)) > > .advanceBy(Duration.ofSeconds(windowSizeInSecs)) > > .grace(Duration.ofSeconds(windowSizeGraceInSecs))) > > .reduce((agg, new) -> { > > ... > > return agg; > > }) > > > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) > > .toStream() > > .to(outPutTopicNameOfGroupedData, Produced.with(..., ...)); > > > > > > > > On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <b...@confluent.io> wrote: > > > >> Thanks for the reply Jonathan. > >> > >> Are you in a position to share your code so I can try to reproduce on my > >> end? > >> > >> -Bill > >> > >> > >> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli < > >> jonathansanti...@gmail.com> wrote: > >> > >>> Hello Bill, > >>> > >>> am implementing the TimestampExtractor Interface, then using it to > >> consume, > >>> like: > >>> > >>> *final* KStream<..., ...> events = builder.stream(inputTopicList, > >> Consumed. > >>> *with*(keySerde, valueSerde).withTimestampExtractor(*new > >> *OwnTimeExtractor( > >>> ...))); > >>> > >>> Am not setting the default.timestamp.extractor config value. > >>> > >>> Cheers! > >>> -- > >>> Jonathan > >>> > >>> > >>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <b...@confluent.io> wrote: > >>> > >>>> Hi Jonathan, > >>>> > >>>> Thanks for reporting this. Which timestamp extractor are you using in > >>> the > >>>> configs? > >>>> > >>>> Thanks, > >>>> Bill > >>>> > >>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli < > >>>> jonathansanti...@gmail.com> wrote: > >>>> > >>>>> Hello, hope you all are doing well, > >>>>> > >>>>> am testing the new version 2.3 for Kafka Streams specifically. I have > >>>>> noticed that now, the implementation of the method extract from the > >>>>> interface org.apache.kafka.streams.processor.TimestampExtractor > >>>>> > >>>>> *public* *long* extract(ConsumerRecord<Object, Object> record, *long* > >>>>> previousTimestamp) > >>>>> > >>>>> > >>>>> is always returning -1 as value. > >>>>> > >>>>> > >>>>> Previous version 2.2.1 was returning the correct value for the record > >>>>> partition. > >>>>> > >>>>> Am aware the interface is market as @InterfaceStability.Evolving and > >> we > >>>>> should not rely on the stability/compatibility. Am just wondering if > >>> that > >>>>> new behavior is intentional or is a bug. > >>>>> > >>>>> > >>>>> Cheers! > >>>>> -- > >>>>> Santilli Jonathan > >>>>> > >>>> > >>> > >>> > >>> -- > >>> Santilli Jonathan > >>> > >> > > > > > >