Okay, so you're measuring end-to-end time from producer -> broker ->
Streams' consumer client. There are multiple phases that can contribute to
the 100ms latency, and I cannot tell whether the Streams consumer phase is
the major contributor. For example, if the topic did not exist before, then
when the broker first receives a produce request it may need to create the
topic, which involves multiple steps including writes to ZK, and that could
take time.

One point of confusion in your description: you mentioned "Kafka cluster is
already up and running", but I think you are referring to "Kafka Streams
application instances are already up and running", right? Only the latter
has a rebalance process; the Kafka brokers do not really have "rebalances",
except for balancing load by migrating partitions.

Guozhang



On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:

> right, so my kafka cluster has already been up and running for a while, and
> I can see from the logs that all broker instances have already changed from
> rebalancing to running.
>
> I did another test.
> From the producer, right before the message gets sent to the broker, I put
> a timestamp in the message, and on the consumer side, which is after stream
> processing, I compare this timestamp with the current time. I can see that
> some messages' processing time is above 100ms on some really powerful
> hardware. And from my application's GC logs, all GC pauses are below 1ms;
> the Kafka GC only happened once and was below 1ms too.
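>
> Roughly, the measurement looks like this (a sketch; payloadWithTimestamp()
> and extractTimestamp() stand in for my own serialization helpers, and the
> timestamp field is carried through the stream transformation unchanged):
>
>     // producer side: stamp the message right before send()
>     long sendTs = System.currentTimeMillis();
>     producer.send(new ProducerRecord<>(localDeltaTopic, key,
>         payloadWithTimestamp(value, sendTs)));
>
>     // consumer side, after stream processing: compare against now
>     for (ConsumerRecord<String, NodeState> rec : records) {
>         long latencyMs = System.currentTimeMillis() - extractTimestamp(rec.value());
>         if (latencyMs > 100) {
>             System.out.println("slow: " + latencyMs + " ms, key=" + rec.key());
>         }
>     }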
>
> Very puzzled. Is there any communication with zookeeper that, if it gets no
> response, will cause the broker to pause? I don't think that's the case,
> but at this point I don't know what else to suspect.
>
> Nan
>
> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nan,
> >
> > Note that Streams may need some time to rebalance and assign tasks, even
> > if you only start with one instance.
> >
> > I'd suggest you register a state listener in Kafka Streams via
> > KafkaStreams#setStateListener; your customized StateListener should
> > record when the state transitions from REBALANCING to RUNNING, since only
> > after that will the Streams client start to process the first record.
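> >
> > For example, a minimal sketch of such a listener (the listener API and
> > the REBALANCING / RUNNING states are the real Streams API; the logging is
> > just illustrative). Note it must be registered before streams.start():
> >
> >     streams.setStateListener((newState, oldState) -> {
> >         // the first record can only be processed after this transition
> >         if (oldState == KafkaStreams.State.REBALANCING
> >             && newState == KafkaStreams.State.RUNNING) {
> >             System.out.println("RUNNING at " + System.currentTimeMillis());
> >         }
> >     });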
> >
> >
> > Guozhang
> >
> >
> > On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > > thanks, which JMX properties indicate "processing latency spikes" /
> > > "throughput"?
> > >
> > >
> > > On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > > I cannot spot any obvious reasons.
> > > >
> > > > As you consume from the result topic for verification, we should
> > > > verify that the latency spikes originate on write and not on read:
> > > > you might want to have a look at the Kafka Streams JMX metrics to see
> > > > if processing latency spikes or throughput drops.
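> > > >
> > > > For example, you can dump the relevant sensors programmatically via
> > > > KafkaStreams#metrics() (a sketch; exact metric names such as
> > > > "process-latency-avg" and "process-rate" vary across versions, so
> > > > filter loosely and check what your client actually reports):
> > > >
> > > >     for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
> > > >         String name = e.getKey().name();
> > > >         // per-thread process latency / rate indicate spikes and throughput
> > > >         if (name.contains("process-latency") || name.contains("process-rate")) {
> > > >             System.out.println(e.getKey() + " = " + e.getValue().value());
> > > >         }
> > > >     }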
> > > >
> > > > Also watch for GC pauses in the JVM.
> > > >
> > > > Hope this helps.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > > > btw, I am using version 0.10.2.0
> > > > >
> > > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> > > > >
> > > > >> I am working on a Kafka Streams app and see huge latency variance;
> > > > >> wondering what can cause this?
> > > > >>
> > > > >> The processing is very simple and has no state; linger.ms is
> > > > >> already changed to 5ms. The message size is around 10K bytes,
> > > > >> published at 2000 messages/s, and the network is 10G. Using a
> > > > >> regular consumer to watch the localHistTopic topic and print out a
> > > > >> counter every 2000 messages, I usually get a count of 2000 every
> > > > >> second, matching the publish speed, but sometimes I see it stall
> > > > >> for 3 or more seconds and then print out a few counts, as if the
> > > > >> CPU paused during that time or messages were cached/batched and
> > > > >> then processed. Any suggestions?
> > > > >>
> > > > >>         final Properties streamsConfiguration = new Properties();
> > > > >>
> > > > >>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> > > > >>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> > > > >>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> > > > >>         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > > > >>             Serdes.String().getClass().getName());
> > > > >>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> > > > >>         // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > > > >>
> > > > >>         // producer tuning: large batches, high in-flight, short linger
> > > > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > >>             + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> > > > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > >>             + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> > > > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > >>             + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > > > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > >>             + ProducerConfig.LINGER_MS_CONFIG, "5");
> > > > >>         streamsConfiguration.put(StreamsConfig.consumerPrefix(
> > > > >>             ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> > > > >>
> > > > >>         final StreamsBuilder builder = new StreamsBuilder();
> > > > >>
> > > > >>         final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > > > >>             localDeltaTopic,
> > > > >>             Consumed.with(
> > > > >>                 new Serdes.StringSerde(),
> > > > >>                 new NodeMutationSerde<>()
> > > > >>             )
> > > > >>         );
> > > > >>
> > > > >>         // stateless transformation: mutation -> full node state
> > > > >>         KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > > > >>             (mutation) -> NodeState
> > > > >>                 .newBuilder()
> > > > >>                 .setMeta(mutation.getMetaMutation().getMeta())
> > > > >>                 .setValue(mutation.getValueMutation().getValue())
> > > > >>                 .build()
> > > > >>         );
> > > > >>
> > > > >>         localHistStream.to(
> > > > >>             localHistTopic,
> > > > >>             Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> > > > >>         );
> > > > >>
> > > > >>         streams = new KafkaStreams(builder.build(), streamsConfiguration);
> > > > >>         streams.cleanUp();
> > > > >>         streams.start();
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
