I do have a TimestampExtractor set up, and for the 10-second windows that are emitted, all the expected values are present, e.g. each 10-second window gets 100 values aggregated into it.
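For context, the extractor is wired in roughly along these lines; this is an illustrative sketch against the 0.10.2-era TimestampExtractor interface, and the class name and the Metric#getTimestamp() accessor are stand-ins rather than my actual code:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Illustrative extractor: returns the event time carried in the Metric payload
// rather than the record's ingest/processing time.
public class MetricTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Metric) {
            return ((Metric) value).getTimestamp();  // assumed accessor, epoch millis
        }
        return record.timestamp();  // fall back to the record timestamp otherwise
    }
}
```

registered via something like:

```java
streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                         MetricTimestampExtractor.class.getName());
```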
I have no metrics with null keys or values. I will try to get the entire
reproduction case packaged up in a way that I can more easily share.

On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Another thing to consider: do you have records with null key or value?
> Those records would be dropped and not processed.
>
> -Matthias
>
> On 6/15/17 6:24 AM, Garrett Barton wrote:
> > Is your time usage correct? It sounds like you want event time, not
> > load/process time, which is the default unless you have a TimestampExtractor
> > defined somewhere upstream. Otherwise I could see far fewer events coming
> > out, as Streams is just aggregating whatever showed up in that 10 second
> > window.
> >
> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:
> >
> >> Disabling the cache with:
> >>
> >> ```
> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >> ```
> >>
> >> Results in:
> >> - Emitting many more intermediate calculations.
> >> - Still losing data.
> >>
> >> In my test case it output 342476 intermediate calculations for 3414
> >> distinct windows; 14400 distinct windows were expected.
> >>
> >> Regards,
> >> Caleb
> >>
> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>
> >>> This seems to be related to internal KTable caches. You can disable them
> >>> by setting the cache size to zero.
> >>>
> >>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> >>>
> >>> -Matthias
> >>>
> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
> >>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> >>>> problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
> >>>> or higher the problem shows up.
> >>>>
> >>>> When the number of threads is 1, the speed of data through the first part
> >>>> of the topology (before the KTable) slows down considerably, but it seems
> >>>> to slow down to the speed of the output, which may be the key.
> >>>>
> >>>> That said... changing the number of stream threads should not impact data
> >>>> correctness. Seems like a bug someplace in Kafka.
> >>>>
> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:
> >>>>
> >>>>> I have a topology of
> >>>>> KStream -> KTable -> KStream
> >>>>>
> >>>>> ```
> >>>>> final KStreamBuilder builder = new KStreamBuilder();
> >>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
> >>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
> >>>>>     .groupByKey(stringSerde, mySerde)
> >>>>>     .reduce(MyThing::merge,
> >>>>>             TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
> >>>>>             tableTopic);
> >>>>>
> >>>>> myTable.toStream()
> >>>>>     .map((key, value) -> { return KeyValue.pair(key.key(), value.finalize(key.window())); })
> >>>>>     .to(stringSerde, mySerde, sinkTopic);
> >>>>> ```
> >>>>>
> >>>>> Normally, when sending data at 10 records a second, I expect ~1 output
> >>>>> metric for every 100 metrics received, based on the 10 second window width.
> >>>>>
> >>>>> When fed data in real time at that rate it seems to do just that.
> >>>>>
> >>>>> However, when I either reprocess an input topic with a large amount of
> >>>>> data or feed data in significantly faster, I see a very different
> >>>>> behavior.
> >>>>>
> >>>>> Over the course of 20 seconds I can see 1,440,000 messages being ingested
> >>>>> into the KTable, but only 633 emitted from it (expected 14400).
> >>>>>
> >>>>> Over the next minute the number of records output creeps to 1796, but then
> >>>>> holds steady and does not keep going up to the expected total of 14400.
> >>>>>
> >>>>> A consumer reading from the sinkTopic ends up finding about 1264, which is
> >>>>> lower than the 1796 records I would have anticipated from the number of
> >>>>> calls into the final KStream map function.
> >>>>>
> >>>>> The precise number of emitted records will vary from one run to the next.
> >>>>>
> >>>>> Where are the extra metrics going? Is there some commit issue that is
> >>>>> causing dropped messages if the KTable producer isn't able to keep up?
> >>>>>
> >>>>> Any recommendations on where to focus the investigation of the issue?
> >>>>>
> >>>>> Running Kafka 0.10.2.1.
> >>>>>
> >>>>> Thanks,
> >>>>> Caleb
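For anyone trying to reproduce this, the two knobs discussed above come down to two StreamsConfig properties. A minimal sketch, in the same style as the snippets above; the application id and bootstrap servers are placeholders, not my actual settings:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "metric-window-aggregator");  // placeholder
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         // placeholder
// Disable the KTable record cache so every update is forwarded downstream.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// With a single stream thread the record loss does not manifest; it shows up at 2 or more.
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
```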