Another thing to consider: do you have records with a null key or value? Those records would be dropped and not processed.
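If you want to verify that, one quick check is to split those records out ahead of the aggregation and count them. This is only a sketch against the 0.10.2-era DSL, reusing the `ingestTopic`/`Metric` types from your topology quoted below:

```
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Sketch only: records with a null key or null value never reach
// groupByKey()/reduce(), so splitting them out makes the silent drop visible.
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, Metric> metricStream = builder.stream(ingestTopic);

// Count/log whatever the aggregation would skip.
metricStream
        .filter((key, value) -> key == null || value == null)
        .foreach((key, value) -> System.err.println("skipped record with null key or value"));

// Only well-formed records continue into the windowed reduce.
final KStream<String, Metric> valid = metricStream
        .filter((key, value) -> key != null && value != null);
// ... .groupByKey(stringSerde, mySerde).reduce(...) on `valid` as in the original topology.
```

If the skipped count stays at zero, null records are not the explanation and the caching/threading angle is the better lead.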
-Matthias

On 6/15/17 6:24 AM, Garrett Barton wrote:
> Is your time usage correct? It sounds like you want event time, not
> load/process time, which is the default unless you have a TimestampExtractor
> defined somewhere upstream. Otherwise I could see far fewer events coming
> out, as Streams is just aggregating whatever showed up in that 10-second
> window.
>
> On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:
>
>> Disabling the cache with:
>>
>> ```
>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)
>> ```
>>
>> results in:
>> - Emitting many more intermediate calculations.
>> - Still losing data.
>>
>> In my test case it output 342476 intermediate calculations for 3414
>> distinct windows; 14400 distinct windows were expected.
>>
>> Regards,
>> Caleb
>>
>> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> This seems to be related to internal KTable caches. You can disable them
>>> by setting the cache size to zero.
>>>
>>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>>>
>>> -Matthias
>>>
>>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
>>>> problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
>>>> or higher the problem shows up.
>>>>
>>>> When the number of threads is 1, the speed of data through the first part
>>>> of the topology (before the KTable) slows down considerably, but it seems
>>>> to slow down to the speed of the output, which may be the key.
>>>>
>>>> That said, changing the number of stream threads should not impact data
>>>> correctness. Seems like a bug someplace in Kafka.
>>>>
>>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:
>>>>
>>>>> I have a topology of
>>>>> KStream -> KTable -> KStream
>>>>>
>>>>> ```
>>>>> final KStreamBuilder builder = new KStreamBuilder();
>>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>>>>>
>>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>>>>>     .groupByKey(stringSerde, mySerde)
>>>>>     .reduce(MyThing::merge,
>>>>>             TimeWindows.of(10000).advanceBy(10000)
>>>>>                        .until(Duration.ofDays(retentionDays).toMillis()),
>>>>>             tableTopic);
>>>>>
>>>>> myTable.toStream()
>>>>>     .map((key, value) -> KeyValue.pair(key.key(), value.finalize(key.window())))
>>>>>     .to(stringSerde, mySerde, sinkTopic);
>>>>> ```
>>>>>
>>>>> Normally, when sent data at 10x a second, I expect ~1 output metric for
>>>>> every 100 metrics it receives, based on the 10-second window width.
>>>>>
>>>>> When fed data in real time at that rate it seems to do just that.
>>>>>
>>>>> However, when I either reprocess an input topic with a large amount of
>>>>> data or feed data in significantly faster, I see a very different behavior.
>>>>>
>>>>> Over the course of 20 seconds I can see 1,440,000 messages being ingested
>>>>> into the KTable, but only 633 emitted from it (expected 14400).
>>>>>
>>>>> Over the next minute the records output creeps to 1796, but then holds
>>>>> steady and does not keep going up to the expected total of 14400.
>>>>>
>>>>> A consumer reading from the sinkTopic ends up finding about 1264, which is
>>>>> lower than the 1796 records I would have anticipated from the number of
>>>>> calls into the final KStream map function.
>>>>>
>>>>> Precise number of emitted records will vary from one run to the next.
>>>>>
>>>>> Where are the extra metrics going? Is there some commit issue that is
>>>>> causing dropped messages if the KTable producer isn't able to keep up?
>>>>>
>>>>> Any recommendations on where to focus the investigation of the issue?
>>>>>
>>>>> Running Kafka 0.10.2.1.
>>>>>
>>>>> Thanks,
>>>>> Caleb
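Picking up Garrett's event-time question from the quoted thread: if window assignment should follow a timestamp carried in the payload rather than the default ingestion/processing time, a custom extractor is the usual way to do it. This is only a rough sketch against the 0.10.2 `TimestampExtractor` interface, and `Metric#getEventTime()` is a made-up accessor standing in for whatever field actually holds your event time:

```
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: pull the event time out of the record value so that the
// 10-second windows are assigned by event time instead of arrival time.
public class MetricTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Metric) {
            // Hypothetical accessor -- replace with the field that carries your event timestamp.
            return ((Metric) value).getEventTime();
        }
        // Fall back to the record's own (producer/broker) timestamp when the payload cannot provide one.
        return record.timestamp();
    }
}
```

If that matches your data, it would be wired in through `StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG` in the same `streamsConfiguration` map you already use for the cache setting.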