Another thing to consider: do you have records with a null key or value?
Those records would be dropped and not processed.
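
If that might be the case, here is a minimal sketch (reusing the metricStream
and Metric types from your topology below; the logging is purely illustrative)
that makes such drops visible instead of silent:

```
// Hypothetical sketch: filter out null-keyed/valued records explicitly,
// logging them so they show up somewhere rather than silently disappearing.
final KStream<String, Metric> cleaned = metricStream
        .filter((key, value) -> {
            if (key == null || value == null) {
                System.err.println("Dropping record with null key or value");
                return false;
            }
            return true;
        });
```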

-Matthias

On 6/15/17 6:24 AM, Garrett Barton wrote:
> Is your time usage correct?  It sounds like you want event time, not
> load/process time, which is the default unless you have a TimestampExtractor
> defined somewhere upstream.  Otherwise I could see far fewer events coming
> out, as Streams is just aggregating whatever showed up in that 10-second
> window.
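> 
> For illustration, a minimal sketch of a custom extractor (my assumptions:
> the 0.10.2 two-argument extract() signature -- check the exact interface for
> your version -- and a getEventTime() accessor on your Metric class, neither
> of which is from your code):
> 
> ```
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.processor.TimestampExtractor;
> 
> // Pull event time out of the payload instead of using the record's
> // produce/append timestamp.
> public class MetricTimestampExtractor implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record,
>                         final long previousTimestamp) {
>         final Object value = record.value();
>         if (value instanceof Metric) {
>             return ((Metric) value).getEventTime();  // assumed accessor
>         }
>         return record.timestamp();  // fall back to the embedded timestamp
>     }
> }
> 
> // registered via:
> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>         MetricTimestampExtractor.class.getName());
> ```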
> 
> On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:
> 
>> Disabling the cache with:
>>
>> ```
>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> ```
>>
>> Results in:
>> - Emitting many more intermediate calculations.
>> - Still losing data.
>>
>> In my test case it emitted 342476 intermediate calculations covering 3414
>> distinct windows, where 14400 distinct windows were expected.
>>
>> Regards,
>>   Caleb
>>
>> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> This seems to be related to internal KTable caches. You can disable them
>>> by setting cache size to zero.
>>>
>>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
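>>>
>>> For example (a sketch; streamsConfiguration is whatever Properties object
>>> you hand to KafkaStreams, and the commit interval value is just
>>> illustrative):
>>>
>>> ```
>>> // Disable the record cache so every update is forwarded downstream.
>>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>> // The commit interval also bounds how often state is flushed/forwarded.
>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
>>> ```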
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
>>>> problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
>>>> or higher the problem shows up.
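>>>>
>>>> (For reference, a sketch of how the thread count is set;
>>>> `streamsConfiguration` is the Properties passed to KafkaStreams:)
>>>>
>>>> ```
>>>> // 1 avoids the problem; 2 or more reproduces it
>>>> streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>>>> ```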
>>>>
>>>> When the number of threads is 1, the speed of data through the first part
>>>> of the topology (before the ktable) slows down considerably, but it seems
>>>> to slow down to the speed of the output, which may be the key.
>>>>
>>>> That said... changing the number of stream threads should not impact data
>>>> correctness.  Seems like a bug someplace in Kafka.
>>>>
>>>>
>>>>
>>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
>>> wrote:
>>>>
>>>>> I have a topology of
>>>>>     KStream -> KTable -> KStream
>>>>>
>>>>> ```
>>>>>
>>>>> // build the topology: ingest -> windowed reduce -> finalize -> sink
>>>>> final KStreamBuilder builder = new KStreamBuilder();
>>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>>>>>
>>>>> // 10-second tumbling windows, retained for `retentionDays`
>>>>> final KTable<Windowed<String>, MyThing> myTable = metricStream
>>>>>         .groupByKey(stringSerde, mySerde)
>>>>>         .reduce(MyThing::merge,
>>>>>                 TimeWindows.of(10000).advanceBy(10000)
>>>>>                         .until(Duration.ofDays(retentionDays).toMillis()),
>>>>>                 tableTopic);
>>>>>
>>>>> // emit one finalized record per window to the sink topic
>>>>> myTable.toStream()
>>>>>         .map((key, value) -> KeyValue.pair(key.key(), value.finalize(key.window())))
>>>>>         .to(stringSerde, mySerde, sinkTopic);
>>>>>
>>>>> ```
>>>>>
>>>>>
>>>>> Normally, when sent data at 10 records a second, I expect ~1 output metric
>>>>> for every 100 metrics it receives: 10 records/sec over a 10-second window
>>>>> is 100 input records per emitted window aggregate.
>>>>>
>>>>> When fed data real time at that rate it seems to do just that.
>>>>>
>>>>> However, when I either reprocess on an input topic with a large amount of
>>>>> data or feed data in significantly faster, I see a very different behavior.
>>>>>
>>>>> Over the course of 20 seconds I can see 1,440,000 messages being ingested
>>>>> into the ktable, but only 633 emitted from it (expected 14400, since
>>>>> 1,440,000 records / 100 records per window = 14,400 windows).
>>>>>
>>>>> Over the next minute the records output creeps to 1796, but then holds
>>>>> steady and does not keep going up to the expected total of 14400.
>>>>>
>>>>> A consumer reading from the sinkTopic ends up finding about 1264 records,
>>>>> which is lower than the 1796 I would have anticipated from the number of
>>>>> calls into the final kstream map function.
>>>>>
>>>>> Precise number of emitted records will vary from one run to the next.
>>>>>
>>>>> Where are the extra metrics going?  Is there some commit issue that is
>>>>> causing dropped messages if the ktable producer isn't able to keep up?
>>>>>
>>>>> Any recommendations on where to focus the investigation of the issue?
>>>>>
>>>>> Running Kafka 0.10.2.1.
>>>>>
>>>>> Thanks,
>>>>>   Caleb
>>>>>
>>>>
>>>
>>>
>>
> 
