Never mind about (2). I see these stats are already being output by the kafka producer. I've attached a couple of screenshots (couldn't copy-paste from jvisualvm). Do any of these things strike you as odd? The bufferpool-wait-ratio sadly shows up as a NaN.
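For reference, those producer stats can also be pulled programmatically instead of through jvisualvm, which makes them easier to log alongside the latency histograms. A minimal sketch, assuming the 0.8.2 java producer's metrics() call (the printing loop is just illustrative):

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Dump every producer metric (bufferpool-wait-ratio, record-queue-time-avg, etc.)
    // so they can be graphed over time rather than read off a screenshot.
    public static void logProducerMetrics(KafkaProducer producer) {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        for (MetricName name : metrics.keySet()) {
            // A NaN value usually just means the sensor has not recorded any samples yet.
            System.out.printf("%s / %s = %f%n", name.group(), name.name(), metrics.get(name).value());
        }
    }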
I still don't know how to figure out (1). Thanks!

On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Jay,
>
> Re (1) - I am not sure how to do this? Actually I am not sure what this means. Is this the time every write/fetch request is received on the broker? Do I need to enable some specific log level for this to show up? It doesn't show up in the usual log. Is this information also available via jmx somehow?
> Re (2) - Are you saying that I should instrument the "percentage of time waiting for buffer space" stat myself? If so, how do I do this? Or are these stats already output to jmx by the kafka producer code? This seems like it's in the internals of the kafka producer client code.
>
> Thanks again!
>
> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>
>> Thanks Jay. Will check (1) and (2) and get back to you. The test is not stand-alone now. It might be a bit of work to extract it to a stand-alone executable. It might take me a bit of time to get that going.
>>
>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> Hey Rajiv,
>>>
>>> This sounds like a bug. The more info you can help us get, the easier it is to fix. Things that would help:
>>> 1. Can you check if the request log on the servers shows latency spikes (in which case it is a server problem)?
>>> 2. It would be worth also getting the jmx stats on the producer, as they will show things like what percentage of time it is waiting for buffer space etc.
>>>
>>> If your test is reasonably stand-alone it would be great to file a JIRA and attach the test code and the findings you already have so someone can dig into what is going on.
>>>
>>> -Jay
>>>
>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>>
>>> > Hi all,
>>> >
>>> > Bumping this up, in case someone has any ideas. I did yet another experiment where I create 4 producers and stripe the send requests across them in a manner such that any one producer only sees 256 partitions instead of the entire 1024. This seems to have helped a bit, and though I still see a crazy high 99th (25-30 seconds), the median, mean, 75th and 95th percentile have all gone down.
>>> >
>>> > Thanks!
>>> >
>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <tstump...@ntent.com> wrote:
>>> >
>>> > > Ah, I thought it was restarting the broker that made things better :)
>>> > >
>>> > > Yeah, I have no experience with the Java client so can't really help there.
>>> > >
>>> > > Good luck!
>>> > >
>>> > > -----Original Message-----
>>> > > From: Rajiv Kurian [ra...@signalfuse.com]
>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
>>> > > To: users@kafka.apache.org [users@kafka.apache.org]
>>> > > Subject: Re: Trying to figure out kafka latency issues
>>> > >
>>> > > I'll take a look at the GC profile of the brokers. Right now I keep a tab on the CPU, Messages in, Bytes in, Bytes out, free memory (on the machine, not the JVM heap) and free disk space on the broker. I'll need to take a look at the JVM metrics too. What seemed strange is that going from 8 -> 512 partitions increases the latency, but going from 512 -> 8 does not decrease it.
>>> > > I have to restart the producer (but not the broker) for the end to end latency to go down. That made it seem that the fault was probably with the producer and not the broker. Only restarting the producer made things better. I'll do more extensive measurement on the broker.
>>> > >
>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <tstump...@ntent.com> wrote:
>>> > > >
>>> > > > Did you see my response, and have you checked the server logs, especially the GC logs? It still sounds like you are running out of memory on the broker. What is your max heap memory, and are you thrashing once you start writing to all those partitions?
>>> > > >
>>> > > > You have measured very thoroughly from an external point of view; I think now you'll have to start measuring the internal metrics. Maybe someone else will have ideas on what jmx values to watch.
>>> > > >
>>> > > > Best,
>>> > > > Thunder
>>> > > >
>>> > > > -----Original Message-----
>>> > > > From: Rajiv Kurian [ra...@signalfuse.com]
>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
>>> > > > To: users@kafka.apache.org [users@kafka.apache.org]
>>> > > > Subject: Re: Trying to figure out kafka latency issues
>>> > > >
>>> > > > Some more work tells me that the end to end latency numbers vary with the number of partitions I am writing to. I did an experiment where, based on a run time flag, I would dynamically select how many of the *1024 partitions* I write to. So say I decide I'll write to at most 256 partitions: I mod whatever partition I would actually write to by 256. Basically the number of partitions for this topic on the broker remains the same at *1024* partitions, but the number of partitions my producers write to changes dynamically based on a run time flag. So something like this:
>>> > > >
>>> > > >   int partition = getPartitionForMessage(message);
>>> > > >   int maxPartitionsToWriteTo = maxPartitionsFlag.get(); // This flag can be updated without bringing the application down - just a volatile read of some number set externally.
>>> > > >   int moddedPartition = partition % maxPartitionsToWriteTo;
>>> > > >   // Send a message to this Kafka partition.
>>> > > >
>>> > > > Here are some interesting things I've noticed:
>>> > > >
>>> > > > i) When I start my client and it *never writes* to more than *8 partitions* (same data rate but fewer partitions) - the end to end *99th latency is 300-350 ms*. Quite a bit of this (numbers in my previous emails) is the latency from producer -> broker and the latency from broker -> consumer. Still nowhere as poor as the *20 - 30* seconds I was seeing.
>>> > > >
>>> > > > ii) When I increase the maximum number of partitions, end to end latency increases dramatically. At *256 partitions* the end to end *99th latency* is still *390 - 418 ms*. Worse than the latency figures for *8* partitions, but not by much. When I increase this number to *512 partitions* the end to end *99th latency* becomes an intolerable *19-24 seconds*. At *1024* partitions the *99th latency* is at *25 - 30 seconds*.
>>> > > > A table of the numbers:
>>> > > >
>>> > > >   Max number of partitions written to (out of 1024) | End to end latency
>>> > > >   8                                                 | 300 - 350 ms
>>> > > >   256                                               | 390 - 418 ms
>>> > > >   512                                               | 19 - 24 seconds
>>> > > >   1024                                              | 25 - 30 seconds
>>> > > >
>>> > > > iii) Once I make the max number of partitions high enough, reducing it doesn't help. For example: if I go up from *8* to *512* partitions, the latency goes up. But while the producer is running, if I go down from *512* to *8* partitions, it doesn't reduce the latency numbers. My guess is that the producer is creating some state lazily per partition and this state is causing the latency. Once this state is created, writing to fewer partitions doesn't seem to help. Only a restart of the producer calms things down.
>>> > > >
>>> > > > So my current plan is to reduce the number of partitions on the topic, but there seems to be something deeper going on for the latency numbers to be so poor to begin with and then suffer so much more (non-linearly) with additional partitions.
>>> > > >
>>> > > > Thanks!
>>> > > >
>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>> > > > >
>>> > > > > I've done some more measurements. I've also started measuring the latency between when I ask my producer to send a message and when I get an acknowledgement via the callback. Here is my code:
>>> > > > >
>>> > > > >   // This function is called on every producer once every 30 seconds.
>>> > > > >   public void addLagMarkers(final Histogram enqueueLag) {
>>> > > > >       final int numberOfPartitions = 1024;
>>> > > > >       final long timeOfEnqueue = System.currentTimeMillis();
>>> > > > >       final Callback callback = new Callback() {
>>> > > > >           @Override
>>> > > > >           public void onCompletion(RecordMetadata metadata, Exception ex) {
>>> > > > >               if (metadata != null) {
>>> > > > >                   // The difference between ack time from broker and enqueue time.
>>> > > > >                   final long timeOfAck = System.currentTimeMillis();
>>> > > > >                   final long lag = timeOfAck - timeOfEnqueue;
>>> > > > >                   enqueueLag.update(lag);
>>> > > > >               }
>>> > > > >           }
>>> > > > >       };
>>> > > > >       for (int i = 0; i < numberOfPartitions; i++) {
>>> > > > >           try {
>>> > > > >               byte[] value = LagMarker.serialize(timeOfEnqueue); // 10 bytes -> short version + long timestamp.
>>> > > > >               // This message is later used by the consumers to measure lag.
>>> > > > >               ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
>>> > > > >               kafkaProducer.send(record, callback);
>>> > > > >           } catch (Exception e) {
>>> > > > >               // We just dropped a lag marker.
>>> > > > >           }
>>> > > > >       }
>>> > > > >   }
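LagMarker.serialize isn't shown in the thread, but given the comment above (10 bytes: a short version followed by a long timestamp), a minimal sketch of it could look like this; the version constant is a placeholder, not the real value:

    import java.nio.ByteBuffer;

    // Hypothetical sketch of LagMarker.serialize: 2-byte version + 8-byte timestamp = 10 bytes.
    public static final short TRACE_VERSION = 2; // placeholder value, not from the thread

    public static byte[] serialize(long timeOfEnqueue) {
        return ByteBuffer.allocate(10)
                .putShort(TRACE_VERSION)
                .putLong(timeOfEnqueue)
                .array();
    }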
>>> > > > > The *99th* on this lag is about *350 - 400* ms. It's not stellar, but it doesn't account for the *20-30 second 99th* I see on the end to end lag. I am consuming in a tight loop on the Consumers (using the SimpleConsumer) with minimal processing and a *99th fetch time* of *130-140* ms, so I don't think that should be a problem either. Completely baffled.
>>> > > > >
>>> > > > > Thanks!
>>> > > > >
>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>> > > > >>
>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>> > > > >>>
>>> > > > >>> I am trying to replace a Thrift peer-to-peer API with kafka for a particular workflow. I am finding the 99th percentile latency to be unacceptable at this time. This entire workload runs in an Amazon VPC. I'd greatly appreciate it if someone has any insights on why I am seeing such poor numbers. Here are some details and measurements taken:
>>> > > > >>>
>>> > > > >>> i) I have a single topic with 1024 partitions that I am writing to from six clients using the kafka 0.8.2 beta kafka producer.
>>> > > > >>>
>>> > > > >>> ii) I have 3 brokers, each on a c3 2x machine on ec2. Each of those machines has 8 virtual cpus, 15 GB memory and 2 * 80 GB SSDs. The broker -> partitions mapping was decided by Kafka when I created the topic.
>>> > > > >>>
>>> > > > >>> iii) I write about 22 thousand messages per second from across the 6 clients. This number was calculated using distributed counters. I just increment a distributed counter in the callback from my enqueue job if the metadata returned is not null. I also increment a number-of-dropped-messages counter if the callback has a non-null exception or if there was an exception in the synchronous send call. The number of dropped messages is pretty much always zero. Out of the 6 clients, 3 are responsible for 95% of the traffic. Messages are very tiny and have null keys and 27 byte values (2 byte version and 25 byte payload). Again, these messages are written using the kafka 0.8.2 client.
>>> > > > >>>
>>> > > > >>> iv) I have 3 consumer processes consuming only from this topic. Each consumer process is assigned a disjoint set of the 1024 partitions by an eternal arbiter. Each consumer process then creates a mapping from brokers -> partitions it has been assigned. It then starts one fetcher thread per broker. Each thread queries the broker (using the SimpleConsumer) it has been assigned for partitions such that partitions = (partitions on broker) *∩* (partitions assigned to the process by the arbiter).
So in >>> > effect >>> > > > >>> there are 9 consumer threads across 3 consumer processes that >>> > query a >>> > > > >>> disjoint set of partitions for the single topic and amongst >>> > > themselves >>> > > > they >>> > > > >>> manage to consume from every topic. So each thread has a run >>> loop >>> > > > where it >>> > > > >>> asks for messages, consumes them then asks for more messages. >>> When >>> > a >>> > > > run >>> > > > >>> loop starts, it starts by querying for the latest message in a >>> > > > partition >>> > > > >>> (i.e. discards any previous backup) and then maintains a map of >>> > > > partition >>> > > > >>> -> nextOffsetToRequest in memory to make sure that it consumes >>> > > > messages in >>> > > > >>> order. >>> > > > >>> >>> > > > >> Edit: Mean't external arbiter. >>> > > > >> >>> > > > >>> >>> > > > >>> v) Consumption is really simple. Each message is put on a non >>> > > blocking >>> > > > >>> efficient ring buffer. If the ring buffer is full the message >>> is >>> > > > dropped. I >>> > > > >>> measure the mean time between fetches and it is the same as the >>> > time >>> > > > for >>> > > > >>> fetches up to a ms, meaning no matter how many messages are >>> > dequeued, >>> > > > it >>> > > > >>> takes almost no time to process them. At the end of processing >>> a >>> > > fetch >>> > > > >>> request I increment another distributed counter that counts the >>> > > number >>> > > > of >>> > > > >>> messages processed. This counter tells me that on average I am >>> > > > consuming >>> > > > >>> the same number of messages/sec that I enqueue on the producers >>> > i.e. >>> > > > around >>> > > > >>> 22 thousand messages/sec. >>> > > > >>> >>> > > > >>> vi) The 99th percentile of the number of messages fetched per >>> fetch >>> > > > >>> request is about 500 messages. The 99th percentile of the >>> time it >>> > > > takes to >>> > > > >>> fetch a batch is abut 130 - 140 ms. I played around with the >>> buffer >>> > > and >>> > > > >>> maxWait settings on the SimpleConsumer and attempting to >>> consume >>> > more >>> > > > >>> messages was leading the 99th percentile of the fetch time to >>> > balloon >>> > > > up, >>> > > > >>> so I am consuming in smaller batches right now. >>> > > > >>> >>> > > > >>> vii) Every 30 seconds odd each of the producers inserts a trace >>> > > message >>> > > > >>> into each partition (so 1024 messages per producer every 30 >>> > seconds). >>> > > > Each >>> > > > >>> message only contains the System.currentTimeMillis() at the >>> time of >>> > > > >>> enqueueing. It is about 9 bytes long. The consumer in step (v) >>> > always >>> > > > >>> checks to see if a message it dequeued is of this trace type >>> > (instead >>> > > > of >>> > > > >>> the regular message type). This is a simple check on the first >>> 2 >>> > byte >>> > > > >>> version that each message buffer contains. If it is a trace >>> message >>> > > it >>> > > > >>> reads it's payload as a long and uses the difference between >>> the >>> > > system >>> > > > >>> time on the consumer and this payload and updates a histogram >>> with >>> > > this >>> > > > >>> difference value. Ignoring NTP skew (which I've measured to be >>> in >>> > the >>> > > > order >>> > > > >>> of milliseconds) this is the lag between when a message was >>> > enqueued >>> > > > on the >>> > > > >>> producer and when it was read on the consumer. >>> > > > >>> >>> > > > >> Edit: Trace messages are 10 bytes long - 2byte version + 8 byte >>> long >>> > > for >>> > > > >> timestamp. 
>>> > > > >>> So pseudocode for every consumer thread (one per broker per consumer process, 9 in total across 3 consumers) is:
>>> > > > >>>
>>> > > > >>>   void run() {
>>> > > > >>>       while (running) {
>>> > > > >>>           FetchRequest fetchRequest = buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker); // This assignment is done by an external arbiter.
>>> > > > >>>           measureTimeBetweenFetchRequests(); // The 99th of this is 130-140 ms.
>>> > > > >>>           FetchResponse fetchResponse = fetchResponse(fetchRequest); // The 99th of this is 130-140 ms, which is the same as the time between fetches.
>>> > > > >>>           processData(fetchResponse); // The 99th on this is 2-3 ms.
>>> > > > >>>       }
>>> > > > >>>   }
>>> > > > >>>
>>> > > > >>>   void processData(FetchResponse response) {
>>> > > > >>>       try (Timer.Context _ = processWorkerDataTimer.time()) { // The 99th on this is 2-3 ms.
>>> > > > >>>           for (Message message : response) {
>>> > > > >>>               if (typeOf(message) == NORMAL_DATA) {
>>> > > > >>>                   enqueueOnNonBlockingRingBuffer(message); // Never blocks, drops message if ring buffer is full.
>>> > > > >>>               } else if (typeOf(message) == TRACE_DATA) {
>>> > > > >>>                   long timeOfEnqueue = getEnqueueTime(message);
>>> > > > >>>                   long currentTime = System.currentTimeMillis();
>>> > > > >>>                   long difference = currentTime - timeOfEnqueue;
>>> > > > >>>                   lagHistogram.update(difference);
>>> > > > >>>               }
>>> > > > >>>           }
>>> > > > >>>       }
>>> > > > >>>   }
>>> > > > >>>
>>> > > > >>> viii) So the kicker is that this lag is really, really high:
>>> > > > >>> Median Lag: *200 ms*. This already seems pretty high, and I could even discount some of it through NTP skew.
>>> > > > >>> Mean Lag: *2 - 4 seconds*! Also notice how the mean is bigger than the median, telling us that the distribution has a big tail.
>>> > > > >>> 99th Lag: *20 seconds*!!
>>> > > > >>>
>>> > > > >>> I can't quite figure out the source of the lag. Here are some other things of note:
>>> > > > >>>
>>> > > > >>> 1) The brokers in general are at about 40-50% CPU, but I see occasional spikes to more than 80% - 95%. This could probably be causing issues. I am wondering if this is down to the high number of partitions I have and how each partition ends up receiving not too many messages. 22k messages spread across 1024 partitions = only 21-odd messages/sec/partition. But each broker should be receiving about 7500 messages/sec. It could also be down to the nature of these messages being tiny. I imagine that the kafka wire protocol overhead is probably close to the 25 byte message payload. It also has to do a few CRC32 calculations etc. But again, given that these are C3.2x machines and the number of messages is so tiny, I'd expect it to do better.
>>> > > > >>> But again, reading this article <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines> about 2 million messages/sec across three brokers makes it seem like 22k 25-byte messages should be very easy. So my biggest suspicion right now is that the large number of partitions is somehow responsible for this latency.
>>> > > > >>>
>>> > > > >>> 2) On the consumer, given that the 99th time between fetches == the 99th time of a fetch, I don't see how I could do any better. It's pretty close to just getting batches of data, iterating through them, and dropping them on the floor. My requests on the consumer are built like this:
>>> > > > >>>
>>> > > > >>>   FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName)
>>> > > > >>>           .maxWait(100).minBytes(1000); // max wait of 100 ms or at least 1000 bytes, whichever happens first.
>>> > > > >>>
>>> > > > >>>   // Add a fetch request for every partition.
>>> > > > >>>   for (PartitionMetadata partition : partitions) {
>>> > > > >>>       int partitionId = partition.partitionId();
>>> > > > >>>       long offset = readOffsets.get(partition); // This map maintains the next offset to be read per partition.
>>> > > > >>>       builder.addFetch(MY_TOPIC, partitionId, offset, 3000); // Up to 3000 bytes per fetch request.
>>> > > > >>>   }
>>> > > > >>>
>>> > > > >>> 3) On the producer side I use the kafka beta producer. I've played around with many settings but none of them seem to have made a difference. Here are my current settings:
>>> > > > >>>
>>> > > > >>> ProducerConfig values:
>>> > > > >>>   block.on.buffer.full = false
>>> > > > >>>   retry.backoff.ms = 100
>>> > > > >>>   buffer.memory = 83886080 // Kind of big - could it be adding lag?
>>> > > > >>>   batch.size = 40500 // This is also kind of big - could it be adding lag?
>>> > > > >>>   metrics.sample.window.ms = 30000
>>> > > > >>>   metadata.max.age.ms = 300000
>>> > > > >>>   receive.buffer.bytes = 32768
>>> > > > >>>   timeout.ms = 30000
>>> > > > >>>   max.in.flight.requests.per.connection = 5
>>> > > > >>>   metric.reporters = []
>>> > > > >>>   bootstrap.servers = [broker1, broker2, broker3]
>>> > > > >>>   client.id =
>>> > > > >>>   compression.type = none
>>> > > > >>>   retries = 0
>>> > > > >>>   max.request.size = 1048576
>>> > > > >>>   send.buffer.bytes = 131072
>>> > > > >>>   acks = 1
>>> > > > >>>   reconnect.backoff.ms = 10
>>> > > > >>>   linger.ms = 250 // Based on this config I'd imagine that the lag should be bounded to about 250 ms overall.
>>> > > > >>>   metrics.num.samples = 2
>>> > > > >>>   metadata.fetch.timeout.ms = 60000
>>> > > > >>>
>>> > > > >>> I know this is a lot of information. I just wanted to provide as much context as possible.
>>> > > > >>> Thanks in advance!
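As a footnote to the ProducerConfig values quoted above: they map directly onto the Properties handed to the new producer. A rough sketch of how such a producer might be constructed (host names are placeholders, and only a few of the listed settings are repeated):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder hosts
    props.put("acks", "1");
    props.put("linger.ms", "250");      // upper bound on how long a batch can sit waiting to fill
    props.put("batch.size", "40500");   // batches are allocated per partition, so 1024 partitions can hold many of these
    props.put("buffer.memory", "83886080");
    KafkaProducer producer = new KafkaProducer(props);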