I have encapsulated the repro into a small, self-contained project: https://github.com/cwelton/kstreams-repro
Thanks,
Caleb

On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:
> I do have a TimestampExtractor set up, and for the 10 second windows that
> are emitted all the expected values are present, e.g. each 10 second window
> gets 100 values aggregated into it.
>
> I have no metrics with null keys or values.
>
> I will try to get the entire reproduction case packaged up in a way that I
> can more easily share.
>
> On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>> Another thing to consider: do you have records with null key or value?
>> Those records would be dropped and not processed.
>>
>> -Matthias
>>
>> On 6/15/17 6:24 AM, Garrett Barton wrote:
>>> Is your time usage correct? It sounds like you want event time, not
>>> load/process time, which is the default unless you have a TimestampExtractor
>>> defined somewhere upstream. Otherwise I could see far fewer events coming
>>> out, as Streams is just aggregating whatever showed up in that 10 second
>>> window.
>>>
>>> On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:
>>>> Disabling the cache with:
>>>>
>>>> ```
>>>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>> ```
>>>>
>>>> Results in:
>>>> - Emitting many more intermediate calculations.
>>>> - Still losing data.
>>>>
>>>> In my test case it output 342476 intermediate calculations for 3414
>>>> distinct windows; 14400 distinct windows were expected.
>>>>
>>>> Regards,
>>>> Caleb
>>>>
>>>> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>> This seems to be related to the internal KTable caches. You can disable
>>>>> them by setting the cache size to zero.
>>>>>
>>>>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>>>>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
>>>>>> problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
>>>>>> or higher the problem shows up.
>>>>>>
>>>>>> When the number of threads is 1, the speed of data through the first part
>>>>>> of the topology (before the KTable) slows down considerably, but it seems
>>>>>> to slow down to the speed of the output, which may be the key.
>>>>>>
>>>>>> That said, changing the number of stream threads should not impact data
>>>>>> correctness. Seems like a bug someplace in Kafka.
>>>>>>
>>>>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:
>>>>>>> I have a topology of
>>>>>>> KStream -> KTable -> KStream
>>>>>>>
>>>>>>> ```
>>>>>>> final KStreamBuilder builder = new KStreamBuilder();
>>>>>>>
>>>>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>>>>>>>
>>>>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>>>>>>>     .groupByKey(stringSerde, mySerde)
>>>>>>>     .reduce(MyThing::merge,
>>>>>>>             TimeWindows.of(10000).advanceBy(10000)
>>>>>>>                 .until(Duration.ofDays(retentionDays).toMillis()),
>>>>>>>             tableTopic);
>>>>>>>
>>>>>>> myTable.toStream()
>>>>>>>     .map((key, value) -> KeyValue.pair(key.key(), value.finalize(key.window())))
>>>>>>>     .to(stringSerde, mySerde, sinkTopic);
>>>>>>> ```
>>>>>>>
>>>>>>> Normally, when sending data at 10x a second, I expect ~1 output metric
>>>>>>> for every 100 metrics it receives, based on the 10 second window width.
>>>>>>>
>>>>>>> When fed data in real time at that rate it seems to do just that.
>>>>>>>
>>>>>>> However, when I either reprocess an input topic with a large amount of
>>>>>>> data or feed data in significantly faster, I see very different behavior.
>>>>>>>
>>>>>>> Over the course of 20 seconds I can see 1,440,000 messages being ingested
>>>>>>> into the KTable, but only 633 emitted from it (14400 expected).
>>>>>>>
>>>>>>> Over the next minute the record output creeps up to 1796, but then holds
>>>>>>> steady and does not keep going up to the expected total of 14400.
>>>>>>>
>>>>>>> A consumer reading from the sinkTopic ends up finding about 1264, which
>>>>>>> is lower than the 1796 records I would have anticipated from the number
>>>>>>> of calls into the final KStream map function.
>>>>>>>
>>>>>>> The precise number of emitted records varies from one run to the next.
>>>>>>>
>>>>>>> Where are the extra metrics going? Is there some commit issue that is
>>>>>>> causing dropped messages if the KTable producer isn't able to keep up?
>>>>>>>
>>>>>>> Any recommendations on where to focus the investigation of the issue?
>>>>>>>
>>>>>>> Running Kafka 0.10.2.1.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Caleb
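For anyone following the event-time vs. processing-time question raised above, here is a minimal sketch of a custom TimestampExtractor against the 0.10.2 API. It assumes the `Metric` value type exposes a `getTimestamp()` accessor returning epoch milliseconds; the actual accessor in the repro project may differ.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MetricTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Metric) {
            // Use the event time carried in the payload (assumed accessor), so
            // windows are assigned by when the metric happened, not when it was read.
            return ((Metric) value).getTimestamp();
        }
        // Fall back to the timestamp set by the producer/broker.
        return record.timestamp();
    }
}
```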
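And a sketch of how the configuration knobs discussed in the thread fit together. The application id and bootstrap servers are placeholders, and `MetricTimestampExtractor` is the hypothetical class sketched above; this is not the repro project's actual configuration.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ReproStreamsConfig {

    public static Properties build() {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstreams-repro");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Disable the KTable record cache so every windowed update is forwarded
        // downstream rather than being compacted in memory before emission.
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        // A single stream thread is the only setting under which the missing
        // windows did not reproduce, per the update in the thread.
        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

        // Event-time semantics via the custom extractor sketched above.
        streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                MetricTimestampExtractor.class.getName());

        return streamsConfiguration;
    }
}
```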