Thanks a lot, Bill, for creating the issue. I have updated it with a bit more info.
Cheers!
--
Jonathan

On Fri, Jun 28, 2019 at 9:21 PM Bill Bejeck <b...@confluent.io> wrote:

> Jonathan, Matthias
>
> I've created a Jira for this issue:
> https://issues.apache.org/jira/browse/KAFKA-8615
>
> Jonathan, I plan to work on this when I get back from vacation on 7/8. If
> you would like to work on this yourself before that, feel free to do so
> and assign the ticket to yourself.
>
> Thanks,
> Bill
>
> On Thu, Jun 27, 2019 at 1:38 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Sounds like a regression to me.
>>
>> We did change some code to track partition time differently. Can you
>> open a Jira?
>>
>> -Matthias
>>
>> On 6/26/19 7:58 AM, Jonathan Santilli wrote:
>>
>>> Sure Bill, it is the same code for which I reported the suppress issue
>>> some months ago:
>>>
>>> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
>>>
>>> In fact, I reported at that moment that after restarting the app, the
>>> suppress was sending the already-processed records downstream again.
>>> Now, with version 2.2.1+, after restarting the app, the
>>> aggregation/suppress (I do not know exactly where) is missing some
>>> records that should be aggregated, even though they are in the input
>>> topic.
>>>
>>> Kafka version: 2.3
>>>
>>> public class OwnTimeExtractor implements TimestampExtractor {
>>>
>>>     @Override
>>>     public long extract(final ConsumerRecord<Object, Object> record,
>>>                         final long previousTimestamp) {
>>>         // previousTimestamp is always == -1
>>>     }
>>> }
>>>
>>> final StreamsBuilder builder = new StreamsBuilder();
>>> final KStream<..., ...> events = builder
>>>     .stream(inputTopicNames, Consumed.with(..., ...)
>>>         .withTimestampExtractor(new OwnTimeExtractor()));
>>>
>>> events
>>>     .filter((k, v) -> ...)
>>>     .flatMapValues(v -> ...)
>>>     .flatMapValues(v -> ...)
>>>     .selectKey((k, v) -> v)
>>>     .groupByKey(Grouped.with(..., ...))
>>>     .windowedBy(
>>>         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
>>>             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
>>>             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
>>>     .reduce((agg, newValue) -> {
>>>         ...
>>>         return agg;
>>>     })
>>>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>>>     .toStream()
>>>     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
>>>
>>> On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <b...@confluent.io> wrote:
>>>
>>>> Thanks for the reply Jonathan.
>>>>
>>>> Are you in a position to share your code so I can try to reproduce on
>>>> my end?
>>>>
>>>> -Bill
>>>>
>>>> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
>>>> jonathansanti...@gmail.com> wrote:
>>>>
>>>>> Hello Bill,
>>>>>
>>>>> I am implementing the TimestampExtractor interface, then using it to
>>>>> consume, like:
>>>>>
>>>>> final KStream<..., ...> events = builder.stream(inputTopicList,
>>>>>     Consumed.with(keySerde, valueSerde)
>>>>>         .withTimestampExtractor(new OwnTimeExtractor(...)));
>>>>>
>>>>> I am not setting the default.timestamp.extractor config value.
>>>>>
>>>>> Cheers!
>>>>> --
>>>>> Jonathan
>>>>>
>>>>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <b...@confluent.io> wrote:
>>>>>
>>>>>> Hi Jonathan,
>>>>>>
>>>>>> Thanks for reporting this. Which timestamp extractor are you using
>>>>>> in the configs?
>>>>>>
>>>>>> Thanks,
>>>>>> Bill
>>>>>>
>>>>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
>>>>>> jonathansanti...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello, hope you all are doing well,
>>>>>>>
>>>>>>> I am testing the new version 2.3, Kafka Streams specifically. I
>>>>>>> have noticed that now, in the implementation of the extract method
>>>>>>> from the interface
>>>>>>> org.apache.kafka.streams.processor.TimestampExtractor:
>>>>>>>
>>>>>>> public long extract(ConsumerRecord<Object, Object> record,
>>>>>>>                     long previousTimestamp)
>>>>>>>
>>>>>>> the previousTimestamp argument is always -1.
>>>>>>>
>>>>>>> The previous version, 2.2.1, was passing the correct value for the
>>>>>>> record's partition.
>>>>>>>
>>>>>>> I am aware the interface is marked as @InterfaceStability.Evolving
>>>>>>> and we should not rely on its stability/compatibility. I am just
>>>>>>> wondering whether the new behavior is intentional or a bug.
>>>>>>>
>>>>>>> Cheers!
>>>>>>> --
>>>>>>> Santilli Jonathan

--
Santilli Jonathan
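[Editor's note] For readers hitting the same behavior, one defensive pattern in a custom extractor is to never trust `previousTimestamp` blindly: use the record's own timestamp when it is valid, fall back to the partition time only when it is known (i.e. not -1), and fail loudly otherwise. This is a hypothetical sketch, not the fix that went into KAFKA-8615; the `TsRecord` interface below is a stand-in for Kafka's `ConsumerRecord` so the snippet compiles on its own, and `FallbackTimestampExtractor` is an invented name.

```java
// Stand-in for the Kafka record type so this sketch is self-contained; in a
// real application you would implement
// org.apache.kafka.streams.processor.TimestampExtractor against
// org.apache.kafka.clients.consumer.ConsumerRecord instead.
interface TsRecord {
    long timestamp();
}

public class FallbackTimestampExtractor {

    // Mirrors the shape of TimestampExtractor#extract(ConsumerRecord, long).
    // previousTimestamp is the highest timestamp seen so far on the
    // partition, or -1 when no partition time is known yet -- the value the
    // thread reports seeing on every call in 2.3.
    public long extract(final TsRecord record, final long previousTimestamp) {
        final long ts = record.timestamp();
        if (ts >= 0) {
            return ts;                // trust the record's own timestamp
        }
        if (previousTimestamp >= 0) {
            return previousTimestamp; // fall back to known partition time
        }
        // Neither value is usable; fail here rather than return -1, which
        // would only surface as a confusing error further downstream.
        throw new IllegalStateException("Record has no usable timestamp");
    }

    public static void main(String[] args) {
        final FallbackTimestampExtractor extractor = new FallbackTimestampExtractor();
        // Valid record timestamp wins even when partition time is unknown (-1).
        System.out.println(extractor.extract(() -> 1_561_550_000_000L, -1L));
        // Invalid record timestamp falls back to the partition time.
        System.out.println(extractor.extract(() -> -1L, 42L));
    }
}
```

The same fallback idea is what Kafka's built-in `UsePartitionTimeOnInvalidTimestamp` extractor applies, so an extractor written this way degrades gracefully whether or not partition time is being tracked correctly.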