My broker logs say acks = 1, unless this is some other ack:

    Completed request: Name: ProducerRequest; Version: 0; CorrelationId: 858424; ClientId: ; *RequiredAcks: 1*; AckTimeoutMs: 30000 ms; TopicAndPartition:
I'll try to get a stand-alone test going, and try to narrow down the problem. Thanks for helping!

On Tue, Dec 30, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yes, the fetch request blocks until there is data for you, so you should see high remote time there.

For the producer you should only see remote time if you have acks=-1. This represents the time it takes the message to replicate to other in-sync replicas.

I agree if there are no slow requests on the broker during an interval where you saw a long pause, this points to something happening on the client side. It might be worth enabling debug or trace logging on the client to see what is happening when the pauses occur.

Alternately, if you can find a reproducible test case we can turn into a JIRA, someone else may be willing to dive in.

-Jay

On Tue, Dec 30, 2014 at 4:37 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

Got it. I had to enable the request logging to see the logs. Here are two files with the timings I got from the period I had logging enabled:

Producer requests:
https://docs.google.com/spreadsheets/d/1spVFWsf7T8ZmwM0JMYnkBoyYufRhHav50sueFA9ztP8/edit?usp=sharing

Fetch requests:
https://docs.google.com/spreadsheets/d/1PMGslvlttNQOd1uFQDkgPOY31nSK9TZYFy7oa75QaWA/edit?usp=sharing

Things I noted:

1) The producer request totalTime barely goes above 100 ms, and when it does, it is always due to a high remoteTime. Again, given this is barely ever above 100 ms, this doesn't look that bad. Maybe I need to keep logging on for longer to get more samples. The overall time on the producer does seem a bit on the high side, though it definitely doesn't account for the 20-30 second end to end time I saw.

2) The fetch request totalTime does frequently go to 100 ms odd. It is also always due to a high remoteTime, and the remoteTime is always in the ballpark of 100 ms. I specify a maxWait of 100 ms on the client when making my fetch requests, so it might be this 100 ms that I am seeing. My guess is that the buffer sizes I specified don't get filled all the time, so the maxWait timeout is hit and we end up waiting 100 ms on the broker. Again, I don't see how the end to end time could be so poor.

From these timings (maybe I don't have enough samples) it seems like the problem is not in the brokers. Does that seem like a legitimate conclusion? I might have to measure for a longer period to get the tail. I am planning on extracting the producer and consumer code to get a self-contained load test going. I'll do the same end to end lag measurements and see if it's my environment that is adding this lag somehow.

Thanks!

On Tue, Dec 30, 2014 at 11:58 AM, Jay Kreps <j...@confluent.io> wrote:

The produce to the local log is the time taken to write to the local filesystem. If you see that spike up then that is the culprit.

What I was referring to is the line in kafka-request.log that looks something like:

    "Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"

This breaks down the total request time and how much was spent on local I/O etc. If totalTime is always small the server is not to blame. If it spikes, then the question is where that time is going, and this will answer that.
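A quick way to turn that line into numbers is to scan the request log programmatically rather than eyeballing it. A rough sketch follows; the file name, regex and the 500 ms threshold are illustrative and assume the fields appear as in the format string above:

    import java.io.BufferedReader;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SlowRequestScanner {

        private static final Pattern TOTAL = Pattern.compile("totalTime:(\\d+)");
        private static final Pattern REMOTE = Pattern.compile("remoteTime:(\\d+)");

        public static void main(String[] args) throws Exception {
            long thresholdMs = 500; // illustrative threshold
            try (BufferedReader reader = Files.newBufferedReader(Paths.get("kafka-request.log"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    Matcher total = TOTAL.matcher(line);
                    if (!total.find()) {
                        continue; // not a "Completed request" line with timings
                    }
                    long totalMs = Long.parseLong(total.group(1));
                    if (totalMs > thresholdMs) {
                        Matcher remote = REMOTE.matcher(line);
                        String remoteMs = remote.find() ? remote.group(1) : "?";
                        // Print the slow request so the local vs. remote split can be inspected.
                        System.out.println("totalTime=" + totalMs + " ms, remoteTime=" + remoteMs + " ms: " + line);
                    }
                }
            }
        }
    }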
You actually raise a good point about that other log message. It says "4 bytes written to log xyz" but what it is actually logging is the number of messages, not the number of bytes, so that is quite misleading and a bug.

-Jay

On Mon, Dec 29, 2014 at 6:37 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

Thanks Jay.

I just used JMX to change the log level on the broker and checked the logs. I am still not sure which line exactly tells me how much time the broker took to process a producer request. I see log lines of this format:

    2014-12-30T02:13:44.507Z DEBUG [kafka-request-handler-0] [kafka.server.KafkaApis]: [KafkaApi-11] Produce to local log in 5 ms

Is this what I should be looking out for? I see producer requests of the form "2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0] [kafka.server.KafkaApis]: [KafkaApi-11] Handling request: Name: ProducerRequest; Version: 0; CorrelationId: 36308; more stuff" but I can't quite tell when this request was done with. So right now I see that requests are logged frequently on the brokers (multiple times per second) but since I don't know when they are finished with I can't tell the total time.

Some other things that I found kind of odd:

1) The producer requests seem to have an ack timeout of 30000 ms. I don't think I set this on the producer. I don't know if this could have anything to do with the latency problem.

    2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0] [kafka.server.KafkaApis]: [KafkaApi-11] Handling request: Name: ProducerRequest; Version: 0; CorrelationId: 36308; ClientId: ; RequiredAcks: 1; *AckTimeoutMs: 30000 ms*; rest of the stuff.

2) I see a bunch of small writes written to my [topic, partition] logs. My messages are at a minimum 27 bytes, so maybe this is something else. Again, I don't know if this is a problem:

    2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4] [kafka.server.KafkaApis]: [KafkaApi-11] 1 bytes written to log MY.TOPIC-400 beginning at offset 19221923 and ending at offset 19221923

    2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4] [kafka.server.KafkaApis]: [KafkaApi-11] 4 bytes written to log MY.TOPIC-208 beginning at offset 29438019 and ending at offset 29438022

    2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3] [kafka.server.KafkaApis]: [KafkaApi-11] 1 bytes written to log MY.TOPIC-163 beginning at offset 14821977 and ending at offset 14821977

    2014-12-30T02:13:44.500Z TRACE [kafka-request-handler-1] [kafka.server.KafkaApis]: [KafkaApi-11] 1 bytes written to log MY.TOPIC118 beginning at offset 19321510 and ending at offset 19321510

    2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3] [kafka.server.KafkaApis]: [KafkaApi-11] 3 bytes written to log MY.TOPIC-463 beginning at offset 28777627 and ending at offset 28777629

On Mon, Dec 29, 2014 at 5:43 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Rajiv,

Yes, if you uncomment the line

    #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender

in the example log4j.properties file, that will enable logging of each request including the time taken processing the request. This is the first step for diagnosing latency spikes since this will rule out the server. You may also want to enable DEBUG or TRACE on the producer and see what is happening when those spikes occur.

The JMX stats show that at least the producer is not exhausting the I/O thread (it is 92% idle) and doesn't seem to be waiting on memory, which is somewhat surprising to me (the NaN is an artifact of how we compute that stat -- no allocations took place in the time period measured, so it is kind of 0/0).

-Jay
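Aside: the same numbers the jvisualvm screenshots show can also be pulled out of the 0.8.2 producer programmatically via its metrics() map and logged next to the application's own latency histograms. A rough sketch (the "ratio"/"latency" name filter is illustrative and the exact metric set is version dependent):

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class ProducerMetricsDump {

        // Call periodically (e.g. from a scheduled task) to dump the producer's internal metrics.
        public static void dump(KafkaProducer<byte[], byte[]> producer) {
            Map<MetricName, ? extends Metric> metrics = producer.metrics();
            for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                MetricName name = entry.getKey();
                // Only the ratio/latency style metrics (e.g. bufferpool-wait-ratio, io-wait-ratio)
                // are interesting for this investigation.
                if (name.name().contains("ratio") || name.name().contains("latency")) {
                    System.out.println(name.group() + "/" + name.name() + " = " + entry.getValue().value());
                }
            }
        }
    }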
On Mon, Dec 29, 2014 at 4:54 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

In case the attachments don't work out, here is an imgur link - http://imgur.com/NslGpT3,Uw6HFow#0

On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

Never mind about (2). I see these stats are already being output by the kafka producer. I've attached a couple of screenshots (couldn't copy paste from jvisualvm). Do any of these things strike you as odd? The bufferpool-wait-ratio sadly shows up as a NaN.

I still don't know how to figure out (1).

Thanks!

On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

Hi Jay,

Re (1) - I am not sure how to do this. Actually I am not sure what this means. Is this the time every write/fetch request is received on the broker? Do I need to enable some specific log level for this to show up? It doesn't show up in the usual log. Is this information also available via JMX somehow?

Re (2) - Are you saying that I should instrument the "percentage of time waiting for buffer space" stat myself? If so, how do I do this? Or are these stats already output to JMX by the kafka producer code? This seems like it's in the internals of the kafka producer client code.

Thanks again!

On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

Thanks Jay. Will check (1) and (2) and get back to you. The test is not stand-alone now. It might be a bit of work to extract it to a stand-alone executable. It might take me a bit of time to get that going.

On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Rajiv,

This sounds like a bug. The more info you can help us get, the easier it is to fix. Things that would help:
1. Can you check if the request log on the servers shows latency spikes (in which case it is a server problem)?
2. It would be worth also getting the jmx stats on the producer, as they will show things like what percentage of time it is waiting for buffer space etc.

If your test is reasonably stand-alone it would be great to file a JIRA and attach the test code and the findings you already have so someone can dig into what is going on.

-Jay

On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

Hi all,

Bumping this up, in case someone has any ideas. I did yet another experiment where I create 4 producers and stripe the send requests across them in a manner such that any one producer only sees 256 partitions instead of the entire 1024. This seems to have helped a bit, and though I still see a crazy high 99th (25-30 seconds), the median, mean, 75th and 95th percentiles have all gone down.

Thanks!
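A sketch of that striping experiment: N producer instances, with the target partition picking the instance, so each instance only ever sees 1024/N partitions. The modulo scheme and class names are illustrative; the exact mapping used isn't spelled out in the thread:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class StripedProducer {

        private final KafkaProducer<byte[], byte[]>[] producers;

        // config is expected to carry bootstrap.servers and the byte-array serializers.
        @SuppressWarnings("unchecked")
        public StripedProducer(int numProducers, Properties config) {
            producers = new KafkaProducer[numProducers];
            for (int i = 0; i < numProducers; i++) {
                // Each instance gets its own buffer pool, connections and sender thread.
                producers[i] = new KafkaProducer<byte[], byte[]>(config);
            }
        }

        public void send(String topic, int partition, byte[] value, Callback callback) {
            // With 1024 partitions and 4 instances, partition % 4 gives each instance
            // a disjoint set of 256 partitions.
            KafkaProducer<byte[], byte[]> producer = producers[partition % producers.length];
            producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, value), callback);
        }
    }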
On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <tstump...@ntent.com> wrote:

Ah, I thought it was restarting the broker that made things better :)

Yeah, I have no experience with the Java client so can't really help there.

Good luck!

-----Original Message-----
From: Rajiv Kurian [ra...@signalfuse.com]
Received: Sunday, 21 Dec 2014, 12:25PM
To: users@kafka.apache.org [users@kafka.apache.org]
Subject: Re: Trying to figure out kafka latency issues

I'll take a look at the GC profile of the brokers. Right now I keep a tab on the CPU, messages in, bytes in, bytes out, free memory (on the machine, not the JVM heap) and free disk space on the broker. I'll need to take a look at the JVM metrics too. What seemed strange is that going from 8 -> 512 partitions increases the latency, but going from 512 -> 8 does not decrease it. I have to restart the producer (but not the broker) for the end to end latency to go down. That made it seem that the fault was probably with the producer and not the broker. Only restarting the producer made things better. I'll do more extensive measurement on the broker.

On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <tstump...@ntent.com> wrote:

Did you see my response, and have you checked the server logs, especially the GC logs? It still sounds like you are running out of memory on the broker. What is your max heap memory, and are you thrashing once you start writing to all those partitions?

You have measured very thoroughly from an external point of view; I think now you'll have to start measuring the internal metrics. Maybe someone else will have ideas on what jmx values to watch.

Best,
Thunder
So say I decide I'll write to at most 256 > > > > > partitions > > > > > > >>>> I mod > > > > > > >>>> > > > whatever partition I would actually write to by 256. > > > > Basically > > > > > > the > > > > > > >>>> > number > > > > > > >>>> > > > of partitions for this topic on the broker remains the > > > same > > > > at > > > > > > >>>> *1024* > > > > > > >>>> > > > partitions but the number of partitions my producers > > write > > > > to > > > > > > >>>> changes > > > > > > >>>> > > > dynamically based on a run time flag. So something > like > > > > this: > > > > > > >>>> > > > > > > > > > >>>> > > > int partition = getPartitionForMessage(message); > > > > > > >>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get(); > > // > > > > > This > > > > > > >>>> flag > > > > > > >>>> > can > > > > > > >>>> > > be > > > > > > >>>> > > > updated without bringing the application down - just a > > > > > volatile > > > > > > >>>> read of > > > > > > >>>> > > > some number set externally. > > > > > > >>>> > > > int moddedPartition = partition % > maxPartitionsToWrite. > > > > > > >>>> > > > // Send a message to this Kafka partition. > > > > > > >>>> > > > > > > > > > >>>> > > > Here are some interesting things I've noticed: > > > > > > >>>> > > > > > > > > > >>>> > > > i) When I start my client and it *never writes* to > more > > > than > > > > > *8 > > > > > > >>>> > > > partitions *(same > > > > > > >>>> > > > data rate but fewer partitions) - the end to end *99th > > > > latency > > > > > > is > > > > > > >>>> > 300-350 > > > > > > >>>> > > > ms*. Quite a bit of this (numbers in my previous > emails) > > > is > > > > > the > > > > > > >>>> latency > > > > > > >>>> > > > from producer -> broker and the latency from broker -> > > > > > consumer. > > > > > > >>>> Still > > > > > > >>>> > > > nowhere as poor as the *20 - 30* seconds I was seeing. > > > > > > >>>> > > > > > > > > > >>>> > > > ii) When I increase the maximum number of partitions, > > end > > > to > > > > > end > > > > > > >>>> > latency > > > > > > >>>> > > > increases dramatically. At *256 partitions* the end to > > end > > > > > *99th > > > > > > >>>> > latency > > > > > > >>>> > > is > > > > > > >>>> > > > still 390 - 418 ms.* Worse than the latency figures > for > > *8 > > > > > > >>>> *partitions, > > > > > > >>>> > > but > > > > > > >>>> > > > not by much. When I increase this number to *512 > > > partitions > > > > > *the > > > > > > >>>> end > > > > > > >>>> > > > to end *99th > > > > > > >>>> > > > latency *becomes an intolerable *19-24 seconds*. At > > *1024* > > > > > > >>>> partitions > > > > > > >>>> > the > > > > > > >>>> > > > *99th > > > > > > >>>> > > > latency is at 25 - 30 seconds*. 
A table of the numbers:

    Max number of partitions written to (out of 1024)    End to end 99th latency
    8                                                     300 - 350 ms
    256                                                   390 - 418 ms
    512                                                   19 - 24 seconds
    1024                                                  25 - 30 seconds

iii) Once I make the max number of partitions high enough, reducing it doesn't help. For example, if I go up from *8* to *512* partitions, the latency goes up. But while the producer is running, if I go down from *512* to *8* partitions, it doesn't reduce the latency numbers. My guess is that the producer is creating some state lazily per partition and this state is causing the latency. Once this state is created, writing to fewer partitions doesn't seem to help. Only a restart of the producer calms things down.

So my current plan is to reduce the number of partitions on the topic, but there seems to be something deeper going on for the latency numbers to be so poor to begin with and then suffer so much more (non-linearly) with additional partitions.

Thanks!
On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

I've done some more measurements. I've also started measuring the latency between when I ask my producer to send a message and when I get an acknowledgement via the callback. Here is my code:

    // This function is called on every producer once every 30 seconds.
    public void addLagMarkers(final Histogram enqueueLag) {
        final int numberOfPartitions = 1024;
        final long timeOfEnqueue = System.currentTimeMillis();
        final Callback callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception ex) {
                if (metadata != null) {
                    // The difference between ack time from broker and enqueue time.
                    final long timeOfAck = System.currentTimeMillis();
                    final long lag = timeOfAck - timeOfEnqueue;
                    enqueueLag.update(lag);
                }
            }
        };
        for (int i = 0; i < numberOfPartitions; i++) {
            try {
                // 10 bytes -> short version + long timestamp.
                byte[] value = LagMarker.serialize(timeOfEnqueue);
                // This message is later used by the consumers to measure lag.
                ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
                kafkaProducer.send(record, callback);
            } catch (Exception e) {
                // We just dropped a lag marker.
            }
        }
    }

The *99th* on this lag is about *350-400 ms*. It's not stellar, but it doesn't account for the *20-30 second 99th* I see on the end to end lag. I am consuming in a tight loop on the consumers (using the SimpleConsumer) with minimal processing, with a *99th fetch time* of *130-140 ms*, so I don't think that should be a problem either. Completely baffled.

Thanks!
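LagMarker itself isn't shown in the thread; based on the comment in the code above (10 bytes: a 2 byte version followed by an 8 byte timestamp), it might look roughly like the sketch below. The version constant is made up for illustration:

    import java.nio.ByteBuffer;

    public final class LagMarker {

        // Hypothetical version tag; the real value is whatever producers and consumers agree on.
        static final short TRACE_VERSION = 2;

        // 2-byte version + 8-byte timestamp = 10 bytes, matching the comment above.
        static byte[] serialize(long timeOfEnqueue) {
            ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.putShort(TRACE_VERSION);
            buffer.putLong(timeOfEnqueue);
            return buffer.array();
        }

        // Returns the enqueue timestamp the producer stamped into the marker.
        static long timestampOf(byte[] value) {
            ByteBuffer buffer = ByteBuffer.wrap(value);
            buffer.getShort(); // skip the version; consumers use it to tell trace messages from normal data
            return buffer.getLong();
        }
    }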
On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

I am trying to replace a Thrift peer to peer API with kafka for a particular work flow. I am finding the 99th percentile latency to be unacceptable at this time. This entire work load runs in an Amazon VPC. I'd greatly appreciate it if someone has any insights on why I am seeing such poor numbers. Here are some details and measurements taken:

i) I have a single topic with 1024 partitions that I am writing to from six clients using the kafka 0.8.2 beta producer.

ii) I have 3 brokers, each on a c3.2x machine on EC2. Each of those machines has 8 virtual cpus, 15 GB memory and 2 * 80 GB SSDs. The broker -> partitions mapping was decided by Kafka when I created the topic.

iii) I write about 22 thousand messages per second from across the 6 clients. This number was calculated using distributed counters. I just increment a distributed counter in the callback from my enqueue job if the metadata returned is not null. I also increment a number-of-dropped-messages counter if the callback has a non-null exception or if there was an exception in the synchronous send call. The number of dropped messages is pretty much always zero. Out of the 6 clients, 3 are responsible for 95% of the traffic. Messages are very tiny and have null keys and 27 byte values (2 byte version and 25 byte payload). Again, these messages are written using the kafka 0.8.2 client.

iv) I have 3 consumer processes consuming only from this topic. Each consumer process is assigned a disjoint set of the 1024 partitions by an eternal arbiter. Each consumer process then creates a mapping from brokers -> partitions it has been assigned. It then starts one fetcher thread per broker. Each thread queries the broker (using the SimpleConsumer) it has been assigned for partitions such that partitions = (partitions on broker) ∩ (partitions assigned to the process by the arbiter). So in effect there are 9 consumer threads across 3 consumer processes that query a disjoint set of partitions for the single topic, and amongst themselves they manage to consume from every partition. So each thread has a run loop where it asks for messages, consumes them, then asks for more messages. When a run loop starts, it starts by querying for the latest message in a partition (i.e. discards any previous backup) and then maintains a map of partition -> nextOffsetToRequest in memory to make sure that it consumes messages in order.

[Edit: Meant external arbiter.]

v) Consumption is really simple. Each message is put on a non-blocking, efficient ring buffer. If the ring buffer is full, the message is dropped. I measure the mean time between fetches and it is the same as the time for fetches up to a ms, meaning no matter how many messages are dequeued, it takes almost no time to process them. At the end of processing a fetch request I increment another distributed counter that counts the number of messages processed. This counter tells me that on average I am consuming the same number of messages/sec that I enqueue on the producers, i.e. around 22 thousand messages/sec.
vi) The 99th percentile of the number of messages fetched per fetch request is about 500 messages. The 99th percentile of the time it takes to fetch a batch is about 130-140 ms. I played around with the buffer and maxWait settings on the SimpleConsumer, and attempting to consume more messages was leading the 99th percentile of the fetch time to balloon up, so I am consuming in smaller batches right now.

vii) Every 30 seconds odd, each of the producers inserts a trace message into each partition (so 1024 messages per producer every 30 seconds). Each message only contains the System.currentTimeMillis() at the time of enqueueing. It is about 9 bytes long. The consumer in step (v) always checks to see if a message it dequeued is of this trace type (instead of the regular message type). This is a simple check on the first 2 byte version that each message buffer contains. If it is a trace message it reads its payload as a long, takes the difference between the system time on the consumer and this payload, and updates a histogram with this difference value. Ignoring NTP skew (which I've measured to be in the order of milliseconds) this is the lag between when a message was enqueued on the producer and when it was read on the consumer.

[Edit: Trace messages are 10 bytes long - 2 byte version + 8 byte long for timestamp.]

So pseudocode for every consumer thread (one per broker per consumer process, 9 in total across 3 consumers) is:

    void run() {
        while (running) {
            // This assignment is done by an external arbiter.
            FetchRequest fetchRequest = buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);

            measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.

            // The 99th of this is 130-140 ms, which is the same as the time between fetches.
            FetchResponse fetchResponse = fetchResponse(fetchRequest);

            processData(fetchResponse);  // The 99th on this is 2-3 ms.
        }
    }

    void processData(FetchResponse response) {
        try (Timer.Context _ = processWorkerDataTimer.time()) {  // The 99th on this is 2-3 ms.
            for (Message message : response) {
                if (typeOf(message) == NORMAL_DATA) {
                    // Never blocks, drops the message if the ring buffer is full.
                    enqueueOnNonBlockingRingBuffer(message);
                } else if (typeOf(message) == TRACE_DATA) {
                    long timeOfEnqueue = getEnqueueTime(message);
                    long currentTime = System.currentTimeMillis();
                    long difference = currentTime - timeOfEnqueue;
                    lagHistogram.update(difference);
                }
            }
        }
    }
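The thread setup that feeds this run loop, one fetcher per broker restricted to (partitions on broker) ∩ (partitions assigned to the process) as described in (iv), might be wired up roughly as below. leaderBrokerFor() is a stand-in for however the leader is actually looked up, not a real Kafka API:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class FetcherSetup {

        // Group the partitions this process owns by the broker that leads them, then
        // start one fetcher thread per broker over just that subset.
        public static List<Thread> startFetchers(Set<Integer> partitionsAssignedByArbiter) {
            Map<String, List<Integer>> brokerToPartitions = new HashMap<String, List<Integer>>();
            for (Integer partition : partitionsAssignedByArbiter) {
                String broker = leaderBrokerFor(partition); // placeholder, e.g. looked up from topic metadata
                List<Integer> forBroker = brokerToPartitions.get(broker);
                if (forBroker == null) {
                    forBroker = new ArrayList<Integer>();
                    brokerToPartitions.put(broker, forBroker);
                }
                forBroker.add(partition);
            }
            List<Thread> fetchers = new ArrayList<Thread>();
            for (Map.Entry<String, List<Integer>> entry : brokerToPartitions.entrySet()) {
                Thread fetcher = new Thread(newRunLoop(entry.getKey(), entry.getValue()));
                fetcher.start();
                fetchers.add(fetcher);
            }
            return fetchers;
        }

        // Stand-in for the leader lookup; illustrative only.
        private static String leaderBrokerFor(int partition) {
            return "broker-" + (partition % 3);
        }

        // Each runnable would execute the run() loop shown above against one broker.
        private static Runnable newRunLoop(final String broker, final List<Integer> partitions) {
            return new Runnable() {
                @Override
                public void run() {
                    // fetch loop for this broker and its partition subset
                }
            };
        }
    }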
viii) So the kicker is that this lag is really, really high:

Median lag: *200 ms*. This already seems pretty high, and I could even discount some of it through NTP skew.
Mean lag: *2-4 seconds*! Also notice how the mean is bigger than the median, kind of telling us that the distribution has a big tail.
99th lag: *20 seconds*!!

I can't quite figure out the source of the lag. Here are some other things of note:

1) The brokers in general are at about 40-50% CPU, but I see occasional spikes to more than 80-95%. This could probably be causing issues. I am wondering if this is down to the high number of partitions I have and how each partition ends up receiving not too many messages. 22k messages spread across 1024 partitions = only 21 odd messages/sec/partition. But each broker should be receiving about 7500 messages/sec. It could also be down to the nature of these messages being tiny. I imagine that the kafka wire protocol overhead per message is probably close to the 25 byte message payload. It also has to do a few CRC32 calculations etc. But again, given that these are c3.2x machines and the number of messages is so tiny, I'd expect it to do better. And reading this article
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
about 2 million messages/sec across three brokers makes it seem like 22k 25-byte messages should be very easy. So my biggest suspicion right now is that the large number of partitions is somehow responsible for this latency.

2) On the consumer, given that the 99th time between fetches == the 99th time of a fetch, I don't see how I could do any better. It's pretty close to just getting batches of data, iterating through them and dropping them on the floor. My requests on the consumer are built like this:

    // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
    FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName)
            .maxWait(100).minBytes(1000);

    // Add a fetch request for every partition.
    for (PartitionMetadata partition : partitions) {
        int partitionId = partition.partitionId();
        long offset = readOffsets.get(partition);  // This map maintains the next offset to be read.
        builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition per fetch request.
    }
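For completeness, issuing that request and walking the response with the 0.8 SimpleConsumer javaapi looks roughly like the sketch below; host, port, buffer sizes and offsets are placeholders and error handling is elided:

    import java.util.HashMap;
    import java.util.Map;
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class FetchLoopSketch {
        public static void main(String[] args) {
            String topic = "MY.TOPIC";
            String clientName = "lag-test-consumer"; // placeholder
            SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, clientName);
            Map<Integer, Long> readOffsets = new HashMap<Integer, Long>();
            readOffsets.put(0, 0L); // normally seeded from an offset request per assigned partition

            FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName)
                    .maxWait(100).minBytes(1000);
            for (Map.Entry<Integer, Long> entry : readOffsets.entrySet()) {
                builder.addFetch(topic, entry.getKey(), entry.getValue(), 3000);
            }

            FetchRequest request = builder.build();
            FetchResponse response = consumer.fetch(request); // blocks up to maxWait for minBytes
            for (int partitionId : readOffsets.keySet()) {
                for (MessageAndOffset messageAndOffset : response.messageSet(topic, partitionId)) {
                    // Track the next offset to ask for on the following fetch.
                    readOffsets.put(partitionId, messageAndOffset.nextOffset());
                    // messageAndOffset.message().payload() holds the value bytes.
                }
            }
            consumer.close();
        }
    }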
3) On the producer side I use the kafka beta producer. I've played around with many settings but none of them seem to have made a difference. Here are my current settings:

    ProducerConfig values:
        block.on.buffer.full = false
        retry.backoff.ms = 100
        buffer.memory = 83886080        // Kind of big - could it be adding lag?
        batch.size = 40500              // This is also kind of big - could it be adding lag?
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        receive.buffer.bytes = 32768
        timeout.ms = 30000
        max.in.flight.requests.per.connection = 5
        metric.reporters = []
        bootstrap.servers = [broker1, broker2, broker3]
        client.id =
        compression.type = none
        retries = 0
        max.request.size = 1048576
        send.buffer.bytes = 131072
        acks = 1
        reconnect.backoff.ms = 10
        linger.ms = 250                 // Based on this config I'd imagine that the lag should be bounded to about 250 ms over all.
        metrics.num.samples = 2
        metadata.fetch.timeout.ms = 60000

I know this is a lot of information. I just wanted to provide as much context as possible.

Thanks in advance!
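For reference, the equivalent programmatic setup of that producer with the 0.8.2 client would look something like the sketch below, keeping only the settings above that differ from the defaults; the broker addresses are placeholders and the byte-array serializers are an assumption, since the thread doesn't show them:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class ProducerSetup {
        public static KafkaProducer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholders
            props.put("acks", "1");
            props.put("linger.ms", "250");
            props.put("batch.size", "40500");
            props.put("buffer.memory", "83886080");
            props.put("block.on.buffer.full", "false");
            props.put("retries", "0");
            props.put("compression.type", "none");
            // The messages in this thread are raw byte[] payloads with null keys.
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            return new KafkaProducer<byte[], byte[]>(props);
        }
    }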