thanks, which JMX properties indicate "processing latency spikes" / "throughput"
On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote: > I cannot spot any obvious reasons. > > As you consume from the result topic for verification, we should verify > that the latency spikes original on write and not on read: you might > want to have a look into Kafka Streams JMX metric to see if processing > latency spikes or throughput drops. > > Also watch for GC pauses in the JVM. > > Hope this helps. > > > -Matthias > > On 8/17/18 12:13 PM, Nan Xu wrote: > > btw, I am using version 0.10.2.0 > > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote: > > > >> I am working on a kafka stream app, and see huge latency variance, > >> wondering what can cause this? > >> > >> the processing is very simple and don't have state, linger.ms already > >> change to 5ms. the message size is around 10K byes and published as 2000 > >> messages/s, network is 10G. using a regular consumer watch the > >> localHistTopic topic and just every 2000 message print out a counter, > it > >> usually every second I get a count 2000 as the publish speed, but > sometime > >> I see it stall for 3 or more seconds and then print out a few count. > like > >> cpu is paused during that time or message being cache/batch then > processed. > >> any suggestion? > >> > >> final Properties streamsConfiguration = new Properties(); > >> > >> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, > >> applicationId); > >> > >> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, > clientId); > >> > >> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > >> bootstrapServers); > >> > >> > >> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > >> Serdes.String() > >> > >> .getClass().getName()); > >> > >> > streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > >> 10 * 1000); > >> > >> // > >> > streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.BATCH_SIZE_CONFIG,163840); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.LINGER_MS_CONFIG,"5"); > >> > >> streamsConfiguration.put(StreamsConfig.consumerPrefix( > >> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),20 * 1024 * > >> 1024);MS_CONFIG, 10 * 1000); > >> > >> // > >> > streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.BATCH_SIZE_CONFIG,163840); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30); > >> > >> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX > >> > >> + ProducerConfig.LINGER_MS_CONFIG,"5"); > >> > >> streamsConfiguration.put(StreamsConfig.consumerPrefix( > >> > >> ConsumerConfig. MAX_PARTITION_FETCH_BYTES_CONFIG , 20 * > 1024 * > >> 1024); > >> > >> > >> final StreamsBuilder builder = new StreamsBuilder(); > >> > >> final KStream<String, NodeMutation> localDeltaStream = builder.stream( > >> > >> localDeltaTopic, > >> > >> Consumed.with( > >> > >> new Serdes.StringSerde(), > >> > >> new NodeMutationSerde<>() > >> > >> ) > >> > >> ); > >> > >> KStream<String, NodeState> localHistStream = > localDeltaStream.mapValues( > >> > >> (mutation) -> NodeState > >> > >> .newBuilder() > >> > >> .setMeta( > >> > >> mutation.getMetaMutation().getMeta() > >> > >> ) > >> > >> .setValue( > >> > >> mutation.getValueMutation().getValue() > >> > >> ) > >> > >> .build() > >> > >> ); > >> > >> localHistStream.to( > >> > >> localHistTopic, > >> > >> Produced.with(new Serdes.StringSerde(), new > NodeStateSerde<>()) > >> > >> ); > >> > >> streams = new KafkaStreams(builder.build(), streamsConfiguration); > >> > >> streams.cleanUp(); > >> > >> streams.start(); > >> > >> > >> > >> > >> > > > >