I cannot spot any obvious reasons.

Since you consume from the result topic for verification, we should verify
that the latency spikes originate on write and not on read: you might
want to look at the Kafka Streams JMX metrics to see whether processing
latency spikes or throughput drops.
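
If you do not have a JMX console attached, a rough way to poll those metrics from inside the application is sketched below. It uses only the standard javax.management API. The class name JmxSnapshot, the "kafka.streams:*" pattern, and the "latency" filter are my assumptions; the exact MBean and attribute names differ between Kafka versions, so adjust the filter to whatever your build actually registers.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka: dumps numeric JMX attributes
// registered in the current JVM.
final class JmxSnapshot {

    // Returns "<mbean name> <attribute>" -> value for every numeric attribute
    // whose name contains nameFilter, over all MBeans matching the pattern.
    static Map<String, Double> snapshot(String pattern, String nameFilter) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Map<String, Double> result = new TreeMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                    if (!attr.getName().contains(nameFilter)) {
                        continue;
                    }
                    try {
                        Object value = server.getAttribute(name, attr.getName());
                        if (value instanceof Number) {
                            result.put(name + " " + attr.getName(),
                                       ((Number) value).doubleValue());
                        }
                    } catch (Exception e) {
                        // Some attributes are not readable; skip them.
                    }
                }
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX query failed for " + pattern, e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed domain for Kafka Streams metrics; prints nothing when no
        // Streams instance is running in this JVM.
        snapshot("kafka.streams:*", "latency")
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Calling snapshot(...) once per second from a background thread in the same JVM as the Streams app, and logging the result next to your consumer-side counter, should show whether a stall lines up with a jump in the reported latency values.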

Also watch for GC pauses in the JVM.
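
The simplest check is to run with GC logging enabled (for example -verbose:gc) and compare the timestamps with the stalls. As an alternative, here is a minimal sketch that samples the standard GarbageCollectorMXBean counters from inside the JVM. The class name GcPauseWatcher is hypothetical, and note that the reported collection time is accumulated per collector and is not necessarily pure stop-the-world pause time for concurrent collectors.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: tracks how much time the JVM reports spending in GC.
final class GcPauseWatcher {

    // Sum of accumulated collection time (ms) over all collectors.
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 if the collector does not report it
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    // GC time (ms) accumulated while the given work was running.
    static long gcMillisDuring(Runnable work) {
        long before = totalGcMillis();
        work.run();
        return totalGcMillis() - before;
    }

    public static void main(String[] args) throws InterruptedException {
        // Print the GC time accumulated in each sampling interval.
        long previous = totalGcMillis();
        for (int i = 0; i < 3; i++) {
            Thread.sleep(200);
            long current = totalGcMillis();
            System.out.println("GC time in last interval: " + (current - previous) + " ms");
            previous = current;
        }
    }
}
```

If an interval that covers one of the 3 second stalls also shows a large GC time, the JVM is the likely culprit rather than Kafka.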

Hope this helps.


-Matthias

On 8/17/18 12:13 PM, Nan Xu wrote:
> btw, I am using version 0.10.2.0
> 
> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> 
>> I am working on a Kafka Streams app and see huge latency variance.
>> What can cause this?
>>
>> The processing is very simple and has no state, and linger.ms is already
>> set to 5 ms. The message size is around 10K bytes, published at 2000
>> messages/s, and the network is 10G. A regular consumer watches the
>> localHistTopic topic and prints a counter every 2000 messages. Usually
>> I get a count every second, matching the publish rate, but sometimes
>> it stalls for 3 or more seconds and then prints several counts at once,
>> as if the CPU were paused during that time or messages were being
>> cached/batched and then processed. Any suggestions?
>>
>>         final Properties streamsConfiguration = new Properties();
>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>>         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>             Serdes.String().getClass().getName());
>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>>         // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>             + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
>>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>             + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>             + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>             + ProducerConfig.LINGER_MS_CONFIG, "5");
>>         streamsConfiguration.put(
>>             StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
>>             20 * 1024 * 1024);
>>
>>         final StreamsBuilder builder = new StreamsBuilder();
>>
>>         final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>>             localDeltaTopic,
>>             Consumed.with(
>>                 new Serdes.StringSerde(),
>>                 new NodeMutationSerde<>()
>>             )
>>         );
>>
>>         KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>>             (mutation) -> NodeState
>>                 .newBuilder()
>>                 .setMeta(
>>                     mutation.getMetaMutation().getMeta()
>>                 )
>>                 .setValue(
>>                     mutation.getValueMutation().getValue()
>>                 )
>>                 .build()
>>         );
>>
>>         localHistStream.to(
>>             localHistTopic,
>>             Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>>         );
>>
>>         streams = new KafkaStreams(builder.build(), streamsConfiguration);
>>         streams.cleanUp();
>>         streams.start();
>>
> 
