I have encapsulated the repro into a small, self-contained project:
https://github.com/cwelton/kstreams-repro

Thanks,
  Caleb


On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:

> I do have a TimestampExtractor set up, and for the 10 second windows that
> are emitted all the values expected in those windows are present, e.g. each
> 10 second window gets 100 values aggregated into it.
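>
> For illustration only, here is a minimal sketch of how an event-time extractor
> can be wired in on the 0.10.2 API. The class name, the Metric.getEventTime()
> accessor, and the exact extract() signature are my assumptions for the sketch,
> not the actual repro code:
>
> ```
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.processor.TimestampExtractor;
>
> // Hypothetical extractor: pulls event time out of the Metric payload and
> // falls back to the record's broker/producer timestamp when it is missing.
> public class MetricTimestampExtractor implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
>         final Object value = record.value();
>         if (value instanceof Metric) {
>             return ((Metric) value).getEventTime();  // assumed accessor
>         }
>         return record.timestamp();
>     }
> }
>
> // Registered through the Streams configuration:
> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>         MetricTimestampExtractor.class.getName());
> ```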
>
> I have no metrics with null keys or values.
>
> I will try to get the entire reproduction case packaged up in a way that I
> can more easily share.
>
>
> On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Another thing to consider: do you have records with null key or value?
>> Those records would be dropped and not processed.
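>>
>> Purely as an illustrative sketch (reusing the metricStream name and types from
>> the code quoted further down the thread), one way to make such records visible
>> is to filter them out explicitly before the aggregation:
>>
>> ```
>> // Illustrative guard: drop records with a null key or value up front,
>> // so they are filtered deliberately rather than skipped silently downstream.
>> final KStream<String, Metric> cleaned =
>>         metricStream.filter((key, value) -> key != null && value != null);
>> ```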
>>
>> -Matthias
>>
>> On 6/15/17 6:24 AM, Garrett Barton wrote:
>> > Is your time usage correct?  It sounds like you want event time rather than
>> > load/process time, which is the default unless you have a TimestampExtractor
>> > defined somewhere upstream.  Otherwise I could see far fewer events coming
>> > out, as Streams is just aggregating whatever showed up in that 10 second
>> > window.
>> >
>> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:
>> >
>> >> Disabling the cache with:
>> >>
>> >> ```
>> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >> ```
>> >>
>> >> Results in:
>> >> - Emitting many more intermediate calculations.
>> >> - Still losing data.
>> >>
>> >> In my test case it emitted 342,476 intermediate calculations across 3,414
>> >> distinct windows, where 14,400 distinct windows were expected.
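>> >>
>> >> For completeness, a hedged sketch of the two settings usually adjusted together
>> >> here (the values are arbitrary examples, not a recommendation):
>> >>
>> >> ```
>> >> // With the cache at zero, every update is forwarded downstream immediately;
>> >> // the commit interval then mainly controls how often offsets and state are
>> >> // committed, and it should not affect which records are emitted.
>> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> >> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
>> >> ```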
>> >>
>> >> Regards,
>> >>   Caleb
>> >>
>> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>> >>
>> >>> This seems to be related to internal KTable caches. You can disable them
>> >>> by setting the cache size to zero.
>> >>>
>> >>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>> >>>
>> >>> -Matthias
>> >>>
>> >>>
>> >>>
>> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>> >>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
>> >>>> problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
>> >>>> or higher the problem shows up.
>> >>>>
>> >>>> When the number of threads is 1, the speed of data through the first part
>> >>>> of the topology (before the KTable) slows down considerably, but it seems
>> >>>> to slow down to the speed of the output, which may be the key.
>> >>>>
>> >>>> That said... changing the number of stream threads should not impact data
>> >>>> correctness.  Seems like a bug someplace in Kafka.
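>> >>>>
>> >>>> As a stopgap sketch only (1 is also the documented default), the thread count
>> >>>> can be pinned explicitly while the multi-threaded behavior is investigated:
>> >>>>
>> >>>> ```
>> >>>> // Keep the topology on a single stream thread until the root cause is found.
>> >>>> streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>> >>>> ```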
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:
>> >>>>
>> >>>>> I have a topology of
>> >>>>>     KStream -> KTable -> KStream
>> >>>>>
>> >>>>> ```
>> >>>>>
>> >>>>> final KStreamBuilder builder = new KStreamBuilder();
>> >>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>> >>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>> >>>>>         .groupByKey(stringSerde, mySerde)
>> >>>>>         .reduce(MyThing::merge,
>> >>>>>                 TimeWindows.of(10000).advanceBy(10000)
>> >>>>>                         .until(Duration.ofDays(retentionDays).toMillis()),
>> >>>>>                 tableTopic);
>> >>>>>
>> >>>>> myTable.toStream()
>> >>>>>         .map((key, value) -> KeyValue.pair(key.key(), value.finalize(key.window())))
>> >>>>>         .to(stringSerde, mySerde, sinkTopic);
>> >>>>>
>> >>>>> ```
>> >>>>>
>> >>>>>
>> >>>>> Normally, when sending data at 10 messages a second, I expect ~1 output
>> >>>>> metric for every 100 metrics received, based on the 10 second window width.
>> >>>>>
>> >>>>> When fed data real time at that rate it seems to do just that.
>> >>>>>
>> >>>>> However, when I either reprocess an input topic with a large amount of
>> >>>>> data or feed data in significantly faster, I see very different behavior.
>> >>>>>
>> >>>>> Over the course of 20 seconds I can see 1,440,000 messages being ingested
>> >>>>> into the KTable, but only 633 emitted from it (expected 14,400).
>> >>>>>
>> >>>>> Over the next minute the output record count creeps to 1,796, but then
>> >>>>> holds steady and does not keep going up to the expected total of 14,400.
>> >>>>>
>> >>>>> A consumer reading from the sinkTopic ends up finding about 1,264 records,
>> >>>>> which is lower than the 1,796 I would have anticipated from the number of
>> >>>>> calls into the final KStream map function.
>> >>>>>
>> >>>>> The precise number of emitted records varies from one run to the next.
>> >>>>>
>> >>>>> Where are the missing metrics going?  Is there some commit issue that is
>> >>>>> causing dropped messages if the KTable producer isn't able to keep up?
>> >>>>>
>> >>>>> Any recommendations on where to focus the investigation of the issue?
>> >>>>>
>> >>>>> Running Kafka 0.10.2.1.
>> >>>>>
>> >>>>> Thanks,
>> >>>>>   Caleb
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >>
>> >
>>
>>
>
