Thanks for the notice, Jonathan! We tracked down the problem and it should be an easy fix: https://github.com/apache/kafka/pull/6719/files
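Until that lands in a release, one user-side workaround is to avoid depending on the previousTimestamp argument entirely. A minimal sketch of such a defensive extractor (the class name and the payload-parsing helper are hypothetical, not from the PR):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class DefensiveTimeExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            // Prefer a timestamp parsed from the payload, if the
            // application can provide one.
            final long payloadTimestamp = timestampFromPayload(record.value());
            if (payloadTimestamp >= 0) {
                return payloadTimestamp;
            }
            // Fall back to the record's own timestamp instead of
            // previousTimestamp, which 2.3.0 passes as -1 (KAFKA-8615).
            return record.timestamp();
        }

        // Hypothetical application-specific parsing; returns -1 when the
        // payload carries no usable timestamp.
        private long timestampFromPayload(final Object value) {
            return -1L;
        }
    }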
On Fri, Jul 5, 2019 at 6:25 AM Jonathan Santilli <jonathansanti...@gmail.com> wrote:

> Thanks a lot Bill for creating the issue, I have updated it with a little
> bit more info.
>
> Cheers!
> --
> Jonathan
>
> On Fri, Jun 28, 2019 at 9:21 PM Bill Bejeck <b...@confluent.io> wrote:
>
> > Jonathan, Matthias
> >
> > I've created a Jira for this issue
> > https://issues.apache.org/jira/browse/KAFKA-8615.
> >
> > Jonathan, I plan to work on this when I get back from vacation on 7/8. If
> > you would like to work on this yourself before that, feel free to do so
> > and assign the ticket to yourself.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Jun 27, 2019 at 1:38 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Sounds like a regression to me.
> > >
> > > We did change some code to track partition time differently. Can you
> > > open a Jira?
> > >
> > > -Matthias
> > >
> > > On 6/26/19 7:58 AM, Jonathan Santilli wrote:
> > > > Sure Bill, it is the same code I reported the suppress issue for
> > > > some months ago:
> > > >
> > > > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> > > >
> > > > In fact, I reported at that moment that after restarting the app, the
> > > > suppress was sending the already-processed records downstream again.
> > > > Now, with version 2.2.1+, after restarting the app, the
> > > > aggregation/suppress (I do not know exactly where) is missing some
> > > > records that should be aggregated, even though they are in the input
> > > > topic.
> > > >
> > > > Kafka version 2.3
> > > >
> > > > public class OwnTimeExtractor implements TimestampExtractor {
> > > >
> > > >     @Override
> > > >     public long extract(final ConsumerRecord<Object, Object> record,
> > > >                         final long previousTimestamp) {
> > > >         // previousTimestamp is always == -1
> > > >         ...
> > > >     }
> > > > }
> > > >
> > > > final StreamsBuilder builder = new StreamsBuilder();
> > > > final KStream<..., ...> events = builder
> > > >     .stream(inputTopicNames, Consumed.with(..., ...)
> > > >         .withTimestampExtractor(new OwnTimeExtractor()));
> > > >
> > > > events
> > > >     .filter((k, v) -> ...)
> > > >     .flatMapValues(v -> ...)
> > > >     .flatMapValues(v -> ...)
> > > >     .selectKey((k, v) -> v)
> > > >     .groupByKey(Grouped.with(..., ...))
> > > >     .windowedBy(
> > > >         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
> > > >             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
> > > >             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
> > > >     .reduce((agg, newValue) -> {
> > > >         ...
> > > >         return agg;
> > > >     })
> > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >     .toStream()
> > > >     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
> > > >
> > > > On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <b...@confluent.io> wrote:
> > > >
> > > >> Thanks for the reply Jonathan.
> > > >>
> > > >> Are you in a position to share your code so I can try to reproduce
> > > >> on my end?
> > > >>
> > > >> -Bill
> > > >>
> > > >> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
> > > >> jonathansanti...@gmail.com> wrote:
> > > >>
> > > >>> Hello Bill,
> > > >>>
> > > >>> I am implementing the TimestampExtractor interface, then using it
> > > >>> to consume, like:
> > > >>>
> > > >>> final KStream<..., ...> events = builder.stream(inputTopicList,
> > > >>>     Consumed.with(keySerde, valueSerde)
> > > >>>         .withTimestampExtractor(new OwnTimeExtractor(...)));
> > > >>>
> > > >>> I am not setting the default.timestamp.extractor config value.
> > > >>>
> > > >>> Cheers!
> > > >>> --
> > > >>> Jonathan
> > > >>>
> > > >>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <b...@confluent.io> wrote:
> > > >>>
> > > >>>> Hi Jonathan,
> > > >>>>
> > > >>>> Thanks for reporting this. Which timestamp extractor are you
> > > >>>> using in the configs?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Bill
> > > >>>>
> > > >>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
> > > >>>> jonathansanti...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hello, hope you all are doing well,
> > > >>>>>
> > > >>>>> I am testing the new version 2.3 of Kafka Streams specifically.
> > > >>>>> I have noticed that now, the implementation of the extract
> > > >>>>> method from the interface
> > > >>>>> org.apache.kafka.streams.processor.TimestampExtractor
> > > >>>>>
> > > >>>>> public long extract(ConsumerRecord<Object, Object> record,
> > > >>>>>     long previousTimestamp)
> > > >>>>>
> > > >>>>> always receives -1 as the previousTimestamp value.
> > > >>>>>
> > > >>>>> The previous version, 2.2.1, was passing the correct value for
> > > >>>>> the record's partition.
> > > >>>>>
> > > >>>>> I am aware the interface is marked as
> > > >>>>> @InterfaceStability.Evolving and we should not rely on its
> > > >>>>> stability/compatibility. I am just wondering whether the new
> > > >>>>> behavior is intentional or a bug.
> > > >>>>>
> > > >>>>> Cheers!
> > > >>>>> --
> > > >>>>> Santilli Jonathan
> > > >>>>
> > > >>>
> > > >>> --
> > > >>> Santilli Jonathan
> > > >>
> > >
> >
>
> --
> Santilli Jonathan
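
As a side note on the configuration point raised in the thread: a custom extractor can be wired per source via Consumed, as Jonathan does, or globally via default.timestamp.extractor; both paths feed the same extract call and are equally affected by the regression. A minimal sketch, with placeholder topic name and serdes, reusing the hypothetical DefensiveTimeExtractor from above:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    public class ExtractorWiring {
        public static void main(final String[] args) {
            // Per-source: overrides the default for this one input topic.
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> events = builder.stream("input-topic",
                    Consumed.with(Serdes.String(), Serdes.String())
                            .withTimestampExtractor(new DefensiveTimeExtractor()));

            // Global default: applies to every source that does not override it.
            final Properties props = new Properties();
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      DefensiveTimeExtractor.class);
        }
    }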