I cannot spot any obvious reason. Since you consume from the result topic for verification, we should first verify that the latency spikes originate on write and not on read: you might want to look at the Kafka Streams JMX metrics to see whether processing latency spikes or throughput drops.
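One way to look at those metrics from inside the application is to query the JVM's platform MBean server. The following is only a minimal sketch: the class name `StreamsLatencyProbe` and the helper `readLatencyMetrics` are hypothetical, the `kafka.streams:*` object-name pattern is an assumption about the JMX domain Kafka Streams registers under, and the exact attribute names differ between Kafka versions, so the code simply collects every attribute whose name contains "latency" or "rate".

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class StreamsLatencyProbe {

    // Collects all MBean attributes matching the object-name pattern whose
    // attribute name contains "latency" or "rate". The pattern is an assumption;
    // adjust it to whatever your Kafka version actually registers.
    static Map<String, Object> readLatencyMetrics(MBeanServer server, String pattern)
            throws Exception {
        Map<String, Object> result = new TreeMap<>();
        Set<ObjectName> names = server.queryNames(new ObjectName(pattern), null);
        for (ObjectName name : names) {
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                String attrName = attr.getName();
                if (attrName.contains("latency") || attrName.contains("rate")) {
                    result.put(name + "/" + attrName, server.getAttribute(name, attrName));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Only sees metrics registered in this JVM, so it prints nothing
        // unless a KafkaStreams instance is running in the same process.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        readLatencyMetrics(server, "kafka.streams:*")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Calling `readLatencyMetrics` once per second from a background thread in the Streams application and logging the result next to the consumer-side counter should show whether a stall lines up with a jump in processing latency. The same MBeans can also be browsed with a JMX client such as JConsole.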
Also watch for GC pauses in the JVM.

Hope this helps.

-Matthias

On 8/17/18 12:13 PM, Nan Xu wrote:
> btw, I am using version 0.10.2.0
>
> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
>
>> I am working on a kafka stream app, and see huge latency variance,
>> wondering what can cause this?
>>
>> the processing is very simple and don't have state, linger.ms already
>> change to 5ms. the message size is around 10K byes and published as 2000
>> messages/s, network is 10G. using a regular consumer watch the
>> localHistTopic topic and just every 2000 message print out a counter, it
>> usually every second I get a count 2000 as the publish speed, but sometime
>> I see it stall for 3 or more seconds and then print out a few count. like
>> cpu is paused during that time or message being cache/batch then processed.
>> any suggestion?
>>
>> final Properties streamsConfiguration = new Properties();
>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>     Serdes.String().getClass().getName());
>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>     + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>     + ProducerConfig.LINGER_MS_CONFIG, "5");
>> streamsConfiguration.put(StreamsConfig.consumerPrefix(
>>     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
>>
>> final StreamsBuilder builder = new StreamsBuilder();
>>
>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>>     localDeltaTopic,
>>     Consumed.with(
>>         new Serdes.StringSerde(),
>>         new NodeMutationSerde<>()
>>     )
>> );
>>
>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>>     (mutation) -> NodeState
>>         .newBuilder()
>>         .setMeta(
>>             mutation.getMetaMutation().getMeta()
>>         )
>>         .setValue(
>>             mutation.getValueMutation().getValue()
>>         )
>>         .build()
>> );
>>
>> localHistStream.to(
>>     localHistTopic,
>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>> );
>>
>> streams = new KafkaStreams(builder.build(), streamsConfiguration);
>> streams.cleanUp();
>> streams.start();
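To check the GC-pause suggestion from the reply against the multi-second stalls described in the quoted message, a rough sketch along the following lines could run inside the Streams JVM. The class name `GcPauseWatch` and the one-second sampling interval are illustrative assumptions. Note that the reported collection time is accumulated collector time, which for concurrent collectors is not the same as stop-the-world pause time, so treat the numbers as a hint only.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcPauseWatch {

    // Sums the accumulated collection time (ms) over all collectors in this JVM.
    // Collectors that report -1 (undefined) are skipped.
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime();
            if (time > 0) {
                total += time;
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long previous = totalGcTimeMillis();
        // Sample three times, one second apart; a real application would
        // run this loop in a background thread for as long as it lives.
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            long current = totalGcTimeMillis();
            System.out.println("GC time in last interval: " + (current - previous) + " ms");
            previous = current;
        }
    }
}
```

If an interval shows GC time close to the length of an observed stall, garbage collection is a likely cause. Enabling the JVM's own GC logging gives more precise pause information than this sampling approach.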