Hi all,

Bumping this up, in case someone has any ideas. I did yet another
experiment where I create 4 producers and stripe the send requests across
them in a manner such that any one producer only sees 256 partitions
instead of the entire 1024. This seems to have helped a bit: though I
still see a very high 99th percentile (25-30 seconds), the median, mean,
75th and 95th percentiles have all gone down.
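For reference, the striping itself is just routing arithmetic. A minimal sketch, assuming contiguous blocks of partitions per producer (the class and method names are my own illustration, not the actual code; producer construction and send() are elided):

```java
// Sketch: stripe sends across N producers so that each producer only
// ever sees totalPartitions / N of the partitions. Here 4 producers
// would cover 1024 partitions, 256 each.
public class ProducerStriping {
    static int producerIndexFor(int partition, int numProducers, int totalPartitions) {
        int partitionsPerProducer = totalPartitions / numProducers; // 1024 / 4 = 256
        return partition / partitionsPerProducer; // partitions 0-255 -> producer 0, etc.
    }
}
```

The send path would then look up `producers[producerIndexFor(partition, 4, 1024)]` and call send() on that instance, so each producer's internal per-partition state stays bounded to its own 256 partitions.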

Thanks!

On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <tstump...@ntent.com>
wrote:

> Ah I thought it was restarting the broker that made things better :)
>
> Yeah I have no experience with the Java client so can't really help there.
>
> Good luck!
>
> -----Original Message-----
> From: Rajiv Kurian [ra...@signalfuse.com]
> Received: Sunday, 21 Dec 2014, 12:25PM
> To: users@kafka.apache.org [users@kafka.apache.org]
> Subject: Re: Trying to figure out kafka latency issues
>
> I'll take a look at the GC profile of the brokers. Right now I keep a tab
> on the CPU, messages in, bytes in, bytes out, free memory (on the machine,
> not the JVM heap) and free disk space on the broker. I'll need to take a
> look at the JVM metrics too. What seemed strange is that going from 8 ->
> 512 partitions increases the latency, but going from 512 -> 8 does not
> decrease it. I have to restart the producer (but not the broker) for the
> end-to-end latency to go down. That made it seem that the fault was
> probably with the producer and not the broker. Only restarting the
> producer made things better. I'll do more extensive measurement on the
> broker.
>
> On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <tstump...@ntent.com>
> wrote:
> >
> > Did you see my response and have you checked the server logs especially
> > the GC logs? It still sounds like you are running out of memory on the
> > broker. What is your max heap memory and are you thrashing once you start
> > writing to all those partitions?
> >
> > You have measured very thoroughly from an external point of view, i think
> > now you'll have to start measuring the internal metrics. Maybe someone
> else
> > will have ideas on what jmx values to watch.
> >
> > Best,
> > Thunder
> >
> >
> > -----Original Message-----
> > From: Rajiv Kurian [ra...@signalfuse.com]
> > Received: Saturday, 20 Dec 2014, 10:24PM
> > To: users@kafka.apache.org [users@kafka.apache.org]
> > Subject: Re: Trying to figure out kafka latency issues
> >
> > Some more work tells me that the end-to-end latency numbers vary with the
> > number of partitions I am writing to. I did an experiment where, based on
> > a run-time flag, I dynamically select how many of the *1024 partitions* I
> > write to. So if I decide to write to at most 256 partitions, I mod
> > whatever partition I would actually write to by 256. The number of
> > partitions for this topic on the broker remains the same at *1024*, but
> > the number of partitions my producers write to changes dynamically based
> > on the run-time flag. So something like this:
> >
> > int partition = getPartitionForMessage(message);
> > // This flag can be updated without bringing the application down -
> > // just a volatile read of a number set externally.
> > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
> > int moddedPartition = partition % maxPartitionsToWriteTo;
> > // Send a message to this Kafka partition.
> >
> > Here are some interesting things I've noticed:
> >
> > i) When I start my client and it *never writes* to more than *8
> > partitions* (same data rate but fewer partitions), the end-to-end *99th
> > latency is 300-350 ms*. Quite a bit of this (numbers in my previous
> > emails) is the latency from producer -> broker and from broker ->
> > consumer. Still nowhere near as poor as the *20 - 30 seconds* I was
> > seeing.
> >
> > ii) When I increase the maximum number of partitions, end-to-end latency
> > increases dramatically. At *256 partitions* the end-to-end *99th latency
> > is still 390 - 418 ms*, worse than the latency figures for *8*
> > partitions, but not by much. When I increase this number to *512
> > partitions* the end-to-end *99th latency* becomes an intolerable *19 - 24
> > seconds*. At *1024* partitions the *99th latency is 25 - 30 seconds*.
> > A table of the numbers:
> >
> >   Max partitions written to (of 1024) | End-to-end 99th latency
> >   ------------------------------------+------------------------
> >   8                                   | 300 - 350 ms
> >   256                                 | 390 - 418 ms
> >   512                                 | 19 - 24 seconds
> >   1024                                | 25 - 30 seconds
> >
> > iii) Once I make the max number of partitions high enough, reducing it
> > doesn't help. For example, if I go up from *8* to *512* partitions, the
> > latency goes up. But while the producer is running, if I go down from
> > *512* to *8* partitions, the latency numbers don't come back down. My
> > guess is that the producer lazily creates some per-partition state and
> > this state is causing the latency. Once this state is created, writing to
> > fewer partitions doesn't seem to help. Only a restart of the producer
> > calms things down.
> >
> > So my current plan is to reduce the number of partitions on the topic,
> > but there seems to be something deeper going on for the latency numbers
> > to be so poor to begin with and then degrade so much more (non-linearly)
> > with additional partitions.
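One way to sanity-check the lazy per-partition-state theory is back-of-envelope arithmetic using the producer settings quoted further down in this thread (batch.size = 40500, buffer.memory ≈ 80 MB, linger.ms = 250); the ~50 bytes per record on the wire is my rough guess for the 27-byte values plus framing:

```java
// Rough arithmetic: at ~21 msgs/sec/partition of ~50 bytes each, a
// 40500-byte batch would take ~38 seconds to fill, so linger.ms (250 ms)
// should be what triggers sends -- batch.size alone can't explain 20+
// second lags. But a full batch buffer per partition across all 1024
// partitions would want ~41 MB, i.e. about half of buffer.memory (80 MB),
// so per-partition producer state is substantial at this partition count.
public class BatchMath {
    static double secondsToFillBatch(int batchBytes, double msgsPerSec, int bytesPerMsg) {
        return batchBytes / (msgsPerSec * bytesPerMsg);
    }

    static long worstCaseBufferedBytes(int partitions, int batchBytes) {
        return (long) partitions * batchBytes;
    }
}
```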
> >
> > Thanks!
> >
> > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> > >
> > > I've done some more measurements. I've also started measuring the
> latency
> > > between when I ask my producer to send a message and when I get an
> > > acknowledgement via the callback. Here is my code:
> > >
> > > // This function is called on every producer once every 30 seconds.
> > > public void addLagMarkers(final Histogram enqueueLag) {
> > >     final int numberOfPartitions = 1024;
> > >     final long timeOfEnqueue = System.currentTimeMillis();
> > >     final Callback callback = new Callback() {
> > >         @Override
> > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
> > >             if (metadata != null) {
> > >                 // The difference between ack time from broker and enqueue time.
> > >                 final long timeOfAck = System.currentTimeMillis();
> > >                 final long lag = timeOfAck - timeOfEnqueue;
> > >                 enqueueLag.update(lag);
> > >             }
> > >         }
> > >     };
> > >     for (int i = 0; i < numberOfPartitions; i++) {
> > >         try {
> > >             // 10 bytes -> short version + long timestamp.
> > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
> > >             // This message is later used by the consumers to measure lag.
> > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
> > >             kafkaProducer.send(record, callback);
> > >         } catch (Exception e) {
> > >             // We just dropped a lag marker.
> > >         }
> > >     }
> > > }
> > >
> > > The *99th* on this lag is about *350 - 400 ms*. It's not stellar, but it
> > > doesn't account for the *20-30 second 99th* I see on the end-to-end lag.
> > > I am consuming in a tight loop on the consumers (using the
> > > SimpleConsumer) with minimal processing, with a *99th fetch time* of
> > > *130-140 ms*, so I don't think that should be a problem either.
> > > Completely baffled.
> > >
> > >
> > > Thanks!
> > >
> > >
> > >
> > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >>
> > >>
> > >>
> > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com>
> > >> wrote:
> > >>>
> > >>> I am trying to replace a Thrift peer-to-peer API with Kafka for a
> > >>> particular workflow. I am finding the 99th percentile latency to be
> > >>> unacceptable at this time. This entire workload runs in an Amazon VPC.
> > >>> I'd greatly appreciate it if someone has any insights on why I am
> > >>> seeing such poor numbers. Here are some details and measurements taken:
> > >>>
> > >>> i) I have a single topic with 1024 partitions that I am writing to
> > >>> from six clients using the Kafka 0.8.2 beta producer.
> > >>>
> > >>> ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of
> > >>> those machines has 8 virtual CPUs, 15 GB memory and 2 x 80 GB SSDs.
> > >>> The broker -> partitions mapping was decided by Kafka when I created
> > >>> the topic.
> > >>>
> > >>> iii) I write about 22 thousand messages per second from across the 6
> > >>> clients. This number was calculated using distributed counters. I just
> > >>> increment a distributed counter in the callback from my enqueue job if
> > >>> the metadata returned is not null. I also increment a dropped-messages
> > >>> counter if the callback has a non-null exception or if there was an
> > >>> exception in the synchronous send call. The number of dropped messages
> > >>> is pretty much always zero. Out of the 6 clients, 3 are responsible
> > >>> for 95% of the traffic. Messages are very tiny and have null keys and
> > >>> 27 byte values (2 byte version and 25 byte payload). Again, these
> > >>> messages are written using the Kafka 0.8.2 client.
> > >>>
> > >>> iv) I have 3 consumer processes consuming only from this topic. Each
> > >>> consumer process is assigned a disjoint set of the 1024 partitions by
> > >>> an external arbiter. Each consumer process then creates a mapping from
> > >>> brokers -> partitions it has been assigned. It then starts one fetcher
> > >>> thread per broker. Each thread queries the broker (using the
> > >>> SimpleConsumer) it has been assigned for partitions such that
> > >>> partitions = (partitions on broker) ∩ (partitions assigned to the
> > >>> process by the arbiter). So in effect there are 9 consumer threads
> > >>> across 3 consumer processes that query a disjoint set of partitions
> > >>> for the single topic, and amongst themselves they manage to consume
> > >>> from every partition. So each thread has a run loop where it asks for
> > >>> messages, consumes them, then asks for more messages. When a run loop
> > >>> starts, it begins by querying for the latest message in a partition
> > >>> (i.e. discards any previous backlog) and then maintains a map of
> > >>> partition -> nextOffsetToRequest in memory to make sure that it
> > >>> consumes messages in order.
> > >>>
> > >> Edit: Meant external arbiter.
> > >>
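The broker -> partition intersection described in (iv) can be sketched like this (the names and types are illustrative, not from the actual code):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: compute, per broker, the partitions this consumer process should
// fetch: (partitions hosted on the broker) ∩ (partitions assigned to this
// process by the external arbiter). One fetcher thread is then started per
// non-empty entry of the returned map.
public class FetchPlan {
    static Map<Integer, Set<Integer>> plan(
            Map<Integer, Set<Integer>> partitionsByBroker, // brokerId -> hosted partitions
            Set<Integer> assignedToThisProcess) {          // from the external arbiter
        Map<Integer, Set<Integer>> result = new HashMap<>();
        for (Map.Entry<Integer, Set<Integer>> e : partitionsByBroker.entrySet()) {
            Set<Integer> intersection = new HashSet<>(e.getValue());
            intersection.retainAll(assignedToThisProcess);
            if (!intersection.isEmpty()) {
                result.put(e.getKey(), intersection);
            }
        }
        return result;
    }
}
```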
> > >>>
> > >>> v) Consumption is really simple. Each message is put on a
> > >>> non-blocking, efficient ring buffer. If the ring buffer is full, the
> > >>> message is dropped. I measure the mean time between fetches and it is
> > >>> the same as the time for fetches to within a ms, meaning no matter how
> > >>> many messages are dequeued, it takes almost no time to process them.
> > >>> At the end of processing a fetch request I increment another
> > >>> distributed counter that counts the number of messages processed. This
> > >>> counter tells me that on average I am consuming the same number of
> > >>> messages/sec that I enqueue on the producers, i.e. around 22 thousand
> > >>> messages/sec.
> > >>>
> > >>> vi) The 99th percentile of the number of messages fetched per fetch
> > >>> request is about 500 messages. The 99th percentile of the time it
> > >>> takes to fetch a batch is about 130 - 140 ms. I played around with the
> > >>> buffer and maxWait settings on the SimpleConsumer, and attempting to
> > >>> consume more messages per fetch caused the 99th percentile of the
> > >>> fetch time to balloon, so I am consuming in smaller batches right now.
> > >>>
> > >>> vii) Every 30 seconds or so, each of the producers inserts a trace
> > >>> message into each partition (so 1024 messages per producer every 30
> > >>> seconds). Each message only contains the System.currentTimeMillis() at
> > >>> the time of enqueueing. It is 10 bytes long. The consumer in step (v)
> > >>> always checks whether a message it dequeued is of this trace type
> > >>> (instead of the regular message type). This is a simple check on the
> > >>> first 2-byte version that each message buffer contains. If it is a
> > >>> trace message, it reads its payload as a long and updates a histogram
> > >>> with the difference between the system time on the consumer and this
> > >>> payload. Ignoring NTP skew (which I've measured to be on the order of
> > >>> milliseconds), this is the lag between when a message was enqueued on
> > >>> the producer and when it was read on the consumer.
> > >>>
> > >> Edit: Trace messages are 10 bytes long - 2-byte version + 8-byte long
> > >> for the timestamp.
> > >>
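A sketch of what that 10-byte trace-marker format could look like, given the description above (the field order, big-endianness and the version constant's value are my assumptions, not from the thread):

```java
import java.nio.ByteBuffer;

// Sketch of the 10-byte trace-marker payload: a 2-byte version followed
// by the 8-byte enqueue timestamp in milliseconds.
public class LagMarker {
    static final short TRACE_VERSION = 2; // hypothetical version value

    static byte[] serialize(long timeOfEnqueueMs) {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putShort(TRACE_VERSION);  // 2 bytes
        buf.putLong(timeOfEnqueueMs); // 8 bytes
        return buf.array();
    }

    static long readEnqueueTime(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        buf.getShort(); // skip the version
        return buf.getLong();
    }
}
```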
> > >>> So pseudocode for every consumer thread (one per broker per consumer
> > >>> process, 9 in total across 3 consumers) is:
> > >>>
> > >>> void run() {
> > >>>   while (running) {
> > >>>     // This assignment is done by an external arbiter.
> > >>>     FetchRequest fetchRequest =
> > >>>         buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
> > >>>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
> > >>>     // The 99th of this is 130-140 ms, the same as the time between fetches.
> > >>>     FetchResponse fetchResponse = fetch(fetchRequest);
> > >>>     processData(fetchResponse);  // The 99th on this is 2-3 ms.
> > >>>   }
> > >>> }
> > >>>
> > >>> void processData(FetchResponse response) {
> > >>>   try (Timer.Context _ = processWorkerDataTimer.time()) {  // The 99th on this is 2-3 ms.
> > >>>     for (Message message : response) {
> > >>>       if (typeOf(message) == NORMAL_DATA) {
> > >>>         // Never blocks; drops the message if the ring buffer is full.
> > >>>         enqueueOnNonBlockingRingBuffer(message);
> > >>>       } else if (typeOf(message) == TRACE_DATA) {
> > >>>         long timeOfEnqueue = getEnqueueTime(message);
> > >>>         long currentTime = System.currentTimeMillis();
> > >>>         long difference = currentTime - timeOfEnqueue;
> > >>>         lagHistogram.update(difference);
> > >>>       }
> > >>>     }
> > >>>   }
> > >>> }
> > >>>
> > >>> viii) So the kicker is that this lag is really, really high:
> > >>> Median lag: *200 ms*. This already seems pretty high, and I could even
> > >>> discount some of it through NTP skew.
> > >>> Mean lag: *2 - 4 seconds*! Notice that the mean is bigger than the
> > >>> median, which tells us the distribution has a big tail.
> > >>> 99th lag: *20 seconds*!!
> > >>>
> > >>> I can't quite figure out the source of the lag. Here are some other
> > >>> things of note:
> > >>>
> > >>> 1) The brokers in general are at about 40-50% CPU, but I see
> > >>> occasional spikes to more than 80% - 95%. This could probably be
> > >>> causing issues. I am wondering if this is down to the high number of
> > >>> partitions I have and how each partition ends up receiving not that
> > >>> many messages: 22k messages spread across 1024 partitions = only 21
> > >>> odd messages/sec/partition. But each broker should be receiving about
> > >>> 7500 messages/sec. It could also be down to these messages being tiny.
> > >>> I imagine that the Kafka wire protocol overhead is probably close to
> > >>> the 25 byte message payload. It also has to do a few CRC32
> > >>> calculations etc. But given that these are c3.2xlarge machines and the
> > >>> message rate is so low, I'd expect it to do better. Reading this
> > >>> article
> > >>> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
> > >>> about 2 million messages/sec across three brokers makes it seem like
> > >>> 22k 25-byte messages should be very easy. So my biggest suspicion
> > >>> right now is that the large number of partitions is somehow
> > >>> responsible for this latency.
> > >>>
> > >>> 2) On the consumer, given that the 99th time between fetches == 99th
> > >>> time of a fetch, I don't see how I could do any better. It's pretty
> > >>> close to just getting batches of data, iterating through them and
> > >>> dropping them on the floor. My requests on the consumer are built like
> > >>> this:
> > >>>
> > >>>         // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
> > >>>         FetchRequestBuilder builder = new FetchRequestBuilder()
> > >>>                 .clientId(clientName)
> > >>>                 .maxWait(100)
> > >>>                 .minBytes(1000);
> > >>>
> > >>>         // Add a fetch request for every partition.
> > >>>         for (PartitionMetadata partition : partitions) {
> > >>>             int partitionId = partition.partitionId();
> > >>>             long offset = readOffsets.get(partition);  // The next offset to be read.
> > >>>             builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition.
> > >>>         }
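As a rough cross-check on those fetch settings (the ~26 bytes of per-message framing overhead is my estimate for the 0.8 message-set format, on top of the 27-byte values):

```java
// Rough arithmetic: with 3000 bytes per partition per fetch and ~53 bytes
// per message on the wire, each partition can return at most ~56 messages
// per fetch; at ~21 msgs/sec/partition and a fetch every ~130-140 ms, only
// ~3 new messages arrive per partition between fetches, so the 3000-byte
// cap should rarely be the bottleneck on the consumer side.
public class FetchSizing {
    static int maxMessagesPerPartitionFetch(int fetchBytes, int payloadBytes, int overheadBytes) {
        return fetchBytes / (payloadBytes + overheadBytes);
    }

    static double newMessagesBetweenFetches(double msgsPerSecPerPartition, double fetchIntervalMs) {
        return msgsPerSecPerPartition * fetchIntervalMs / 1000.0;
    }
}
```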
> > >>> 3) On the producer side I use the kafka beta producer. I've played
> > >>> around with many settings but none of them seem to have made a
> > difference.
> > >>> Here are my current settings:
> > >>>
> > >>> ProducerConfig values:
> > >>>
> > >>>        block.on.buffer.full = false
> > >>>
> > >>>         retry.backoff.ms = 100
> > >>>
> > >>>         buffer.memory = 83886080  // Kind of big - could it be adding
> > >>> lag?
> > >>>
> > >>>         batch.size = 40500  // This is also kind of big - could it be
> > >>> adding lag?
> > >>>
> > >>>         metrics.sample.window.ms = 30000
> > >>>
> > >>>         metadata.max.age.ms = 300000
> > >>>
> > >>>         receive.buffer.bytes = 32768
> > >>>
> > >>>         timeout.ms = 30000
> > >>>
> > >>>         max.in.flight.requests.per.connection = 5
> > >>>
> > >>>         metric.reporters = []
> > >>>
> > >>>         bootstrap.servers = [broker1, broker2, broker3]
> > >>>
> > >>>         client.id =
> > >>>
> > >>>         compression.type = none
> > >>>
> > >>>         retries = 0
> > >>>
> > >>>         max.request.size = 1048576
> > >>>
> > >>>         send.buffer.bytes = 131072
> > >>>
> > >>>         acks = 1
> > >>>
> > >>>         reconnect.backoff.ms = 10
> > >>>
> > >>>         linger.ms = 250  // Based on this config I'd imagine that the
> > >>>         // lag should be bounded to about 250 ms overall.
> > >>>
> > >>>         metrics.num.samples = 2
> > >>>
> > >>>         metadata.fetch.timeout.ms = 60000
> > >>>
> > >>> I know this is a lot of information. I just wanted to provide as much
> > >>> context as possible.
> > >>>
> > >>> Thanks in advance!
> > >>>
> > >>>
> >
>
