Thanks. Which JMX metrics show "processing latency spikes" and
"throughput"?

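For reference while browsing: Kafka Streams and the embedded clients register their metrics as JMX MBeans, so one way to find the relevant attributes is to dump them from inside the application's JVM. The sketch below uses only the JDK. The class `JmxMetricDump`, the pattern `kafka.streams:*`, and the name fragments `latency` and `rate` are assumptions for illustration; the exact metric names depend on the Kafka version, so it is worth confirming them in jconsole first.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka.
final class JmxMetricDump {

    /**
     * Returns "objectName/attribute" -> value for every readable attribute whose
     * name contains one of the fragments, across all MBeans matching the pattern.
     * With no fragments, every readable attribute is returned.
     */
    static Map<String, Object> read(String pattern, String... fragments) {
        ObjectName query;
        try {
            query = new ObjectName(pattern);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad ObjectName pattern: " + pattern, e);
        }
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Map<String, Object> out = new TreeMap<>();
        for (ObjectName name : server.queryNames(query, null)) {
            MBeanAttributeInfo[] attrs;
            try {
                attrs = server.getMBeanInfo(name).getAttributes();
            } catch (Exception e) {
                continue; // MBean was unregistered between the query and the lookup
            }
            for (MBeanAttributeInfo attr : attrs) {
                if (!attr.isReadable() || !matches(attr.getName(), fragments)) {
                    continue;
                }
                try {
                    out.put(name + "/" + attr.getName(),
                            server.getAttribute(name, attr.getName()));
                } catch (Exception e) {
                    // Some attributes cannot be read on every JVM; skip them.
                }
            }
        }
        return out;
    }

    private static boolean matches(String attribute, String[] fragments) {
        if (fragments.length == 0) {
            return true;
        }
        for (String fragment : fragments) {
            if (attribute.contains(fragment)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumed pattern and fragments; adjust after checking the MBean names.
        read("kafka.streams:*", "latency", "rate")
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Calling `JmxMetricDump.read(...)` periodically from a background thread in the Streams application, and logging the result with a timestamp, should make it possible to line up a stall seen by the verification consumer with a change in the processing latency or rate attributes.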

On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I cannot spot any obvious reasons.
>
> As you consume from the result topic for verification, we should verify
> that the latency spikes originate on write and not on read: you might
> want to have a look at the Kafka Streams JMX metrics to see if processing
> latency spikes or throughput drops.
>
> Also watch for GC pauses in the JVM.
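
A minimal sketch of one way to watch for this from inside the JVM, using the standard `GarbageCollectorMXBean` API. The class name `GcWatch` and the 200 ms threshold in the usage note are hypothetical. The reported collection time is accumulated and approximate, and for concurrent collectors not all of it is stop-the-world pause, so GC logging (for example `-verbose:gc`) remains the more precise source.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: logs when accumulated GC time jumps within one second.
final class GcWatch {

    /** Total milliseconds all collectors report having spent in GC since JVM start. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 if the collector does not report it
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    /** Starts a daemon thread that samples GC time once per second. */
    static Thread start(long thresholdMillis) {
        Thread watcher = new Thread(() -> {
            long previous = totalGcMillis();
            try {
                while (true) {
                    Thread.sleep(1000);
                    long now = totalGcMillis();
                    if (now - previous > thresholdMillis) {
                        System.out.println(System.currentTimeMillis()
                            + " GC used " + (now - previous) + " ms in the last second");
                    }
                    previous = now;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when interrupted
            }
        }, "gc-watch");
        watcher.setDaemon(true);
        watcher.start();
        return watcher;
    }
}
```

Calling `GcWatch.start(200)` before `streams.start()` would print a timestamped line whenever GC time grows by more than 200 ms in a second, which can then be compared with the times at which the counter stalls.
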
>
> Hope this helps.
>
>
> -Matthias
>
> On 8/17/18 12:13 PM, Nan Xu wrote:
> > btw, I am using version 0.10.2.0
> >
> > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> >
> >> I am working on a Kafka Streams app and see huge latency variance.
> >> What can cause this?
> >>
> >> The processing is very simple and has no state; linger.ms is already
> >> set to 5 ms. Messages are around 10 KB each and are published at 2000
> >> messages/s over a 10G network. A regular consumer watches the
> >> localHistTopic topic and prints a counter every 2000 messages.
> >> Usually I get a count every second, matching the publish rate, but
> >> sometimes it stalls for 3 or more seconds and then prints several
> >> counts at once, as if the CPU were paused during that time or messages
> >> were cached/batched and then processed. Any suggestions?
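
The counter check described above can be made a little more precise by recording the gap between batches instead of eyeballing the output. This is only a sketch under assumptions: `StallDetector` is a hypothetical class, the batch size of 2000 and the 3000 ms threshold mirror the numbers in the message, and the caller is assumed to pass wall-clock time from the consumer's poll loop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for the verification consumer.
final class StallDetector {
    private final int batchSize;
    private final long stallMillis;
    private final List<Long> stalls = new ArrayList<>();
    private long count = 0;
    private long lastBatchAt = -1;

    StallDetector(int batchSize, long stallMillis) {
        this.batchSize = batchSize;
        this.stallMillis = stallMillis;
    }

    /** Call once per consumed record with the current time in milliseconds. */
    void onRecord(long nowMillis) {
        if (lastBatchAt < 0) {
            lastBatchAt = nowMillis; // the first record starts the clock
        }
        count++;
        if (count % batchSize != 0) {
            return; // batch not complete yet
        }
        long gap = nowMillis - lastBatchAt;
        if (gap >= stallMillis) {
            stalls.add(gap); // this batch took suspiciously long to arrive
        }
        lastBatchAt = nowMillis;
    }

    /** Durations in milliseconds of all batches that reached the threshold. */
    List<Long> stalls() {
        return stalls;
    }
}
```

In the poll loop this would be `detector.onRecord(System.currentTimeMillis())` for each record, with `new StallDetector(2000, 3000)`. Logging `ConsumerRecord.timestamp()` next to the arrival time may additionally help tell whether records were created late or only delivered late.
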
> >>
> >>         final Properties streamsConfiguration = new Properties();
> >>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> >>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> >>         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >>             Serdes.String().getClass().getName());
> >>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> >>         // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>             + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>             + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>             + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> >>             + ProducerConfig.LINGER_MS_CONFIG, "5");
> >>         streamsConfiguration.put(StreamsConfig.consumerPrefix(
> >>             ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> >>
> >>
> >>         final StreamsBuilder builder = new StreamsBuilder();
> >>
> >>         final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> >>             localDeltaTopic,
> >>             Consumed.with(new Serdes.StringSerde(), new NodeMutationSerde<>()));
> >>
> >>         KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> >>             (mutation) -> NodeState
> >>                 .newBuilder()
> >>                 .setMeta(mutation.getMetaMutation().getMeta())
> >>                 .setValue(mutation.getValueMutation().getValue())
> >>                 .build());
> >>
> >>         localHistStream.to(
> >>             localHistTopic,
> >>             Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>()));
> >>
> >>         streams = new KafkaStreams(builder.build(), streamsConfiguration);
> >>         streams.cleanUp();
> >>         streams.start();
> >>
> >>
> >>
> >>
> >>
> >
>
>
