Thanks Jay. I just used JMX to change the log level on the broker and checked the logs. I am still not sure which line tells me exactly how much time a producer request took to process.
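For reference, here is roughly how I changed the log level without bouncing the broker. This is only a minimal sketch: the host/port are from my setup, and it assumes the broker exposes the kafka:type=kafka.Log4jController MBean with a setLogLevel(logger, level) operation, so treat those details as assumptions.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SetBrokerLogLevel {
    public static void main(String[] args) throws Exception {
        // Our brokers run with JMX_PORT=9999; adjust host/port for your setup.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Assumes the broker registers this log4j controller MBean.
            ObjectName log4jController =
                    new ObjectName("kafka:type=kafka.Log4jController");
            // Turn on TRACE for KafkaApis so each request gets logged.
            conn.invoke(log4jController, "setLogLevel",
                    new Object[] { "kafka.server.KafkaApis", "TRACE" },
                    new String[] { "java.lang.String", "java.lang.String" });
        }
    }
}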
I see log lines of this format:

2014-12-30T02:13:44.507Z DEBUG [kafka-request-handler-0 ] [kafka.server.KafkaApis ]: [KafkaApi-11] Produce to local log in 5 ms

Is this what I should be looking out for? I see producer requests of the form:

2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0 ] [kafka.server.KafkaApis ]: [KafkaApi-11] Handling request: Name: ProducerRequest; Version: 0; CorrelationId: 36308; more stuff

but I can't quite tell when the request finished. So right now I see that requests are logged frequently on the brokers (multiple times per second), but since I don't know when they finish I can't tell the total time. (A rough sketch of how I plan to pair these lines up is at the very end of this mail, below the quoted thread.)

Some other things that I found kind of odd:

1) The producer requests seem to have an ack timeout of 30000 ms. I don't think I set this on the producer, and I don't know if this could have anything to do with the latency problem:

2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0 ] [kafka.server.KafkaApis ]: [KafkaApi-11] Handling request: Name: ProducerRequest; Version: 0; CorrelationId: 36308; ClientId: ; RequiredAcks: 1; *AckTimeoutMs: 30000 ms*; rest of the stuff.

2) I see a bunch of small writes to my [topic, partition] logs. My messages are at a minimum 27 bytes, so maybe this is something else. Again, I don't know if this is a problem:

2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 1 bytes written to log MY.TOPIC-400 beginning at offset 19221923 and ending at offset 19221923
2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 4 bytes written to log MY.TOPIC-208 beginning at offset 29438019 and ending at offset 29438022
2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 1 bytes written to log MY.TOPIC-163 beginning at offset 14821977 and ending at offset 14821977
2014-12-30T02:13:44.500Z TRACE [kafka-request-handler-1 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 1 bytes written to log MY.TOPIC118 beginning at offset 19321510 and ending at offset 19321510
2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 3 bytes written to log MY.TOPIC-463 beginning at offset 28777627 and ending at offset 28777629

On Mon, Dec 29, 2014 at 5:43 PM, Jay Kreps <j...@confluent.io> wrote: > Hey Rajiv, > > Yes, if you uncomment the line > #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender > in the example log4j.properties file that will enable logging of each > request including the time taken processing the request. This is the first > step for diagnosing latency spikes since this will rule out the server. You > may also want to enable DEBUG or TRACE on the producer and see what is > happening when those spikes occur. > > The JMX stats show that at least the producer is not exhausting the I/O > thread (it is 92% idle) and doesn't seem to be waiting on memory which is > somewhat surprising to me (the NaN is an artifact of how we compute that > stat--no allocations took place in the time period measured so it is kind > of 0/0). > > -Jay > > On Mon, Dec 29, 2014 at 4:54 PM, Rajiv Kurian <ra...@signalfuse.com> > wrote: > > > In case the attachments don't work out here is an imgur link - > > http://imgur.com/NslGpT3,Uw6HFow#0 > > > > On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <ra...@signalfuse.com> > > wrote: > > > > > Never mind about (2). I see these stats are already being output by the > > > kafka producer.
I've attached a couple of screenshots (couldn't copy > > paste > > > from jvisualvm ). Do any of these things strike as odd? The > > > bufferpool-wait-ratio sadly shows up as a NaN. > > > > > > I still don't know how to figure out (1). > > > > > > Thanks! > > > > > > > > > On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <ra...@signalfuse.com> > > > wrote: > > > > > >> Hi Jay, > > >> > > >> Re (1) - I am not sure how to do this? Actually I am not sure what > this > > >> means. Is this the time every write/fetch request is received on the > > >> broker? Do I need to enable some specific log level for this to show > > up? It > > >> doesn't show up in the usual log. Is this information also available > via > > >> jmx somehow? > > >> Re (2) - Are you saying that I should instrument the "percentage of > time > > >> waiting for buffer space" stat myself? If so how do I do this. Or are > > these > > >> stats already output to jmx by the kafka producer code. This seems > like > > >> it's in the internals of the kafka producer client code. > > >> > > >> Thanks again! > > >> > > >> > > >> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <ra...@signalfuse.com> > > >> wrote: > > >> > > >>> Thanks Jay. Will check (1) and (2) and get back to you. The test is > not > > >>> stand-alone now. It might be a bit of work to extract it to a > > stand-alone > > >>> executable. It might take me a bit of time to get that going. > > >>> > > >>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io> wrote: > > >>> > > >>>> Hey Rajiv, > > >>>> > > >>>> This sounds like a bug. The more info you can help us get the easier > > to > > >>>> fix. Things that would help: > > >>>> 1. Can you check if the the request log on the servers shows latency > > >>>> spikes > > >>>> (in which case it is a server problem)? > > >>>> 2. It would be worth also getting the jmx stats on the producer as > > they > > >>>> will show things like what percentage of time it is waiting for > buffer > > >>>> space etc. > > >>>> > > >>>> If your test is reasonably stand-alone it would be great to file a > > JIRA > > >>>> and > > >>>> attach the test code and the findings you already have so someone > can > > >>>> dig > > >>>> into what is going on. > > >>>> > > >>>> -Jay > > >>>> > > >>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com > > > > >>>> wrote: > > >>>> > > >>>> > Hi all, > > >>>> > > > >>>> > Bumping this up, in case some one has any ideas. I did yet another > > >>>> > experiment where I create 4 producers and stripe the send requests > > >>>> across > > >>>> > them in a manner such that any one producer only sees 256 > partitions > > >>>> > instead of the entire 1024. This seems to have helped a bit, and > > >>>> though I > > >>>> > still see crazy high 99th (25-30 seconds), the median, mean, 75th > > and > > >>>> 95th > > >>>> > percentile have all gone down. > > >>>> > > > >>>> > Thanks! > > >>>> > > > >>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges < > > >>>> tstump...@ntent.com> > > >>>> > wrote: > > >>>> > > > >>>> > > Ah I thought it was restarting the broker that made things > better > > :) > > >>>> > > > > >>>> > > Yeah I have no experience with the Java client so can't really > > help > > >>>> > there. > > >>>> > > > > >>>> > > Good luck! 
> > >>>> > > > > >>>> > > -----Original Message----- > > >>>> > > From: Rajiv Kurian [ra...@signalfuse.com] > > >>>> > > Received: Sunday, 21 Dec 2014, 12:25PM > > >>>> > > To: users@kafka.apache.org [users@kafka.apache.org] > > >>>> > > Subject: Re: Trying to figure out kafka latency issues > > >>>> > > > > >>>> > > I'll take a look at the GC profile of the brokers Right now I > keep > > >>>> a tab > > >>>> > on > > >>>> > > the CPU, Messages in, Bytes in, Bytes out, free memory (on the > > >>>> machine > > >>>> > not > > >>>> > > JVM heap) free disk space on the broker. I'll need to take a > look > > >>>> at the > > >>>> > > JVM metrics too. What seemed strange is that going from 8 -> 512 > > >>>> > partitions > > >>>> > > increases the latency, but going fro 512-> 8 does not decrease > it. > > >>>> I have > > >>>> > > to restart the producer (but not the broker) for the end to end > > >>>> latency > > >>>> > to > > >>>> > > go down That made it seem that the fault was probably with the > > >>>> producer > > >>>> > > and not the broker. Only restarting the producer made things > > >>>> better. I'll > > >>>> > > do more extensive measurement on the broker. > > >>>> > > > > >>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges < > > >>>> tstump...@ntent.com> > > >>>> > > wrote: > > >>>> > > > > > >>>> > > > Did you see my response and have you checked the server logs > > >>>> especially > > >>>> > > > the GC logs? It still sounds like you are running out of > memory > > >>>> on the > > >>>> > > > broker. What is your max heap memory and are you thrashing > once > > >>>> you > > >>>> > start > > >>>> > > > writing to all those partitions? > > >>>> > > > > > >>>> > > > You have measured very thoroughly from an external point of > > view, > > >>>> i > > >>>> > think > > >>>> > > > now you'll have to start measuring the internal metrics. Maybe > > >>>> someone > > >>>> > > else > > >>>> > > > will have ideas on what jmx values to watch. > > >>>> > > > > > >>>> > > > Best, > > >>>> > > > Thunder > > >>>> > > > > > >>>> > > > > > >>>> > > > -----Original Message----- > > >>>> > > > From: Rajiv Kurian [ra...@signalfuse.com] > > >>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM > > >>>> > > > To: users@kafka.apache.org [users@kafka.apache.org] > > >>>> > > > Subject: Re: Trying to figure out kafka latency issues > > >>>> > > > > > >>>> > > > Some more work tells me that the end to end latency numbers > vary > > >>>> with > > >>>> > the > > >>>> > > > number of partitions I am writing to. I did an experiment, > where > > >>>> based > > >>>> > > on a > > >>>> > > > run time flag I would dynamically select how many of the *1024 > > >>>> > > partitions* > > >>>> > > > I write to. So say I decide I'll write to at most 256 > partitions > > >>>> I mod > > >>>> > > > whatever partition I would actually write to by 256. Basically > > the > > >>>> > number > > >>>> > > > of partitions for this topic on the broker remains the same at > > >>>> *1024* > > >>>> > > > partitions but the number of partitions my producers write to > > >>>> changes > > >>>> > > > dynamically based on a run time flag. So something like this: > > >>>> > > > > > >>>> > > > int partition = getPartitionForMessage(message); > > >>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get(); // > This > > >>>> flag > > >>>> > can > > >>>> > > be > > >>>> > > > updated without bringing the application down - just a > volatile > > >>>> read of > > >>>> > > > some number set externally. 
> > >>>> > > > int moddedPartition = partition % maxPartitionsToWrite. > > >>>> > > > // Send a message to this Kafka partition. > > >>>> > > > > > >>>> > > > Here are some interesting things I've noticed: > > >>>> > > > > > >>>> > > > i) When I start my client and it *never writes* to more than > *8 > > >>>> > > > partitions *(same > > >>>> > > > data rate but fewer partitions) - the end to end *99th latency > > is > > >>>> > 300-350 > > >>>> > > > ms*. Quite a bit of this (numbers in my previous emails) is > the > > >>>> latency > > >>>> > > > from producer -> broker and the latency from broker -> > consumer. > > >>>> Still > > >>>> > > > nowhere as poor as the *20 - 30* seconds I was seeing. > > >>>> > > > > > >>>> > > > ii) When I increase the maximum number of partitions, end to > end > > >>>> > latency > > >>>> > > > increases dramatically. At *256 partitions* the end to end > *99th > > >>>> > latency > > >>>> > > is > > >>>> > > > still 390 - 418 ms.* Worse than the latency figures for *8 > > >>>> *partitions, > > >>>> > > but > > >>>> > > > not by much. When I increase this number to *512 partitions > *the > > >>>> end > > >>>> > > > to end *99th > > >>>> > > > latency *becomes an intolerable *19-24 seconds*. At *1024* > > >>>> partitions > > >>>> > the > > >>>> > > > *99th > > >>>> > > > latency is at 25 - 30 seconds*. > > >>>> > > > A table of the numbers: > > >>>> > > > > > >>>> > > > Max number of partitions written to (out of 1024) > > >>>> > > > > > >>>> > > > End to end latency > > >>>> > > > > > >>>> > > > 8 > > >>>> > > > > > >>>> > > > 300 - 350 ms > > >>>> > > > > > >>>> > > > 256 > > >>>> > > > > > >>>> > > > 390 - 418 ms > > >>>> > > > > > >>>> > > > 512 > > >>>> > > > > > >>>> > > > 19 - 24 seconds > > >>>> > > > > > >>>> > > > 1024 > > >>>> > > > > > >>>> > > > 25 - 30 seconds > > >>>> > > > > > >>>> > > > > > >>>> > > > iii) Once I make the max number of partitions high enough, > > >>>> reducing it > > >>>> > > > doesn't help. For eg: If I go up from *8* to *512 *partitions, > > the > > >>>> > > latency > > >>>> > > > goes up. But while the producer is running if I go down from > > >>>> *512* to > > >>>> > > > *8 *partitions, > > >>>> > > > it doesn't reduce the latency numbers. My guess is that the > > >>>> producer is > > >>>> > > > creating some state lazily per partition and this state is > > >>>> causing the > > >>>> > > > latency. Once this state is created, writing to fewer > partitions > > >>>> > doesn't > > >>>> > > > seem to help. Only a restart of the producer calms things > down. > > >>>> > > > > > >>>> > > > So my current plan is to reduce the number of partitions on > the > > >>>> topic, > > >>>> > > but > > >>>> > > > there seems to be something deeper going on for the latency > > >>>> numbers to > > >>>> > be > > >>>> > > > so poor to begin with and then suffer so much more (non > > linearly) > > >>>> with > > >>>> > > > additional partitions. > > >>>> > > > > > >>>> > > > Thanks! > > >>>> > > > > > >>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian < > > >>>> ra...@signalfuse.com> > > >>>> > > > wrote: > > >>>> > > > > > > >>>> > > > > I've done some more measurements. I've also started > measuring > > >>>> the > > >>>> > > latency > > >>>> > > > > between when I ask my producer to send a message and when I > > get > > >>>> an > > >>>> > > > > acknowledgement via the callback. Here is my code: > > >>>> > > > > > > >>>> > > > > // This function is called on every producer once every 30 > > >>>> seconds. 
> > >>>> > > > > > > >>>> > > > > public void addLagMarkers(final Histogram enqueueLag) { > > >>>> > > > > > > >>>> > > > > final int numberOfPartitions = 1024; > > >>>> > > > > > > >>>> > > > > final long timeOfEnqueue = > System.currentTimeMillis(); > > >>>> > > > > > > >>>> > > > > final Callback callback = new Callback() { > > >>>> > > > > > > >>>> > > > > @Override > > >>>> > > > > > > >>>> > > > > public void onCompletion(RecordMetadata > metadata, > > >>>> > Exception > > >>>> > > > ex) > > >>>> > > > > { > > >>>> > > > > > > >>>> > > > > if (metadata != null) { > > >>>> > > > > > > >>>> > > > > // The difference between ack time from > > >>>> broker > > >>>> > and > > >>>> > > > > enqueue time. > > >>>> > > > > > > >>>> > > > > final long timeOfAck = > > >>>> > System.currentTimeMillis(); > > >>>> > > > > > > >>>> > > > > final long lag = timeOfAck - > > timeOfEnqueue; > > >>>> > > > > > > >>>> > > > > enqueueLag.update(lag); > > >>>> > > > > > > >>>> > > > > } > > >>>> > > > > > > >>>> > > > > } > > >>>> > > > > > > >>>> > > > > }; > > >>>> > > > > > > >>>> > > > > for (int i = 0; i < numberOfPartitions; i++) { > > >>>> > > > > > > >>>> > > > > try { > > >>>> > > > > > > >>>> > > > > byte[] value = > > >>>> LagMarker.serialize(timeOfEnqueue); > > >>>> > // > > >>>> > > 10 > > >>>> > > > > bytes -> short version + long timestamp. > > >>>> > > > > > > >>>> > > > > // This message is later used by the > consumers > > >>>> to > > >>>> > > measure > > >>>> > > > > lag. > > >>>> > > > > > > >>>> > > > > ProducerRecord record = new > > >>>> ProducerRecord(MY_TOPIC, > > >>>> > i, > > >>>> > > > > null, value); > > >>>> > > > > > > >>>> > > > > kafkaProducer.send(record, callback); > > >>>> > > > > > > >>>> > > > > } catch (Exception e) { > > >>>> > > > > > > >>>> > > > > // We just dropped a lag marker. > > >>>> > > > > > > >>>> > > > > } > > >>>> > > > > > > >>>> > > > > } > > >>>> > > > > > > >>>> > > > > } > > >>>> > > > > > > >>>> > > > > The* 99th* on this lag is about* 350 - 400* ms. It's not > > >>>> stellar, but > > >>>> > > > > doesn't account for the *20-30 second 99th* I see on the end > > to > > >>>> end > > >>>> > > lag. > > >>>> > > > > I am consuming in a tight loop on the Consumers (using the > > >>>> > > > SimpleConsumer) > > >>>> > > > > with minimal processing with a *99th fetch time *of > *130-140* > > >>>> ms, so > > >>>> > I > > >>>> > > > > don't think that should be a problem either. Completely > > baffled. > > >>>> > > > > > > >>>> > > > > > > >>>> > > > > Thanks! > > >>>> > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian < > > >>>> ra...@signalfuse.com> > > >>>> > > > > wrote: > > >>>> > > > >> > > >>>> > > > >> > > >>>> > > > >> > > >>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian < > > >>>> ra...@signalfuse.com > > >>>> > > > > >>>> > > > >> wrote: > > >>>> > > > >>> > > >>>> > > > >>> I am trying to replace a Thrift peer to peer API with > kafka > > >>>> for a > > >>>> > > > >>> particular work flow. I am finding the 99th percentile > > >>>> latency to > > >>>> > be > > >>>> > > > >>> unacceptable at this time. This entire work load runs in > an > > >>>> Amazon > > >>>> > > > VPC. I'd > > >>>> > > > >>> greatly appreciate it if some one has any insights on why > I > > am > > >>>> > seeing > > >>>> > > > such > > >>>> > > > >>> poor numbers. 
Here are some details and measurements > taken: > > >>>> > > > >>> > > >>>> > > > >>> i) I have a single topic with 1024 partitions that I am > > >>>> writing to > > >>>> > > from > > >>>> > > > >>> six clients using the kafka 0.8.2 beta kafka producer. > > >>>> > > > >>> > > >>>> > > > >>> ii) I have 3 brokers, each on a c3 2x machine on ec2. > Each > > of > > >>>> > those > > >>>> > > > >>> machines has 8 virtual cpus, 15 GB memory and 2 * 80 GB > > SSDs. > > >>>> The > > >>>> > > > broker -> > > >>>> > > > >>> partitions mapping was decided by Kafka when I created the > > >>>> topic. > > >>>> > > > >>> > > >>>> > > > >>> iii) I write about 22 thousand messages per second from > > >>>> across the > > >>>> > 6 > > >>>> > > > >>> clients. This number was calculated using distributed > > >>>> counters. I > > >>>> > > just > > >>>> > > > >>> increment a distributed counter in the callback from my > > >>>> enqueue job > > >>>> > > if > > >>>> > > > the > > >>>> > > > >>> metadata returned is not null. I also increment a number o > > >>>> dropped > > >>>> > > > messages > > >>>> > > > >>> counter if the callback has a non-null exception or if > there > > >>>> was an > > >>>> > > > >>> exception in the synchronous send call. The number of > > dropped > > >>>> > > messages > > >>>> > > > is > > >>>> > > > >>> pretty much always zero. Out of the 6 clients 3 are > > >>>> responsible for > > >>>> > > > 95% of > > >>>> > > > >>> the traffic. Messages are very tiny and have null keys > and > > >>>> 27 byte > > >>>> > > > values > > >>>> > > > >>> (2 byte version and 25 byte payload). Again these messages > > are > > >>>> > > written > > >>>> > > > >>> using the kafka 0.8.2 client. > > >>>> > > > >>> > > >>>> > > > >>> iv) I have 3 consumer processes consuming only from this > > >>>> topic. > > >>>> > Each > > >>>> > > > >>> consumer process is assigned a disjoint set of the 1024 > > >>>> partitions > > >>>> > by > > >>>> > > > an > > >>>> > > > >>> eternal arbiter. Each consumer process then creates a > > mapping > > >>>> from > > >>>> > > > brokers > > >>>> > > > >>> -> partitions it has been assigned. It then starts one > > fetcher > > >>>> > thread > > >>>> > > > per > > >>>> > > > >>> broker. Each thread queries the broker (using the > > >>>> SimpleConsumer) > > >>>> > it > > >>>> > > > has > > >>>> > > > >>> been assigned for partitions such that partitions = > > >>>> (partitions on > > >>>> > > > broker) > > >>>> > > > >>> *∩ *(partitions assigned to the process by the arbiter). > So > > in > > >>>> > effect > > >>>> > > > >>> there are 9 consumer threads across 3 consumer processes > > that > > >>>> > query a > > >>>> > > > >>> disjoint set of partitions for the single topic and > amongst > > >>>> > > themselves > > >>>> > > > they > > >>>> > > > >>> manage to consume from every topic. So each thread has a > run > > >>>> loop > > >>>> > > > where it > > >>>> > > > >>> asks for messages, consumes them then asks for more > > messages. > > >>>> When > > >>>> > a > > >>>> > > > run > > >>>> > > > >>> loop starts, it starts by querying for the latest message > > in a > > >>>> > > > partition > > >>>> > > > >>> (i.e. discards any previous backup) and then maintains a > map > > >>>> of > > >>>> > > > partition > > >>>> > > > >>> -> nextOffsetToRequest in memory to make sure that it > > consumes > > >>>> > > > messages in > > >>>> > > > >>> order. > > >>>> > > > >>> > > >>>> > > > >> Edit: Mean't external arbiter. > > >>>> > > > >> > > >>>> > > > >>> > > >>>> > > > >>> v) Consumption is really simple. 
Each message is put on a > > non > > >>>> > > blocking > > >>>> > > > >>> efficient ring buffer. If the ring buffer is full the > > message > > >>>> is > > >>>> > > > dropped. I > > >>>> > > > >>> measure the mean time between fetches and it is the same > as > > >>>> the > > >>>> > time > > >>>> > > > for > > >>>> > > > >>> fetches up to a ms, meaning no matter how many messages > are > > >>>> > dequeued, > > >>>> > > > it > > >>>> > > > >>> takes almost no time to process them. At the end of > > >>>> processing a > > >>>> > > fetch > > >>>> > > > >>> request I increment another distributed counter that > counts > > >>>> the > > >>>> > > number > > >>>> > > > of > > >>>> > > > >>> messages processed. This counter tells me that on average > I > > am > > >>>> > > > consuming > > >>>> > > > >>> the same number of messages/sec that I enqueue on the > > >>>> producers > > >>>> > i.e. > > >>>> > > > around > > >>>> > > > >>> 22 thousand messages/sec. > > >>>> > > > >>> > > >>>> > > > >>> vi) The 99th percentile of the number of messages fetched > > per > > >>>> fetch > > >>>> > > > >>> request is about 500 messages. The 99th percentile of the > > >>>> time it > > >>>> > > > takes to > > >>>> > > > >>> fetch a batch is abut 130 - 140 ms. I played around with > the > > >>>> buffer > > >>>> > > and > > >>>> > > > >>> maxWait settings on the SimpleConsumer and attempting to > > >>>> consume > > >>>> > more > > >>>> > > > >>> messages was leading the 99th percentile of the fetch time > > to > > >>>> > balloon > > >>>> > > > up, > > >>>> > > > >>> so I am consuming in smaller batches right now. > > >>>> > > > >>> > > >>>> > > > >>> vii) Every 30 seconds odd each of the producers inserts a > > >>>> trace > > >>>> > > message > > >>>> > > > >>> into each partition (so 1024 messages per producer every > 30 > > >>>> > seconds). > > >>>> > > > Each > > >>>> > > > >>> message only contains the System.currentTimeMillis() at > the > > >>>> time of > > >>>> > > > >>> enqueueing. It is about 9 bytes long. The consumer in step > > (v) > > >>>> > always > > >>>> > > > >>> checks to see if a message it dequeued is of this trace > type > > >>>> > (instead > > >>>> > > > of > > >>>> > > > >>> the regular message type). This is a simple check on the > > >>>> first 2 > > >>>> > byte > > >>>> > > > >>> version that each message buffer contains. If it is a > trace > > >>>> message > > >>>> > > it > > >>>> > > > >>> reads it's payload as a long and uses the difference > between > > >>>> the > > >>>> > > system > > >>>> > > > >>> time on the consumer and this payload and updates a > > histogram > > >>>> with > > >>>> > > this > > >>>> > > > >>> difference value. Ignoring NTP skew (which I've measured > to > > >>>> be in > > >>>> > the > > >>>> > > > order > > >>>> > > > >>> of milliseconds) this is the lag between when a message > was > > >>>> > enqueued > > >>>> > > > on the > > >>>> > > > >>> producer and when it was read on the consumer. > > >>>> > > > >>> > > >>>> > > > >> Edit: Trace messages are 10 bytes long - 2byte version + 8 > > >>>> byte long > > >>>> > > for > > >>>> > > > >> timestamp. 
> > >>>> > > > >> > > >>>> > > > >> So pseudocode for every consumer thread (one per broker per > > >>>> consumer > > >>>> > > > >>> process (9 in total across 3 consumers) is: > > >>>> > > > >>> > > >>>> > > > >>> void run() { > > >>>> > > > >>> > > >>>> > > > >>> while (running) { > > >>>> > > > >>> > > >>>> > > > >>> FetchRequest fetchRequest = buildFetchRequest( > > >>>> > > > >>> partitionsAssignedToThisProcessThatFallOnThisBroker); // > > This > > >>>> > > > >>> assignment is done by an external arbiter. > > >>>> > > > >>> > > >>>> > > > >>> measureTimeBetweenFetchRequests(); // The 99th of > this > > >>>> is > > >>>> > > > 130-140ms > > >>>> > > > >>> > > >>>> > > > >>> FetchResponse fetchResponse = > > >>>> fetchResponse(fetchRequest); // > > >>>> > > The > > >>>> > > > >>> 99th of this is 130-140ms, which is the same as the time > > >>>> between > > >>>> > > > fetches. > > >>>> > > > >>> > > >>>> > > > >>> processData(fetchResponse); // The 99th on this is > 2-3 > > >>>> ms. > > >>>> > > > >>> } > > >>>> > > > >>> } > > >>>> > > > >>> > > >>>> > > > >>> void processData(FetchResponse response) { > > >>>> > > > >>> try (Timer.Context _ = processWorkerDataTimer.time() { > // > > >>>> The > > >>>> > 99th > > >>>> > > > on > > >>>> > > > >>> this is 2-3 ms. > > >>>> > > > >>> for (Message message : response) { > > >>>> > > > >>> if (typeOf(message) == NORMAL_DATA) { > > >>>> > > > >>> enqueueOnNonBlockingRingBuffer(message); // > Never > > >>>> blocks, > > >>>> > > > >>> drops message if ring buffer is full. > > >>>> > > > >>> } else if (typeOf(message) == TRACE_DATA) { > > >>>> > > > >>> long timeOfEnqueue = geEnqueueTime(message); > > >>>> > > > >>> long currentTime = System.currentTimeMillis(); > > >>>> > > > >>> long difference = currentTime - timeOfEnqueue; > > >>>> > > > >>> lagHistogram.update(difference); > > >>>> > > > >>> } > > >>>> > > > >>> } > > >>>> > > > >>> } > > >>>> > > > >>> } > > >>>> > > > >>> > > >>>> > > > >>> viii) So the kicker is that this lag is really really > high: > > >>>> > > > >>> Median Lag: *200 ms*. This already seems pretty high and I > > >>>> could > > >>>> > even > > >>>> > > > >>> discount some of it through NTP skew. > > >>>> > > > >>> Mean Lag: *2 - 4 seconds*! Also notice how mean is bigger > > than > > >>>> > median > > >>>> > > > >>> kind of telling us how the distribution has a big tail. > > >>>> > > > >>> 99th Lag: *20 seconds*!! > > >>>> > > > >>> > > >>>> > > > >>> I can't quite figure out the source of the lag. Here are > > some > > >>>> other > > >>>> > > > >>> things of note: > > >>>> > > > >>> > > >>>> > > > >>> 1) The brokers in general are at about 40-50% CPU but I > see > > >>>> > > occasional > > >>>> > > > >>> spikes to more than 80% - 95%. This could probably be > > causing > > >>>> > issues. > > >>>> > > > I am > > >>>> > > > >>> wondering if this is down to the high number of > partitions I > > >>>> have > > >>>> > and > > >>>> > > > how > > >>>> > > > >>> each partition ends up receiving not too many messages. > 22k > > >>>> > messages > > >>>> > > > spread > > >>>> > > > >>> across 1024 partitions = only 21 odd > messages/sec/partition. > > >>>> But > > >>>> > each > > >>>> > > > >>> broker should be receiving about 7500 messages/sec. It > > could > > >>>> also > > >>>> > be > > >>>> > > > down > > >>>> > > > >>> to the nature of these messages being tiny. I imagine that > > the > > >>>> > kafka > > >>>> > > > wire > > >>>> > > > >>> protocol is probably close to the 25 byte message payload. 
> > It > > >>>> also > > >>>> > > has > > >>>> > > > to > > >>>> > > > >>> do a few CRC32 calculations etc. But again given that > these > > >>>> are > > >>>> > > C3.2x > > >>>> > > > >>> machines and the number of messages is so tiny I'd expect > it > > >>>> to do > > >>>> > > > better. > > >>>> > > > >>> But again reading this article > > >>>> > > > >>> < > > >>>> > > > > > >>>> > > > > >>>> > > > >>>> > > > https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines > > >>>> > > > > > >>>> > > > about > > >>>> > > > >>> 2 million messages/sec across three brokers makes it seem > > >>>> like 22k > > >>>> > 25 > > >>>> > > > byte > > >>>> > > > >>> messages should be very easy. So my biggest suspicion > right > > >>>> now is > > >>>> > > > that the > > >>>> > > > >>> large number of partitions is some how responsible for > this > > >>>> > latency. > > >>>> > > > >>> > > >>>> > > > >>> 2) On the consumer given that the 99th time between > fetches > > >>>> == 99th > > >>>> > > > time > > >>>> > > > >>> of a fetch, I don't see how I could do any better. It's > > pretty > > >>>> > close > > >>>> > > to > > >>>> > > > >>> just getting batches of data, iterating through them and > > >>>> dropping > > >>>> > > them > > >>>> > > > on > > >>>> > > > >>> the floor. My requests on the consumer are built like > this: > > >>>> > > > >>> > > >>>> > > > >>> FetchRequestBuilder builder = new > > >>>> > > > >>> FetchRequestBuilder().clientId(clientName) > > >>>> > > > >>> > > >>>> > > > >>> .maxWait(100).minBytes(1000); // max wait > > of > > >>>> 100 > > >>>> > ms > > >>>> > > or > > >>>> > > > >>> at least 1000 bytes whichever happens first. > > >>>> > > > >>> > > >>>> > > > >>> // Add a fetch request for every partition. > > >>>> > > > >>> > > >>>> > > > >>> for (PartitionMetadata partition : partitions) { > > >>>> > > > >>> > > >>>> > > > >>> int partitionId = partition.partitionId(); > > >>>> > > > >>> > > >>>> > > > >>> long offset = readOffsets.get(partition); // > > >>>> This map > > >>>> > > > >>> maintains the next partition to be read. > > >>>> > > > >>> > > >>>> > > > >>> builder.addFetch(MY_TOPIC, partitionId, > offset, > > >>>> 3000); > > >>>> > > // > > >>>> > > > >>> Up to 3000 bytes per fetch request.} > > >>>> > > > >>> } > > >>>> > > > >>> 3) On the producer side I use the kafka beta producer. > I've > > >>>> played > > >>>> > > > >>> around with many settings but none of them seem to have > > made a > > >>>> > > > difference. > > >>>> > > > >>> Here are my current settings: > > >>>> > > > >>> > > >>>> > > > >>> ProducerConfig values: > > >>>> > > > >>> > > >>>> > > > >>> block.on.buffer.full = false > > >>>> > > > >>> > > >>>> > > > >>> retry.backoff.ms = 100 > > >>>> > > > >>> > > >>>> > > > >>> buffer.memory = 83886080 // Kind of big - could > it > > be > > >>>> > adding > > >>>> > > > >>> lag? > > >>>> > > > >>> > > >>>> > > > >>> batch.size = 40500 // This is also kind of big - > > >>>> could it > > >>>> > be > > >>>> > > > >>> adding lag? 
> > >>>> > > > >>> > > >>>> > > > >>> metrics.sample.window.ms = 30000 > > >>>> > > > >>> > > >>>> > > > >>> metadata.max.age.ms = 300000 > > >>>> > > > >>> > > >>>> > > > >>> receive.buffer.bytes = 32768 > > >>>> > > > >>> > > >>>> > > > >>> timeout.ms = 30000 > > >>>> > > > >>> > > >>>> > > > >>> max.in.flight.requests.per.connection = 5 > > >>>> > > > >>> > > >>>> > > > >>> metric.reporters = [] > > >>>> > > > >>> > > >>>> > > > >>> bootstrap.servers = [broker1, broker2, broker3] > > >>>> > > > >>> > > >>>> > > > >>> client.id = > > >>>> > > > >>> > > >>>> > > > >>> compression.type = none > > >>>> > > > >>> > > >>>> > > > >>> retries = 0 > > >>>> > > > >>> > > >>>> > > > >>> max.request.size = 1048576 > > >>>> > > > >>> > > >>>> > > > >>> send.buffer.bytes = 131072 > > >>>> > > > >>> > > >>>> > > > >>> acks = 1 > > >>>> > > > >>> > > >>>> > > > >>> reconnect.backoff.ms = 10 > > >>>> > > > >>> > > >>>> > > > >>> linger.ms = 250 // Based on this config I'd > > imagine > > >>>> that > > >>>> > > the > > >>>> > > > >>> lag should be bounded to about 250 ms over all. > > >>>> > > > >>> > > >>>> > > > >>> metrics.num.samples = 2 > > >>>> > > > >>> > > >>>> > > > >>> metadata.fetch.timeout.ms = 60000 > > >>>> > > > >>> > > >>>> > > > >>> I know this is a lot of information. I just wanted to > > provide > > >>>> as > > >>>> > much > > >>>> > > > >>> context as possible. > > >>>> > > > >>> > > >>>> > > > >>> Thanks in advance! > > >>>> > > > >>> > > >>>> > > > >>> > > >>>> > > > > > >>>> > > > > >>>> > > > >>>> > > >>> > > >>> > > >> > > > > > >
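P.S. Here is the rough sketch I mentioned above for pairing the request lines up, in case someone can tell me whether I am on the right track. It assumes (and this is exactly the part I am unsure about) that the "Produce to local log in N ms" line emitted by the same kafka-request-handler thread marks the completion of the preceding "Handling request: Name: ProducerRequest" line, and it just takes the timestamp delta as a rough bound on the handling time:

import java.io.BufferedReader;
import java.io.FileReader;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ProduceRequestTimer {
    // Matches e.g. "2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0 ] <rest>"
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+)\\s+\\S+\\s+\\[([^\\]]+?)\\s*\\]\\s+(.*)$");

    public static void main(String[] args) throws Exception {
        // Last "Handling request: ... ProducerRequest" timestamp seen per handler thread.
        Map<String, Instant> pendingByThread = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
            String line;
            while ((line = reader.readLine()) != null) {
                Matcher m = LINE.matcher(line);
                if (!m.matches()) continue;
                Instant timestamp = Instant.parse(m.group(1));
                String thread = m.group(2);
                String rest = m.group(3);
                if (rest.contains("Handling request: Name: ProducerRequest")) {
                    pendingByThread.put(thread, timestamp);
                } else if (rest.contains("Produce to local log in")
                        && pendingByThread.containsKey(thread)) {
                    long millis = timestamp.toEpochMilli()
                            - pendingByThread.remove(thread).toEpochMilli();
                    System.out.println(thread + ": produce request handled in ~" + millis + " ms");
                }
            }
        }
    }
}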