Hi all,

Bumping this up, in case someone has any ideas. I did yet another experiment where I create 4 producers and stripe the send requests across them so that any one producer only ever sees 256 partitions instead of the entire 1024. This seems to have helped a bit, and though I still see a crazy high 99th percentile (25-30 seconds), the median, mean, 75th and 95th percentiles have all gone down.
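A minimal sketch of the striping scheme described above, for illustration only. The class name, the fixed 4 x 256 split, and the raw (non-generic) ProducerRecord usage are assumptions that mirror the snippets later in this thread; this is not the poster's actual code:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Stripe sends across a small pool of producers so that each producer
    // instance only ever touches a fixed slice of the 1024 partitions
    // (here 4 producers x 256 partitions each).
    public class StripedProducerPool {
        private static final int NUM_PRODUCERS = 4;
        private static final int TOTAL_PARTITIONS = 1024;
        private static final int PARTITIONS_PER_PRODUCER = TOTAL_PARTITIONS / NUM_PRODUCERS;

        private final KafkaProducer[] producers = new KafkaProducer[NUM_PRODUCERS];

        public StripedProducerPool(Properties producerConfig) {
            for (int i = 0; i < NUM_PRODUCERS; i++) {
                producers[i] = new KafkaProducer(producerConfig);
            }
        }

        public void send(String topic, int partition, byte[] value) {
            // Partitions 0-255 go to producer 0, 256-511 to producer 1, and so on,
            // so any one producer only accumulates per-partition state for 256 partitions.
            int producerIndex = partition / PARTITIONS_PER_PRODUCER;
            producers[producerIndex].send(new ProducerRecord(topic, partition, null, value));
        }
    }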
Thanks!

On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <tstump...@ntent.com> wrote:

> Ah I thought it was restarting the broker that made things better :)
>
> Yeah I have no experience with the Java client so can't really help there.
>
> Good luck!
>
> -----Original Message-----
> From: Rajiv Kurian [ra...@signalfuse.com]
> Received: Sunday, 21 Dec 2014, 12:25PM
> To: users@kafka.apache.org [users@kafka.apache.org]
> Subject: Re: Trying to figure out kafka latency issues
>
> I'll take a look at the GC profile of the brokers. Right now I keep a tab on the CPU, Messages in, Bytes in, Bytes out, free memory (on the machine, not the JVM heap) and free disk space on the broker. I'll need to take a look at the JVM metrics too. What seemed strange is that going from 8 -> 512 partitions increases the latency, but going from 512 -> 8 does not decrease it. I have to restart the producer (but not the broker) for the end to end latency to go down. That made it seem that the fault was probably with the producer and not the broker. Only restarting the producer made things better. I'll do more extensive measurement on the broker.
>
> On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <tstump...@ntent.com> wrote:
> >
> > Did you see my response and have you checked the server logs, especially the GC logs? It still sounds like you are running out of memory on the broker. What is your max heap memory and are you thrashing once you start writing to all those partitions?
> >
> > You have measured very thoroughly from an external point of view, I think now you'll have to start measuring the internal metrics. Maybe someone else will have ideas on what JMX values to watch.
> >
> > Best,
> > Thunder
> >
> > -----Original Message-----
> > From: Rajiv Kurian [ra...@signalfuse.com]
> > Received: Saturday, 20 Dec 2014, 10:24PM
> > To: users@kafka.apache.org [users@kafka.apache.org]
> > Subject: Re: Trying to figure out kafka latency issues
> >
> > Some more work tells me that the end to end latency numbers vary with the number of partitions I am writing to. I did an experiment where, based on a run time flag, I would dynamically select how many of the *1024 partitions* I write to. So say I decide I'll write to at most 256 partitions: I mod whatever partition I would actually write to by 256. Basically the number of partitions for this topic on the broker remains the same at *1024* partitions, but the number of partitions my producers write to changes dynamically based on a run time flag. So something like this:
> >
> > int partition = getPartitionForMessage(message);
> > int maxPartitionsToWriteTo = maxPartitionsFlag.get(); // This flag can be updated without bringing the application down - just a volatile read of some number set externally.
> > int moddedPartition = partition % maxPartitionsToWriteTo;
> > // Send a message to this Kafka partition.
> >
> > Here are some interesting things I've noticed:
> >
> > i) When I start my client and it *never writes* to more than *8 partitions* (same data rate but fewer partitions) - the end to end *99th latency is 300-350 ms*. Quite a bit of this (numbers in my previous emails) is the latency from producer -> broker and the latency from broker -> consumer. Still nowhere near as poor as the *20 - 30* seconds I was seeing.
> >
> > ii) When I increase the maximum number of partitions, end to end latency increases dramatically.
> > At *256 partitions* the end to end *99th latency is still 390 - 418 ms* - worse than the latency figures for *8* partitions, but not by much. When I increase this number to *512 partitions* the end to end *99th latency* becomes an intolerable *19-24 seconds*. At *1024* partitions the *99th latency is at 25 - 30 seconds*.
> >
> > A table of the numbers:
> >
> > Max number of partitions written to (out of 1024) | End to end 99th latency
> > 8                                                 | 300 - 350 ms
> > 256                                               | 390 - 418 ms
> > 512                                               | 19 - 24 seconds
> > 1024                                              | 25 - 30 seconds
> >
> > iii) Once I make the max number of partitions high enough, reducing it doesn't help. For example, if I go up from *8* to *512* partitions, the latency goes up. But while the producer is running, if I go down from *512* to *8* partitions, it doesn't reduce the latency numbers. My guess is that the producer is creating some state lazily per partition and this state is causing the latency. Once this state is created, writing to fewer partitions doesn't seem to help. Only a restart of the producer calms things down.
> >
> > So my current plan is to reduce the number of partitions on the topic, but there seems to be something deeper going on for the latency numbers to be so poor to begin with and then suffer so much more (non-linearly) with additional partitions.
> >
> > Thanks!
> >
> > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > >
> > > I've done some more measurements. I've also started measuring the latency between when I ask my producer to send a message and when I get an acknowledgement via the callback. Here is my code:
> > >
> > > // This function is called on every producer once every 30 seconds.
> > > public void addLagMarkers(final Histogram enqueueLag) {
> > >     final int numberOfPartitions = 1024;
> > >     final long timeOfEnqueue = System.currentTimeMillis();
> > >     final Callback callback = new Callback() {
> > >         @Override
> > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
> > >             if (metadata != null) {
> > >                 // The difference between ack time from broker and enqueue time.
> > >                 final long timeOfAck = System.currentTimeMillis();
> > >                 final long lag = timeOfAck - timeOfEnqueue;
> > >                 enqueueLag.update(lag);
> > >             }
> > >         }
> > >     };
> > >     for (int i = 0; i < numberOfPartitions; i++) {
> > >         try {
> > >             byte[] value = LagMarker.serialize(timeOfEnqueue); // 10 bytes -> short version + long timestamp.
> > >             // This message is later used by the consumers to measure lag.
> > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
> > >             kafkaProducer.send(record, callback);
> > >         } catch (Exception e) {
> > >             // We just dropped a lag marker.
> > >         }
> > >     }
> > > }
> > >
> > > The *99th* on this lag is about *350 - 400* ms. It's not stellar, but it doesn't account for the *20-30 second 99th* I see on the end to end lag. I am consuming in a tight loop on the Consumers (using the SimpleConsumer) with minimal processing, with a *99th fetch time* of *130-140* ms, so I don't think that should be a problem either. Completely baffled.
> > >
> > > Thanks!
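One way to narrow down where that remaining producer-side time goes is to periodically dump the 0.8.2 producer's internal metrics instead of only measuring externally. A rough sketch, assuming the same kafkaProducer handle and raw types used elsewhere in this thread; metric names differ across versions, so this simply prints everything:

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Log every metric the producer exposes (record queue times, batch sizes,
    // request latencies, ...) to see which stage the latency accumulates in.
    public static void dumpProducerMetrics(KafkaProducer kafkaProducer) {
        Map<MetricName, ? extends Metric> metrics = kafkaProducer.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            System.out.printf("%s (%s) = %f%n", name.name(), name.group(), entry.getValue().value());
        }
    }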
> > >
> > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > >>
> > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> > >>>
> > >>> I am trying to replace a Thrift peer to peer API with kafka for a particular workflow. I am finding the 99th percentile latency to be unacceptable at this time. This entire workload runs in an Amazon VPC. I'd greatly appreciate it if someone has any insights on why I am seeing such poor numbers. Here are some details and measurements taken:
> > >>>
> > >>> i) I have a single topic with 1024 partitions that I am writing to from six clients using the kafka 0.8.2 beta producer.
> > >>>
> > >>> ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of those machines has 8 virtual CPUs, 15 GB memory and 2 * 80 GB SSDs. The broker -> partitions mapping was decided by Kafka when I created the topic.
> > >>>
> > >>> iii) I write about 22 thousand messages per second from across the 6 clients. This number was calculated using distributed counters. I just increment a distributed counter in the callback from my enqueue job if the metadata returned is not null. I also increment a number-of-dropped-messages counter if the callback has a non-null exception or if there was an exception in the synchronous send call. The number of dropped messages is pretty much always zero. Out of the 6 clients, 3 are responsible for 95% of the traffic. Messages are very tiny and have null keys and 27 byte values (2 byte version and 25 byte payload). Again, these messages are written using the kafka 0.8.2 client.
> > >>>
> > >>> iv) I have 3 consumer processes consuming only from this topic. Each consumer process is assigned a disjoint set of the 1024 partitions by an eternal arbiter. Each consumer process then creates a mapping from brokers -> partitions it has been assigned. It then starts one fetcher thread per broker. Each thread queries the broker (using the SimpleConsumer) it has been assigned for partitions such that partitions = (partitions on broker) ∩ (partitions assigned to the process by the arbiter). So in effect there are 9 consumer threads across 3 consumer processes that query a disjoint set of partitions for the single topic, and amongst themselves they manage to consume from every partition. So each thread has a run loop where it asks for messages, consumes them, then asks for more messages. When a run loop starts, it starts by querying for the latest message in a partition (i.e. discards any previous backlog) and then maintains a map of partition -> nextOffsetToRequest in memory to make sure that it consumes messages in order.
> > >>>
> > >> Edit: Meant external arbiter.
> > >>
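A rough sketch of the per-broker fetcher layout described in (iv): for each broker, intersect the partitions it hosts with the partitions the arbiter assigned to this process, and start one fetcher thread per broker. The FetcherRunnable stub and the map shapes are placeholders, not the actual code:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class FetcherLayout {

        // Placeholder for the real fetcher: it would own one SimpleConsumer for its
        // broker and a partition -> nextOffsetToRequest map, looping on fetch().
        static class FetcherRunnable implements Runnable {
            private final String broker;
            private final Set<Integer> partitions;

            FetcherRunnable(String broker, Set<Integer> partitions) {
                this.broker = broker;
                this.partitions = partitions;
            }

            @Override
            public void run() {
                // Fetch loop elided; see the run() pseudocode later in this thread.
            }
        }

        public static void startFetchers(Map<String, Set<Integer>> partitionsByBroker,
                                         Set<Integer> assignedToThisProcess) {
            for (Map.Entry<String, Set<Integer>> entry : partitionsByBroker.entrySet()) {
                Set<Integer> toFetch = new HashSet<>(entry.getValue());
                toFetch.retainAll(assignedToThisProcess); // (partitions on broker) ∩ (partitions assigned)
                if (!toFetch.isEmpty()) {
                    // One fetcher thread per broker -> 3 threads per process, 9 in total.
                    new Thread(new FetcherRunnable(entry.getKey(), toFetch)).start();
                }
            }
        }
    }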
> > >>> v) Consumption is really simple. Each message is put on a non-blocking, efficient ring buffer. If the ring buffer is full the message is dropped. I measure the mean time between fetches and it is the same as the time for fetches up to a ms, meaning no matter how many messages are dequeued, it takes almost no time to process them. At the end of processing a fetch request I increment another distributed counter that counts the number of messages processed. This counter tells me that on average I am consuming the same number of messages/sec that I enqueue on the producers, i.e. around 22 thousand messages/sec.
> > >>>
> > >>> vi) The 99th percentile of the number of messages fetched per fetch request is about 500 messages. The 99th percentile of the time it takes to fetch a batch is about 130 - 140 ms. I played around with the buffer and maxWait settings on the SimpleConsumer, and attempting to consume more messages was leading the 99th percentile of the fetch time to balloon up, so I am consuming in smaller batches right now.
> > >>>
> > >>> vii) Every 30 seconds or so each of the producers inserts a trace message into each partition (so 1024 messages per producer every 30 seconds). Each message only contains the System.currentTimeMillis() at the time of enqueueing. It is about 9 bytes long. The consumer in step (v) always checks to see if a message it dequeued is of this trace type (instead of the regular message type). This is a simple check on the first 2 byte version that each message buffer contains. If it is a trace message it reads its payload as a long, takes the difference between the system time on the consumer and this payload, and updates a histogram with this difference value. Ignoring NTP skew (which I've measured to be in the order of milliseconds), this is the lag between when a message was enqueued on the producer and when it was read on the consumer.
> > >>>
> > >> Edit: Trace messages are 10 bytes long - 2 byte version + 8 byte long for timestamp.
> > >>
> > >> So pseudocode for every consumer thread (one per broker per consumer process; 9 in total across 3 consumer processes) is:
> > >>>
> > >>> void run() {
> > >>>     while (running) {
> > >>>         FetchRequest fetchRequest = buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker); // This assignment is done by an external arbiter.
> > >>>         measureTimeBetweenFetchRequests(); // The 99th of this is 130-140 ms.
> > >>>         FetchResponse fetchResponse = fetch(fetchRequest); // The 99th of this is 130-140 ms, which is the same as the time between fetches.
> > >>>         processData(fetchResponse); // The 99th on this is 2-3 ms.
> > >>>     }
> > >>> }
> > >>>
> > >>> void processData(FetchResponse response) {
> > >>>     try (Timer.Context ctx = processWorkerDataTimer.time()) { // The 99th on this is 2-3 ms.
> > >>>         for (Message message : response) {
> > >>>             if (typeOf(message) == NORMAL_DATA) {
> > >>>                 enqueueOnNonBlockingRingBuffer(message); // Never blocks, drops the message if the ring buffer is full.
> > >>>             } else if (typeOf(message) == TRACE_DATA) {
> > >>>                 long timeOfEnqueue = getEnqueueTime(message);
> > >>>                 long currentTime = System.currentTimeMillis();
> > >>>                 long difference = currentTime - timeOfEnqueue;
> > >>>                 lagHistogram.update(difference);
> > >>>             }
> > >>>         }
> > >>>     }
> > >>> }
> > >>>
> > >>> viii) So the kicker is that this lag is really, really high:
> > >>> Median Lag: *200 ms*. This already seems pretty high and I could even discount some of it through NTP skew.
> > >>> Mean Lag: *2 - 4 seconds*!
> > >>> Also notice how the mean is bigger than the median, which tells us the distribution has a long tail.
> > >>> 99th Lag: *20 seconds*!!
> > >>>
> > >>> I can't quite figure out the source of the lag. Here are some other things of note:
> > >>>
> > >>> 1) The brokers in general are at about 40-50% CPU, but I see occasional spikes to more than 80% - 95%. This could probably be causing issues. I am wondering if this is down to the high number of partitions I have and how each partition ends up receiving not too many messages. 22k messages spread across 1024 partitions = only 21 odd messages/sec/partition. But each broker should be receiving about 7500 messages/sec. It could also be down to the nature of these messages being tiny. I imagine that the kafka wire protocol overhead per message is probably close to the 25 byte message payload itself. It also has to do a few CRC32 calculations etc. But again, given that these are c3.2xlarge machines and the number of messages is so tiny, I'd expect it to do better. And reading this article <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines> about 2 million messages/sec across three brokers makes it seem like 22k 25-byte messages should be very easy. So my biggest suspicion right now is that the large number of partitions is somehow responsible for this latency.
> > >>>
> > >>> 2) On the consumer, given that the 99th time between fetches == the 99th time of a fetch, I don't see how I could do any better. It's pretty close to just getting batches of data, iterating through them and dropping them on the floor. My requests on the consumer are built like this:
> > >>>
> > >>> FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName)
> > >>>         .maxWait(100).minBytes(1000); // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
> > >>>
> > >>> // Add a fetch request for every partition.
> > >>> for (PartitionMetadata partition : partitions) {
> > >>>     int partitionId = partition.partitionId();
> > >>>     long offset = readOffsets.get(partition); // This map maintains the next offset to be read per partition.
> > >>>     builder.addFetch(MY_TOPIC, partitionId, offset, 3000); // Up to 3000 bytes per partition per fetch request.
> > >>> }
> > >>>
> > >>> 3) On the producer side I use the kafka beta producer. I've played around with many settings but none of them seem to have made a difference. Here are my current settings:
> > >>>
> > >>> ProducerConfig values:
> > >>>
> > >>> block.on.buffer.full = false
> > >>> retry.backoff.ms = 100
> > >>> buffer.memory = 83886080 // Kind of big - could it be adding lag?
> > >>> batch.size = 40500 // This is also kind of big - could it be adding lag?
> > >>> metrics.sample.window.ms = 30000
> > >>> metadata.max.age.ms = 300000
> > >>> receive.buffer.bytes = 32768
> > >>> timeout.ms = 30000
> > >>> max.in.flight.requests.per.connection = 5
> > >>> metric.reporters = []
> > >>> bootstrap.servers = [broker1, broker2, broker3]
> > >>> client.id =
> > >>> compression.type = none
> > >>> retries = 0
> > >>> max.request.size = 1048576
> > >>> send.buffer.bytes = 131072
> > >>> acks = 1
> > >>> reconnect.backoff.ms = 10
> > >>> linger.ms = 250 // Based on this config I'd imagine that the lag should be bounded to about 250 ms overall.
> > >>> metrics.num.samples = 2
> > >>> metadata.fetch.timeout.ms = 60000
> > >>>
> > >>> I know this is a lot of information. I just wanted to provide as much context as possible.
> > >>>
> > >>> Thanks in advance!
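A back-of-envelope calculation over the figures quoted in this thread (22k msgs/sec, 1024 partitions, 27-byte values, batch.size = 40500, linger.ms = 250, buffer.memory = 83886080) hints at why latency could track the number of partitions the producer has touched rather than the message rate. This is just arithmetic over the numbers above, not a measurement:

    // Rough arithmetic over the numbers reported in this thread.
    public class BackOfEnvelope {
        public static void main(String[] args) {
            double messagesPerSec = 22_000.0;
            int partitions = 1024;
            int messageBytes = 27;           // 2 byte version + 25 byte payload
            int batchSizeBytes = 40_500;     // batch.size
            int lingerMs = 250;              // linger.ms
            long bufferMemory = 83_886_080L; // buffer.memory

            double msgsPerSecPerPartition = messagesPerSec / partitions;                                 // ~21.5
            double secsToFillOneBatch = batchSizeBytes / (double) messageBytes / msgsPerSecPerPartition; // ~70 s
            double batchesClosedPerSec = partitions * (1000.0 / lingerMs);                               // ~4096/sec, closed by linger.ms, not by size
            double maxOpenBatchBytes = (double) partitions * batchSizeBytes;                             // ~41 MB of open batch buffers

            System.out.printf("msgs/sec/partition      = %.1f%n", msgsPerSecPerPartition);
            System.out.printf("secs to fill one batch  = %.1f%n", secsToFillOneBatch);
            System.out.printf("batches closed per sec  = %.0f%n", batchesClosedPerSec);
            System.out.printf("max open batch bytes    = %.0f (buffer.memory = %d)%n", maxOpenBatchBytes, bufferMemory);
        }
    }

At this rate no partition ever comes close to filling a 40500-byte batch, so every partition the producer has touched emits a tiny batch roughly every linger.ms, and up to about half of buffer.memory can sit in open batches at once. That would at least be consistent with the latency ballooning once enough partitions have been written to, and with only a producer restart (which discards that per-partition state) bringing it back down.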