Short answer seems to be that my Kafka log retention time was low enough that the metrics I was writing were getting purged from Kafka during the test, i.e. the missing records were dropped by the broker before they were processed.
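For anyone hitting the same symptom: one way to confirm that retention (rather than the streams application) is eating the data is to watch the earliest available offsets on the input topic while the test runs; if they keep advancing, the broker is deleting segments before they are read. A rough sketch of such a check using the plain consumer API — the topic name and broker address are made-up placeholders, not values from this thread:

```java
import java.util.*;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RetentionCheck {
    public static void main(String[] args) {
        // Placeholder topic and broker address for illustration only.
        final String topic = "metrics-ingest";
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());

            // If the earliest offsets keep moving forward while the test runs, the broker
            // is deleting old segments (retention) faster than the app can consume them.
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                System.out.printf("%s earliest=%d latest=%d%n", tp, earliest.get(tp), latest.get(tp));
            }
        }
    }
}
```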
On Thu, Jun 15, 2017 at 1:32 PM, Caleb Welton <ca...@autonomic.ai> wrote:

I have encapsulated the repro into a small self-contained project:
https://github.com/cwelton/kstreams-repro

Thanks,
Caleb


On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:

I do have a TimestampExtractor set up, and for the 10-second windows that are emitted, all the values expected in those windows are present, e.g. each 10-second window gets 100 values aggregated into it.

I have no metrics with null keys or values.

I will try to get the entire reproduction case packaged up in a way that I can more easily share.


On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:

Another thing to consider: do you have records with null key or value? Those records would be dropped and not processed.

-Matthias


On 6/15/17 6:24 AM, Garrett Barton wrote:

Is your time usage correct? It sounds like you want event time, not load/process time, which is the default unless you have a TimestampExtractor defined somewhere upstream. Otherwise I could see far fewer events coming out, as streams is just aggregating whatever showed up in that 10-second window.


On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:

Disabling the cache with:

```
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```

results in:
- Emitting many more intermediate calculations.
- Still losing data.

In my test case it output 342476 intermediate calculations for 3414 distinct windows; 14400 distinct windows were expected.

Regards,
Caleb


On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:

This seems to be related to internal KTable caches. You can disable them by setting the cache size to zero.

http://docs.confluent.io/current/streams/developer-guide.html#memory-management

-Matthias


On 6/14/17 4:08 PM, Caleb Welton wrote:

Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2` or higher the problem shows up.

When the number of threads is 1, the speed of data through the first part of the topology (before the KTable) slows down considerably, but it seems to slow down to the speed of the output, which may be the key.

That said, changing the number of stream threads should not impact data correctness. Seems like a bug someplace in Kafka.
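For reference, the configuration knobs touched on in the replies above (record-cache size, stream-thread count, and an event-time TimestampExtractor) sit together in the streams configuration roughly like this on the 0.10.2 API. This is only a sketch: the application id, broker address, thread count, and extractor class name are placeholders, not values taken from the thread:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class StreamsSettings {
    public static Properties streamsConfiguration() {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");  // placeholder
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder

        // Disable the KTable record cache so every update is forwarded downstream
        // (many more intermediate results, but nothing held back in the cache).
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        // The thread count varied while narrowing down the behaviour.
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

        // Use event time from the records rather than ingestion/processing time.
        // The class name is a stand-in for whatever extractor the application defines.
        config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                "com.example.MyTimestampExtractor");

        return config;
    }
}
```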
On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:

I have a topology of KStream -> KTable -> KStream:

```
final KStreamBuilder builder = new KStreamBuilder();

final KStream<String, Metric> metricStream = builder.stream(ingestTopic);

final KTable<Windowed<String>, MyThing> myTable = metricStream
    .groupByKey(stringSerde, mySerde)
    .reduce(MyThing::merge,
            TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
            tableTopic);

myTable.toStream()
    .map((key, value) -> { return KeyValue.pair(key.key(), value.finalize(key.window())); })
    .to(stringSerde, mySerde, sinkTopic);
```

Normally, when sent data at 10 records a second, I expect ~1 output metric for every 100 metrics it receives, based on the 10-second window width.

When fed data in real time at that rate it seems to do just that.

However, when I either reprocess an input topic with a large amount of data or feed data in significantly faster, I see very different behavior.

Over the course of 20 seconds I can see 1,440,000 messages being ingested into the KTable, but only 633 emitted from it (expected 14400).

Over the next minute the record output creeps up to 1796, but then holds steady and does not keep going up to the expected total of 14400.

A consumer reading from the sinkTopic ends up finding about 1264, which is lower than the 1796 records I would have anticipated from the number of calls into the final KStream map function.

The precise number of emitted records varies from one run to the next.

Where are the extra metrics going? Is there some commit issue that is causing dropped messages if the KTable producer isn't able to keep up?

Any recommendations on where to focus the investigation of the issue?

Running Kafka 0.10.2.1.

Thanks,
Caleb
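As a practical note for anyone reproducing this, the "consumer reading from the sinkTopic" count mentioned above can be done with a small standalone consumer that reads from the beginning and stops once the topic goes quiet. A minimal sketch, assuming string keys and treating values as raw bytes since the MyThing serde is not shown; the broker address, group id, and literal topic name are placeholders:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SinkTopicCounter {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "sink-topic-counter");        // placeholder
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        long total = 0;
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sinkTopic"));  // placeholder topic name
            int quietPolls = 0;
            while (quietPolls < 10) {                 // stop after ~10 seconds of silence
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                if (records.isEmpty()) {
                    quietPolls++;
                } else {
                    quietPolls = 0;
                    total += records.count();
                }
            }
        }
        System.out.println("records seen on sink topic: " + total);
    }
}
```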