Sounds like a regression to me. We did change some code to track partition time differently. Can you open a Jira?
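As a stop-gap until that is fixed, an extractor that does not depend on previousTimestamp should keep the windows advancing. A minimal sketch (not from the thread, just an illustration; the fallback choice is up to you):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class FallbackTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            // Prefer the timestamp embedded in the record itself
            // (set by the producer or the broker).
            final long recordTs = record.timestamp();
            if (recordTs >= 0) {
                return recordTs;
            }
            // Only if the record carries no usable timestamp fall back to the
            // previous partition time -- which, per this thread, may itself be -1 on 2.3.
            return previousTimestamp;
        }
    }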
-Matthias

On 6/26/19 7:58 AM, Jonathan Santilli wrote:
> Sure Bill, sure, it is the same code for which I reported the suppress
> issue some months ago:
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
>
> In fact, I reported at that moment that after restarting the app, the
> suppress was sending the already processed records downstream again.
> Now, with version 2.2.1+, after restarting the app, the
> aggregation/suppress (I do not know exactly where) is missing some records
> that should be aggregated, even though they are in the input topic.
>
> Kafka Version 2.3
>
> public class OwnTimeExtractor implements TimestampExtractor {
>
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record,
>                         final long previousTimestamp) {
>         // previousTimestamp is always == -1
>     }
> }
>
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<..., ...> events = builder
>     .stream(inputTopicNames, Consumed.with(..., ...)
>         .withTimestampExtractor(new OwnTimeExtractor()));
>
> events
>     .filter((k, v) -> ...)
>     .flatMapValues(v -> ...)
>     .flatMapValues(v -> ...)
>     .selectKey((k, v) -> v)
>     .groupByKey(Grouped.with(..., ...))
>     .windowedBy(
>         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
>             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
>             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
>     .reduce((agg, newValue) -> {
>         ...
>         return agg;
>     })
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
>
>
> On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <b...@confluent.io> wrote:
>
>> Thanks for the reply Jonathan.
>>
>> Are you in a position to share your code so I can try to reproduce on my
>> end?
>>
>> -Bill
>>
>>
>> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
>> jonathansanti...@gmail.com> wrote:
>>
>>> Hello Bill,
>>>
>>> I am implementing the TimestampExtractor interface, then using it to
>>> consume, like:
>>>
>>> final KStream<..., ...> events = builder.stream(inputTopicList,
>>>     Consumed.with(keySerde, valueSerde)
>>>         .withTimestampExtractor(new OwnTimeExtractor(...)));
>>>
>>> I am not setting the default.timestamp.extractor config value.
>>>
>>> Cheers!
>>> --
>>> Jonathan
>>>
>>>
>>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <b...@confluent.io> wrote:
>>>
>>>> Hi Jonathan,
>>>>
>>>> Thanks for reporting this. Which timestamp extractor are you using in
>>>> the configs?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
>>>> jonathansanti...@gmail.com> wrote:
>>>>
>>>>> Hello, hope you all are doing well,
>>>>>
>>>>> I am testing the new version 2.3, for Kafka Streams specifically. I
>>>>> have noticed that now the method extract from the interface
>>>>> org.apache.kafka.streams.processor.TimestampExtractor
>>>>>
>>>>> public long extract(ConsumerRecord<Object, Object> record, long
>>>>> previousTimestamp)
>>>>>
>>>>> always receives -1 as the value of previousTimestamp.
>>>>>
>>>>> The previous version, 2.2.1, was passing the correct value for the
>>>>> record's partition.
>>>>>
>>>>> I am aware the interface is marked as @InterfaceStability.Evolving and
>>>>> we should not rely on its stability/compatibility. I am just wondering
>>>>> whether this new behavior is intentional or a bug.
>>>>>
>>>>> Cheers!
>>>>> --
>>>>> Santilli Jonathan
>>>
>>> --
>>> Santilli Jonathan
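Since the thread notes that default.timestamp.extractor is not set and the extractor is wired per-source via Consumed, here is a minimal sketch of the alternative, config-based wiring. The application id, bootstrap servers, and the assumption that OwnTimeExtractor is importable are placeholders, not details from the thread:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsConfigSketch {

        // Builds a StreamsConfig that applies the extractor to every source topic,
        // instead of attaching it per-source with Consumed.withTimestampExtractor(...).
        static Properties buildConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");            // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      OwnTimeExtractor.class);                                      // class from the thread
            return props;
        }
    }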