Thanks Jay.

I just used JMX to change the log level on the broker and checked the logs.
I am still not sure which line tells me exactly how long a produce request
took to process. I see log lines of this format:

2014-12-30T02:13:44.507Z DEBUG [kafka-request-handler-0            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] Produce to local log
in 5 ms

Is this what I should be looking for? I see producer requests of the
form "2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] Handling request:
Name: ProducerRequest; Version: 0; CorrelationId: 36308;  more stuff" but I
can't quite tell when each request completed. Requests are logged
frequently on the brokers (multiple times per second), but without a
corresponding completion line I can't work out the total time per request.
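
My best guess at the moment is that the per-request totals come from the
request logger (kafka.request.logger) rather than from KafkaApis itself.
These are the lines from the example log4j.properties that I think are
relevant (I set the equivalent levels over JMX, so treat the exact entries
below as my reconstruction rather than something I have verified):

  # My understanding: at TRACE this logs a "Completed request: ... totalTime:...
  # requestQueueTime:... localTime:... remoteTime:... responseQueueTime:...
  # sendTime:..." line for every request, which sounds like the number I am after.
  log4j.logger.kafka.request.logger=TRACE, requestAppender
  log4j.additivity.kafka.request.logger=false

Please correct me if the KafkaApis logger alone is supposed to be enough.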

Some other things I found odd:

1) The producer requests seem to have an ack timeout of 30000 ms. I don't
think I set this explicitly on the producer, and I don't know whether it
could have anything to do with the latency problem.

2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] Handling request:
Name: ProducerRequest; Version: 0; CorrelationId: 36308; ClientId: ;
RequiredAcks: 1; *AckTimeoutMs: 30000 ms*; rest of the stuff.
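
Looking at the producer config I pasted further down in this thread, it
already shows timeout.ms = 30000, so my guess is that the AckTimeoutMs above
is just the new producer's timeout.ms default rather than something I set.
If it turns out to matter, this is roughly how I would try overriding it (a
rough sketch against the 0.8.2-beta producer I am running; the broker
hostnames are placeholders):

    // org.apache.kafka.clients.producer.KafkaProducer, 0.8.2-beta
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
    props.put("acks", "1");
    props.put("linger.ms", "250");
    // "timeout.ms" is the ack timeout; it defaults to 30000 ms, which would
    // explain the value in the request above. Lowering it only to see whether
    // it changes anything.
    props.put("timeout.ms", "10000");
    KafkaProducer producer = new KafkaProducer(props);

Not suggesting the timeout itself causes the spikes, just trying to rule it
in or out.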

2) I see a bunch of very small writes to my [topic, partition] logs. My
messages are at a minimum 27 bytes, so maybe these counts refer to
something else. Again, I don't know if this is a problem:

2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written to
log MY.TOPIC-400 beginning at offset 19221923 and ending at offset 19221923

2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] 4 bytes written to
log MY.TOPIC-208 beginning at offset 29438019 and ending at offset 29438022

2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written to
log MY.TOPIC-163 beginning at offset 14821977 and ending at offset 14821977

2014-12-30T02:13:44.500Z TRACE [kafka-request-handler-1            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written to
log MY.TOPIC118 beginning at offset 19321510 and ending at offset 19321510

2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3            ]
[kafka.server.KafkaApis              ]: [KafkaApi-11] 3 bytes written to
log MY.TOPIC-463 beginning at offset 28777627 and ending at offset 28777629

On Mon, Dec 29, 2014 at 5:43 PM, Jay Kreps <j...@confluent.io> wrote:

> Hey Rajiv,
>
> Yes, if you uncomment the line
>   #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
> in the example log4j.properties file that will enable logging of each
> request including the time taken processing the request. This is the first
> step for diagnosing latency spikes since this will rule out the server. You
> may also want to enable DEBUG or TRACE on the producer and see what is
> happening when those spikes occur.
>
> The JMX stats show that at least the producer is not exhausting the I/O
> thread (it is 92% idle) and doesn't seem to be waiting on memory which is
> somewhat surprising to me (the NaN is an artifact of how we compute that
> stat--no allocations took place in the time period measured so it is kind
> of 0/0).
>
> -Jay
>
> On Mon, Dec 29, 2014 at 4:54 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > In case the attachments don't work out here is an imgur link -
> > http://imgur.com/NslGpT3,Uw6HFow#0
> >
> > On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Never mind about (2). I see these stats are already being output by the
> > > kafka producer. I've attached a couple of screenshots (couldn't copy
> > paste
> > > from jvisualvm ). Do any of these things strike you as odd? The
> > > bufferpool-wait-ratio sadly shows up as a NaN.
> > >
> > > I still don't know how to figure out (1).
> > >
> > > Thanks!
> > >
> > >
> > > On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > >> Hi Jay,
> > >>
> > >> Re (1) - I am not sure how to do this? Actually I am not sure what
> this
> > >> means. Is this the time every write/fetch request is received on the
> > >> broker? Do I need to enable some specific log level for this to show
> > up? It
> > >> doesn't show up in the usual log. Is this information also available
> via
> > >> jmx somehow?
> > >> Re (2) - Are you saying that I should instrument the "percentage of
> time
> > >> waiting for buffer space" stat myself? If so how do I do this. Or are
> > these
> > >> stats already output to jmx by the kafka producer code. This seems
> like
> > >> it's in the internals of the kafka producer client code.
> > >>
> > >> Thanks again!
> > >>
> > >>
> > >> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <ra...@signalfuse.com>
> > >> wrote:
> > >>
> > >>> Thanks Jay. Will check (1) and (2) and get back to you. The test is
> not
> > >>> stand-alone now. It might be a bit of work to extract it to a
> > stand-alone
> > >>> executable. It might take me a bit of time to get that going.
> > >>>
> > >>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io> wrote:
> > >>>
> > >>>> Hey Rajiv,
> > >>>>
> > >>>> This sounds like a bug. The more info you can help us get the easier
> > to
> > >>>> fix. Things that would help:
> > >>>> 1. Can you check if the the request log on the servers shows latency
> > >>>> spikes
> > >>>> (in which case it is a server problem)?
> > >>>> 2. It would be worth also getting the jmx stats on the producer as
> > they
> > >>>> will show things like what percentage of time it is waiting for
> buffer
> > >>>> space etc.
> > >>>>
> > >>>> If your test is reasonably stand-alone it would be great to file a
> > JIRA
> > >>>> and
> > >>>> attach the test code and the findings you already have so someone
> can
> > >>>> dig
> > >>>> into what is going on.
> > >>>>
> > >>>> -Jay
> > >>>>
> > >>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com
> >
> > >>>> wrote:
> > >>>>
> > >>>> > Hi all,
> > >>>> >
> > >>>> > Bumping this up, in case some one has any ideas. I did yet another
> > >>>> > experiment where I create 4 producers and stripe the send requests
> > >>>> across
> > >>>> > them in a manner such that any one producer only sees 256
> partitions
> > >>>> > instead of the entire 1024. This seems to have helped a bit, and
> > >>>> though I
> > >>>> > still see crazy high 99th (25-30 seconds), the median, mean, 75th
> > and
> > >>>> 95th
> > >>>> > percentile have all gone down.
> > >>>> >
> > >>>> > Thanks!
> > >>>> >
> > >>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <
> > >>>> tstump...@ntent.com>
> > >>>> > wrote:
> > >>>> >
> > >>>> > > Ah I thought it was restarting the broker that made things
> better
> > :)
> > >>>> > >
> > >>>> > > Yeah I have no experience with the Java client so can't really
> > help
> > >>>> > there.
> > >>>> > >
> > >>>> > > Good luck!
> > >>>> > >
> > >>>> > > -----Original Message-----
> > >>>> > > From: Rajiv Kurian [ra...@signalfuse.com]
> > >>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
> > >>>> > > To: users@kafka.apache.org [users@kafka.apache.org]
> > >>>> > > Subject: Re: Trying to figure out kafka latency issues
> > >>>> > >
> > >>>> > > I'll take a look at the GC profile of the brokers. Right now I
> keep
> > >>>> a tab
> > >>>> > on
> > >>>> > > the CPU, Messages in, Bytes in, Bytes out, free memory (on the
> > >>>> machine
> > >>>> > not
> > >>>> > > JVM heap) free disk space on the broker. I'll need to take a
> look
> > >>>> at the
> > >>>> > > JVM metrics too. What seemed strange is that going from 8 -> 512
> > >>>> > partitions
> > >>>> > > increases the latency, but going from 512 -> 8 does not decrease
> it.
> > >>>> I have
> > >>>> > > to restart the producer (but not the broker) for the end to end
> > >>>> latency
> > >>>> > to
> > >>>> > > go down. That made it seem that the fault was probably with the
> > >>>> producer
> > >>>> > > and not the broker. Only restarting the producer made things
> > >>>> better. I'll
> > >>>> > > do more extensive measurement on the broker.
> > >>>> > >
> > >>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <
> > >>>> tstump...@ntent.com>
> > >>>> > > wrote:
> > >>>> > > >
> > >>>> > > > Did you see my response and have you checked the server logs
> > >>>> especially
> > >>>> > > > the GC logs? It still sounds like you are running out of
> memory
> > >>>> on the
> > >>>> > > > broker. What is your max heap memory and are you thrashing
> once
> > >>>> you
> > >>>> > start
> > >>>> > > > writing to all those partitions?
> > >>>> > > >
> > >>>> > > > You have measured very thoroughly from an external point of
> > view,
> > >>>> i
> > >>>> > think
> > >>>> > > > now you'll have to start measuring the internal metrics. Maybe
> > >>>> someone
> > >>>> > > else
> > >>>> > > > will have ideas on what jmx values to watch.
> > >>>> > > >
> > >>>> > > > Best,
> > >>>> > > > Thunder
> > >>>> > > >
> > >>>> > > >
> > >>>> > > > -----Original Message-----
> > >>>> > > > From: Rajiv Kurian [ra...@signalfuse.com]
> > >>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
> > >>>> > > > To: users@kafka.apache.org [users@kafka.apache.org]
> > >>>> > > > Subject: Re: Trying to figure out kafka latency issues
> > >>>> > > >
> > >>>> > > > Some more work tells me that the end to end latency numbers
> vary
> > >>>> with
> > >>>> > the
> > >>>> > > > number of partitions I am writing to. I did an experiment,
> where
> > >>>> based
> > >>>> > > on a
> > >>>> > > > run time flag I would dynamically select how many of the *1024
> > >>>> > > partitions*
> > >>>> > > > I write to. So say I decide I'll write to at most 256
> partitions
> > >>>> I mod
> > >>>> > > > whatever partition I would actually write to by 256. Basically
> > the
> > >>>> > number
> > >>>> > > > of partitions for this topic on the broker remains the same at
> > >>>> *1024*
> > >>>> > > > partitions but the number of partitions my producers write to
> > >>>> changes
> > >>>> > > > dynamically based on a run time flag. So something like this:
> > >>>> > > >
> > >>>> > > > int partition = getPartitionForMessage(message);
> > >>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();   //
> This
> > >>>> flag
> > >>>> > can
> > >>>> > > be
> > >>>> > > > updated without bringing the application down - just a
> volatile
> > >>>> read of
> > >>>> > > > some number set externally.
> > >>>> > > > int moddedPartition = partition % maxPartitionsToWriteTo;
> > >>>> > > > // Send a message to this Kafka partition.
> > >>>> > > >
> > >>>> > > > Here are some interesting things I've noticed:
> > >>>> > > >
> > >>>> > > > i) When I start my client and it *never writes* to more than
> *8
> > >>>> > > > partitions *(same
> > >>>> > > > data rate but fewer partitions) - the end to end *99th latency
> > is
> > >>>> > 300-350
> > >>>> > > > ms*. Quite a bit of this (numbers in my previous emails) is
> the
> > >>>> latency
> > >>>> > > > from producer -> broker and the latency from broker ->
> consumer.
> > >>>> Still
> > >>>> > > > nowhere as poor as the *20 - 30* seconds I was seeing.
> > >>>> > > >
> > >>>> > > > ii) When I increase the maximum number of partitions, end to
> end
> > >>>> > latency
> > >>>> > > > increases dramatically. At *256 partitions* the end to end
> *99th
> > >>>> > latency
> > >>>> > > is
> > >>>> > > > still 390 - 418 ms.* Worse than the latency figures for *8
> > >>>> *partitions,
> > >>>> > > but
> > >>>> > > > not by much. When I increase this number to *512 partitions
> *the
> > >>>> end
> > >>>> > > > to end *99th
> > >>>> > > > latency *becomes an intolerable *19-24 seconds*. At *1024*
> > >>>> partitions
> > >>>> > the
> > >>>> > > > *99th
> > >>>> > > > latency is at 25 - 30 seconds*.
> > >>>> > > > A table of the numbers:
> > >>>> > > >
> > >>>> > > > Max number of partitions written to (out of 1024)
> > >>>> > > >
> > >>>> > > > End to end latency
> > >>>> > > >
> > >>>> > > > 8
> > >>>> > > >
> > >>>> > > > 300 - 350 ms
> > >>>> > > >
> > >>>> > > > 256
> > >>>> > > >
> > >>>> > > > 390 - 418 ms
> > >>>> > > >
> > >>>> > > > 512
> > >>>> > > >
> > >>>> > > > 19 - 24 seconds
> > >>>> > > >
> > >>>> > > > 1024
> > >>>> > > >
> > >>>> > > > 25 - 30 seconds
> > >>>> > > >
> > >>>> > > >
> > >>>> > > > iii) Once I make the max number of partitions high enough,
> > >>>> reducing it
> > >>>> > > > doesn't help. For eg: If I go up from *8* to *512 *partitions,
> > the
> > >>>> > > latency
> > >>>> > > > goes up. But while the producer is running if I go down from
> > >>>> *512* to
> > >>>> > > > *8 *partitions,
> > >>>> > > > it doesn't reduce the latency numbers. My guess is that the
> > >>>> producer is
> > >>>> > > > creating some state lazily per partition and this state is
> > >>>> causing the
> > >>>> > > > latency. Once this state is created, writing to fewer
> partitions
> > >>>> > doesn't
> > >>>> > > > seem to help. Only a restart of the producer calms things
> down.
> > >>>> > > >
> > >>>> > > > So my current plan is to reduce the number of partitions on
> the
> > >>>> topic,
> > >>>> > > but
> > >>>> > > > there seems to be something deeper going on for the latency
> > >>>> numbers to
> > >>>> > be
> > >>>> > > > so poor to begin with and then suffer so much more (non
> > linearly)
> > >>>> with
> > >>>> > > > additional partitions.
> > >>>> > > >
> > >>>> > > > Thanks!
> > >>>> > > >
> > >>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <
> > >>>> ra...@signalfuse.com>
> > >>>> > > > wrote:
> > >>>> > > > >
> > >>>> > > > > I've done some more measurements. I've also started
> measuring
> > >>>> the
> > >>>> > > latency
> > >>>> > > > > between when I ask my producer to send a message and when I
> > get
> > >>>> an
> > >>>> > > > > acknowledgement via the callback. Here is my code:
> > >>>> > > > >
> > >>>> > > > > // This function is called on every producer once every 30
> > >>>> seconds.
> > >>>> > > > >
> > >>>> > > > > public void addLagMarkers(final Histogram enqueueLag) {
> > >>>> > > > >
> > >>>> > > > >         final int numberOfPartitions = 1024;
> > >>>> > > > >
> > >>>> > > > >         final long timeOfEnqueue =
> System.currentTimeMillis();
> > >>>> > > > >
> > >>>> > > > >         final Callback callback = new Callback() {
> > >>>> > > > >
> > >>>> > > > >             @Override
> > >>>> > > > >
> > >>>> > > > >             public void onCompletion(RecordMetadata
> metadata,
> > >>>> > Exception
> > >>>> > > > ex)
> > >>>> > > > > {
> > >>>> > > > >
> > >>>> > > > >                 if (metadata != null) {
> > >>>> > > > >
> > >>>> > > > >                     // The difference between ack time from
> > >>>> broker
> > >>>> > and
> > >>>> > > > > enqueue time.
> > >>>> > > > >
> > >>>> > > > >                     final long timeOfAck =
> > >>>> > System.currentTimeMillis();
> > >>>> > > > >
> > >>>> > > > >                     final long lag = timeOfAck -
> > timeOfEnqueue;
> > >>>> > > > >
> > >>>> > > > >                     enqueueLag.update(lag);
> > >>>> > > > >
> > >>>> > > > >                 }
> > >>>> > > > >
> > >>>> > > > >             }
> > >>>> > > > >
> > >>>> > > > >         };
> > >>>> > > > >
> > >>>> > > > >         for (int i = 0; i < numberOfPartitions; i++) {
> > >>>> > > > >
> > >>>> > > > >             try {
> > >>>> > > > >
> > >>>> > > > >                 byte[] value =
> > >>>> LagMarker.serialize(timeOfEnqueue);
> > >>>> > //
> > >>>> > > 10
> > >>>> > > > > bytes -> short version + long timestamp.
> > >>>> > > > >
> > >>>> > > > >                 // This message is later used by the
> consumers
> > >>>> to
> > >>>> > > measure
> > >>>> > > > > lag.
> > >>>> > > > >
> > >>>> > > > >                 ProducerRecord record = new
> > >>>> ProducerRecord(MY_TOPIC,
> > >>>> > i,
> > >>>> > > > > null, value);
> > >>>> > > > >
> > >>>> > > > >                 kafkaProducer.send(record, callback);
> > >>>> > > > >
> > >>>> > > > >             } catch (Exception e) {
> > >>>> > > > >
> > >>>> > > > >                 // We just dropped a lag marker.
> > >>>> > > > >
> > >>>> > > > >             }
> > >>>> > > > >
> > >>>> > > > >         }
> > >>>> > > > >
> > >>>> > > > >     }
> > >>>> > > > >
> > >>>> > > > > The* 99th* on this lag is about* 350 - 400* ms. It's not
> > >>>> stellar, but
> > >>>> > > > > doesn't account for the *20-30 second 99th* I see on the end
> > to
> > >>>> end
> > >>>> > > lag.
> > >>>> > > > > I am consuming in a tight loop on the Consumers (using the
> > >>>> > > > SimpleConsumer)
> > >>>> > > > > with minimal processing with a *99th fetch time *of
> *130-140*
> > >>>> ms, so
> > >>>> > I
> > >>>> > > > > don't think that should be a problem either. Completely
> > baffled.
> > >>>> > > > >
> > >>>> > > > >
> > >>>> > > > > Thanks!
> > >>>> > > > >
> > >>>> > > > >
> > >>>> > > > >
> > >>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <
> > >>>> ra...@signalfuse.com>
> > >>>> > > > > wrote:
> > >>>> > > > >>
> > >>>> > > > >>
> > >>>> > > > >>
> > >>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <
> > >>>> ra...@signalfuse.com
> > >>>> > >
> > >>>> > > > >> wrote:
> > >>>> > > > >>>
> > >>>> > > > >>> I am trying to replace a Thrift peer to peer API with
> kafka
> > >>>> for a
> > >>>> > > > >>> particular work flow. I am finding the 99th percentile
> > >>>> latency to
> > >>>> > be
> > >>>> > > > >>> unacceptable at this time. This entire work load runs in
> an
> > >>>> Amazon
> > >>>> > > > VPC. I'd
> > >>>> > > > >>> greatly appreciate it if some one has any insights on why
> I
> > am
> > >>>> > seeing
> > >>>> > > > such
> > >>>> > > > >>> poor numbers. Here are some details and measurements
> taken:
> > >>>> > > > >>>
> > >>>> > > > >>> i) I have a single topic with 1024 partitions that I am
> > >>>> writing to
> > >>>> > > from
> > >>>> > > > >>> six clients using the kafka 0.8.2 beta kafka producer.
> > >>>> > > > >>>
> > >>>> > > > >>> ii) I have 3 brokers, each on  a c3 2x machine on ec2.
> Each
> > of
> > >>>> > those
> > >>>> > > > >>> machines has 8 virtual cpus, 15 GB memory and 2 * 80 GB
> > SSDs.
> > >>>> The
> > >>>> > > > broker ->
> > >>>> > > > >>> partitions mapping was decided by Kafka when I created the
> > >>>> topic.
> > >>>> > > > >>>
> > >>>> > > > >>> iii) I write about 22 thousand messages per second from
> > >>>> across the
> > >>>> > 6
> > >>>> > > > >>> clients. This number was calculated using distributed
> > >>>> counters. I
> > >>>> > > just
> > >>>> > > > >>> increment a distributed counter in the callback from my
> > >>>> enqueue job
> > >>>> > > if
> > >>>> > > > the
> > >>>> > > > >>> metadata returned is not null. I also increment a number of
> > >>>> dropped
> > >>>> > > > messages
> > >>>> > > > >>> counter if the callback has a non-null exception or if
> there
> > >>>> was an
> > >>>> > > > >>> exception in the synchronous send call. The number of
> > dropped
> > >>>> > > messages
> > >>>> > > > is
> > >>>> > > > >>> pretty much always zero. Out of the 6 clients 3 are
> > >>>> responsible for
> > >>>> > > > 95% of
> > >>>> > > > >>> the traffic. Messages are very tiny and have null keys
> and
> > >>>> 27 byte
> > >>>> > > > values
> > >>>> > > > >>> (2 byte version and 25 byte payload). Again these messages
> > are
> > >>>> > > written
> > >>>> > > > >>> using the kafka 0.8.2 client.
> > >>>> > > > >>>
> > >>>> > > > >>> iv) I have 3 consumer processes consuming only from this
> > >>>> topic.
> > >>>> > Each
> > >>>> > > > >>> consumer process is assigned a disjoint set of the 1024
> > >>>> partitions
> > >>>> > by
> > >>>> > > > an
> > >>>> > > > >>> eternal arbiter. Each consumer process then creates a
> > mapping
> > >>>> from
> > >>>> > > > brokers
> > >>>> > > > >>> -> partitions it has been assigned. It then starts one
> > fetcher
> > >>>> > thread
> > >>>> > > > per
> > >>>> > > > >>> broker. Each thread queries the broker (using the
> > >>>> SimpleConsumer)
> > >>>> > it
> > >>>> > > > has
> > >>>> > > > >>> been assigned for partitions such that partitions =
> > >>>> (partitions on
> > >>>> > > > broker)
> > >>>> > > > >>> *∩ *(partitions assigned to the process by the arbiter).
> So
> > in
> > >>>> > effect
> > >>>> > > > >>> there are 9 consumer threads across 3 consumer processes
> > that
> > >>>> > query a
> > >>>> > > > >>> disjoint set of partitions for the single topic and
> amongst
> > >>>> > > themselves
> > >>>> > > > they
> > >>>> > > > >>> manage to consume from every topic. So each thread has a
> run
> > >>>> loop
> > >>>> > > > where it
> > >>>> > > > >>> asks for messages, consumes them then asks for more
> > messages.
> > >>>> When
> > >>>> > a
> > >>>> > > > run
> > >>>> > > > >>> loop starts, it starts by querying for the latest message
> > in a
> > >>>> > > > partition
> > >>>> > > > >>> (i.e. discards any previous backup) and then maintains a
> map
> > >>>> of
> > >>>> > > > partition
> > >>>> > > > >>> -> nextOffsetToRequest in memory to make sure that it
> > consumes
> > >>>> > > > messages in
> > >>>> > > > >>> order.
> > >>>> > > > >>>
> > >>>> > > > >> Edit: Meant external arbiter.
> > >>>> > > > >>
> > >>>> > > > >>>
> > >>>> > > > >>> v) Consumption is really simple. Each message is put on a
> > non
> > >>>> > > blocking
> > >>>> > > > >>> efficient ring buffer. If the ring buffer is full the
> > message
> > >>>> is
> > >>>> > > > dropped. I
> > >>>> > > > >>> measure the mean time between fetches and it is the same
> as
> > >>>> the
> > >>>> > time
> > >>>> > > > for
> > >>>> > > > >>> fetches up to a ms, meaning no matter how many messages
> are
> > >>>> > dequeued,
> > >>>> > > > it
> > >>>> > > > >>> takes almost no time to process them. At the end of
> > >>>> processing a
> > >>>> > > fetch
> > >>>> > > > >>> request I increment another distributed counter that
> counts
> > >>>> the
> > >>>> > > number
> > >>>> > > > of
> > >>>> > > > >>> messages processed. This counter tells me that on average
> I
> > am
> > >>>> > > > consuming
> > >>>> > > > >>> the same number of messages/sec that I enqueue on the
> > >>>> producers
> > >>>> > i.e.
> > >>>> > > > around
> > >>>> > > > >>> 22 thousand messages/sec.
> > >>>> > > > >>>
> > >>>> > > > >>> vi) The 99th percentile of the number of messages fetched
> > per
> > >>>> fetch
> > >>>> > > > >>> request is about 500 messages.  The 99th percentile of the
> > >>>> time it
> > >>>> > > > takes to
> > >>>> > > > >>> fetch a batch is about 130 - 140 ms. I played around with
> the
> > >>>> buffer
> > >>>> > > and
> > >>>> > > > >>> maxWait settings on the SimpleConsumer and attempting to
> > >>>> consume
> > >>>> > more
> > >>>> > > > >>> messages was leading the 99th percentile of the fetch time
> > to
> > >>>> > balloon
> > >>>> > > > up,
> > >>>> > > > >>> so I am consuming in smaller batches right now.
> > >>>> > > > >>>
> > >>>> > > > >>> vii) Every 30 seconds odd each of the producers inserts a
> > >>>> trace
> > >>>> > > message
> > >>>> > > > >>> into each partition (so 1024 messages per producer every
> 30
> > >>>> > seconds).
> > >>>> > > > Each
> > >>>> > > > >>> message only contains the System.currentTimeMillis() at
> the
> > >>>> time of
> > >>>> > > > >>> enqueueing. It is about 9 bytes long. The consumer in step
> > (v)
> > >>>> > always
> > >>>> > > > >>> checks to see if a message it dequeued is of this trace
> type
> > >>>> > (instead
> > >>>> > > > of
> > >>>> > > > >>> the regular message type). This is a simple check on the
> > >>>> first 2
> > >>>> > byte
> > >>>> > > > >>> version that each message buffer contains. If it is a
> trace
> > >>>> message
> > >>>> > > it
> > >>>> > > > >>> reads it's payload as a long and uses the difference
> between
> > >>>> the
> > >>>> > > system
> > >>>> > > > >>> time on the consumer and this payload and updates a
> > histogram
> > >>>> with
> > >>>> > > this
> > >>>> > > > >>> difference value. Ignoring NTP skew (which I've measured
> to
> > >>>> be in
> > >>>> > the
> > >>>> > > > order
> > >>>> > > > >>> of milliseconds) this is the lag between when a message
> was
> > >>>> > enqueued
> > >>>> > > > on the
> > >>>> > > > >>> producer and when it was read on the consumer.
> > >>>> > > > >>>
> > >>>> > > > >> Edit: Trace messages are 10 bytes long - 2byte version + 8
> > >>>> byte long
> > >>>> > > for
> > >>>> > > > >> timestamp.
> > >>>> > > > >>
> > >>>> > > > >> So pseudocode for every consumer thread (one per broker per
> > >>>> consumer
> > >>>> > > > >>> process (9 in total across 3 consumers) is:
> > >>>> > > > >>>
> > >>>> > > > >>> void run() {
> > >>>> > > > >>>
> > >>>> > > > >>> while (running) {
> > >>>> > > > >>>
> > >>>> > > > >>>     FetchRequest fetchRequest = buildFetchRequest(
> > >>>> > > > >>> partitionsAssignedToThisProcessThatFallOnThisBroker);  //
> > This
> > >>>> > > > >>> assignment is done by an external arbiter.
> > >>>> > > > >>>
> > >>>> > > > >>>     measureTimeBetweenFetchRequests();   // The 99th of
> this
> > >>>> is
> > >>>> > > > 130-140ms
> > >>>> > > > >>>
> > >>>> > > > >>>     FetchResponse fetchResponse =
> > >>>> fetchResponse(fetchRequest);  //
> > >>>> > > The
> > >>>> > > > >>> 99th of this is 130-140ms, which is the same as the time
> > >>>> between
> > >>>> > > > fetches.
> > >>>> > > > >>>
> > >>>> > > > >>>     processData(fetchResponse);  // The 99th on this is
> 2-3
> > >>>> ms.
> > >>>> > > > >>>   }
> > >>>> > > > >>> }
> > >>>> > > > >>>
> > >>>> > > > >>> void processData(FetchResponse response) {
> > >>>> > > > >>>   try (Timer.Context _ = processWorkerDataTimer.time()) {
> //
> > >>>> The
> > >>>> > 99th
> > >>>> > > > on
> > >>>> > > > >>> this is 2-3 ms.
> > >>>> > > > >>>     for (Message message : response) {
> > >>>> > > > >>>        if (typeOf(message) == NORMAL_DATA) {
> > >>>> > > > >>>          enqueueOnNonBlockingRingBuffer(message);  //
> Never
> > >>>> blocks,
> > >>>> > > > >>> drops message if ring buffer is full.
> > >>>> > > > >>>        } else if (typeOf(message) == TRACE_DATA) {
> > >>>> > > > >>>          long timeOfEnqueue = getEnqueueTime(message);
> > >>>> > > > >>>          long currentTime = System.currentTimeMillis();
> > >>>> > > > >>>          long difference = currentTime - timeOfEnqueue;
> > >>>> > > > >>>          lagHistogram.update(difference);
> > >>>> > > > >>>        }
> > >>>> > > > >>>      }
> > >>>> > > > >>>    }
> > >>>> > > > >>> }
> > >>>> > > > >>>
> > >>>> > > > >>> viii) So the kicker is that this lag is really really
> high:
> > >>>> > > > >>> Median Lag: *200 ms*. This already seems pretty high and I
> > >>>> could
> > >>>> > even
> > >>>> > > > >>> discount some of it through NTP skew.
> > >>>> > > > >>> Mean Lag: *2 - 4 seconds*! Also notice how mean is bigger
> > than
> > >>>> > median
> > >>>> > > > >>> kind of telling us how the distribution has a big tail.
> > >>>> > > > >>> 99th Lag: *20 seconds*!!
> > >>>> > > > >>>
> > >>>> > > > >>> I can't quite figure out the source of the lag. Here are
> > some
> > >>>> other
> > >>>> > > > >>> things of note:
> > >>>> > > > >>>
> > >>>> > > > >>> 1) The brokers in general are at about 40-50% CPU but I
> see
> > >>>> > > occasional
> > >>>> > > > >>> spikes to more than 80% - 95%. This could probably be
> > causing
> > >>>> > issues.
> > >>>> > > > I am
> > >>>> > > > >>> wondering if this is down to the high number of
> partitions I
> > >>>> have
> > >>>> > and
> > >>>> > > > how
> > >>>> > > > >>> each partition ends up receiving not too many messages.
> 22k
> > >>>> > messages
> > >>>> > > > spread
> > >>>> > > > >>> across 1024 partitions = only 21 odd
> messages/sec/partition.
> > >>>> But
> > >>>> > each
> > >>>> > > > >>> broker should be receiving about  7500 messages/sec. It
> > could
> > >>>> also
> > >>>> > be
> > >>>> > > > down
> > >>>> > > > >>> to the nature of these messages being tiny. I imagine that
> > the
> > >>>> > kafka
> > >>>> > > > wire
> > >>>> > > > >>> protocol is probably close to the 25 byte message payload.
> > It
> > >>>> also
> > >>>> > > has
> > >>>> > > > to
> > >>>> > > > >>> do a few  CRC32 calculations etc. But again given that
> these
> > >>>> are
> > >>>> > > C3.2x
> > >>>> > > > >>> machines and the number of messages is so tiny I'd expect
> it
> > >>>> to do
> > >>>> > > > better.
> > >>>> > > > >>> But again reading this article
> > >>>> > > > >>> <
> > >>>> > > >
> > >>>> > >
> > >>>> >
> > >>>>
> >
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> > >>>> > > >
> > >>>> > > > about
> > >>>> > > > >>> 2 million messages/sec across three brokers makes it seem
> > >>>> like 22k
> > >>>> > 25
> > >>>> > > > byte
> > >>>> > > > >>> messages should be very easy. So my biggest suspicion
> right
> > >>>> now is
> > >>>> > > > that the
> > >>>> > > > >>> large number of partitions is some how responsible for
> this
> > >>>> > latency.
> > >>>> > > > >>>
> > >>>> > > > >>> 2) On the consumer given that the 99th time between
> fetches
> > >>>> == 99th
> > >>>> > > > time
> > >>>> > > > >>> of a fetch, I don't see how I could do any better. It's
> > pretty
> > >>>> > close
> > >>>> > > to
> > >>>> > > > >>> just getting batches of data, iterating through them and
> > >>>> dropping
> > >>>> > > them
> > >>>> > > > on
> > >>>> > > > >>> the floor. My requests on the consumer are built like
> this:
> > >>>> > > > >>>
> > >>>> > > > >>>         FetchRequestBuilder builder = new
> > >>>> > > > >>> FetchRequestBuilder().clientId(clientName)
> > >>>> > > > >>>
> > >>>> > > > >>>                 .maxWait(100).minBytes(1000);  // max wait
> > of
> > >>>> 100
> > >>>> > ms
> > >>>> > > or
> > >>>> > > > >>> at least 1000 bytes whichever happens first.
> > >>>> > > > >>>
> > >>>> > > > >>>         // Add a fetch request for every partition.
> > >>>> > > > >>>
> > >>>> > > > >>>         for (PartitionMetadata partition : partitions) {
> > >>>> > > > >>>
> > >>>> > > > >>>             int partitionId = partition.partitionId();
> > >>>> > > > >>>
> > >>>> > > > >>>             long offset = readOffsets.get(partition);  //
> > >>>> This map
> > >>>> > > > >>> maintains the next partition to be read.
> > >>>> > > > >>>
> > >>>> > > > >>>             builder.addFetch(MY_TOPIC, partitionId,
> offset,
> > >>>> 3000);
> > >>>> > > //
> > >>>> > > > >>> Up to 3000 bytes per fetch request.}
> > >>>> > > > >>>         }
> > >>>> > > > >>> 3) On the producer side I use the kafka beta producer.
> I've
> > >>>> played
> > >>>> > > > >>> around with many settings but none of them seem to have
> > made a
> > >>>> > > > difference.
> > >>>> > > > >>> Here are my current settings:
> > >>>> > > > >>>
> > >>>> > > > >>> ProducerConfig values:
> > >>>> > > > >>>
> > >>>> > > > >>>        block.on.buffer.full = false
> > >>>> > > > >>>
> > >>>> > > > >>>         retry.backoff.ms = 100
> > >>>> > > > >>>
> > >>>> > > > >>>         buffer.memory = 83886080  // Kind of big - could
> it
> > be
> > >>>> > adding
> > >>>> > > > >>> lag?
> > >>>> > > > >>>
> > >>>> > > > >>>         batch.size = 40500  // This is also kind of big -
> > >>>> could it
> > >>>> > be
> > >>>> > > > >>> adding lag?
> > >>>> > > > >>>
> > >>>> > > > >>>         metrics.sample.window.ms = 30000
> > >>>> > > > >>>
> > >>>> > > > >>>         metadata.max.age.ms = 300000
> > >>>> > > > >>>
> > >>>> > > > >>>         receive.buffer.bytes = 32768
> > >>>> > > > >>>
> > >>>> > > > >>>         timeout.ms = 30000
> > >>>> > > > >>>
> > >>>> > > > >>>         max.in.flight.requests.per.connection = 5
> > >>>> > > > >>>
> > >>>> > > > >>>         metric.reporters = []
> > >>>> > > > >>>
> > >>>> > > > >>>         bootstrap.servers = [broker1, broker2, broker3]
> > >>>> > > > >>>
> > >>>> > > > >>>         client.id =
> > >>>> > > > >>>
> > >>>> > > > >>>         compression.type = none
> > >>>> > > > >>>
> > >>>> > > > >>>         retries = 0
> > >>>> > > > >>>
> > >>>> > > > >>>         max.request.size = 1048576
> > >>>> > > > >>>
> > >>>> > > > >>>         send.buffer.bytes = 131072
> > >>>> > > > >>>
> > >>>> > > > >>>         acks = 1
> > >>>> > > > >>>
> > >>>> > > > >>>         reconnect.backoff.ms = 10
> > >>>> > > > >>>
> > >>>> > > > >>>         linger.ms = 250  // Based on this config I'd
> > imagine
> > >>>> that
> > >>>> > > the
> > >>>> > > > >>> lag should be bounded to about 250 ms over all.
> > >>>> > > > >>>
> > >>>> > > > >>>         metrics.num.samples = 2
> > >>>> > > > >>>
> > >>>> > > > >>>         metadata.fetch.timeout.ms = 60000
> > >>>> > > > >>>
> > >>>> > > > >>> I know this is a lot of information. I just wanted to
> > provide
> > >>>> as
> > >>>> > much
> > >>>> > > > >>> context as possible.
> > >>>> > > > >>>
> > >>>> > > > >>> Thanks in advance!
> > >>>> > > > >>>
> > >>>> > > > >>>
> > >>>> > > >
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
>
