Never mind about (2). I see these stats are already being output by the
kafka producer. I've attached a couple of screenshots (couldn't copy-paste
from jvisualvm). Do any of these things strike you as odd? The
bufferpool-wait-ratio sadly shows up as a NaN.
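
In case text is easier to work with than screenshots, something roughly like
the sketch below should dump the same values as text (it just walks the
producer's metrics() map; the class and method names are mine and it is
untested):

// Hypothetical helper: prints every metric the 0.8.2 producer exposes (the
// same values that back the JMX beans), so they can be pasted as text.
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class ProducerMetricsDump {
    // Call as: ProducerMetricsDump.dump(kafkaProducer.metrics());
    public static void dump(Map<MetricName, ? extends Metric> metrics) {
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            // e.g. "producer-metrics bufferpool-wait-ratio = NaN"
            System.out.println(name.group() + " " + name.name() + " = "
                    + entry.getValue().value());
        }
    }
}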

I still don't know how to figure out (1).
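
My best guess for (1) - please correct me if this is the wrong knob - is the
log4j-controlled request logger on the brokers, i.e. bumping something like
the following in config/log4j.properties from WARN to TRACE and restarting
the broker (this is an assumption on my part, not something I've verified):

# Assumed to be the stock kafka.request.logger entry; at TRACE each request
# should be logged with its latency breakdown to the kafka-request.log file.
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false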

Thanks!


On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Jay,
>
> Re (1) - I am not sure how to do this. Actually, I am not sure what it
> means. Is this the time every write/fetch request is received on the
> broker? Do I need to enable some specific log level for this to show up? It
> doesn't show up in the usual log. Is this information also available via
> jmx somehow?
> Re (2) - Are you saying that I should instrument the "percentage of time
> waiting for buffer space" stat myself? If so, how do I do that? Or are these
> stats already output to jmx by the kafka producer code? This seems like
> it's in the internals of the kafka producer client code.
>
> Thanks again!
>
>
> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
>> Thanks Jay. Will check (1) and (2) and get back to you. The test is not
>> stand-alone right now, so it might take some work (and a bit of time) to
>> extract it into a stand-alone executable.
>>
>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> Hey Rajiv,
>>>
>>> This sounds like a bug. The more info you can help us get, the easier it
>>> will be to fix. Things that would help:
>>> 1. Can you check whether the request log on the servers shows latency
>>> spikes (in which case it is a server problem)?
>>> 2. It would be worth also getting the jmx stats on the producer as they
>>> will show things like what percentage of time it is waiting for buffer
>>> space etc.
>>>
>>> If your test is reasonably stand-alone it would be great to file a JIRA
>>> and
>>> attach the test code and the findings you already have so someone can dig
>>> into what is going on.
>>>
>>> -Jay
>>>
>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com>
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > Bumping this up in case someone has any ideas. I did yet another
>>> > experiment where I create 4 producers and stripe the send requests across
>>> > them (see the sketch below) in a manner such that any one producer only
>>> > sees 256 partitions instead of the entire 1024. This seems to have helped
>>> > a bit: though I still see a crazy high 99th (25-30 seconds), the median,
>>> > mean, 75th and 95th percentile have all gone down.
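>>> >
>>> > For reference, the striping is nothing fancy - roughly the sketch below,
>>> > where producers is the array of 4 producers, and getPartitionForMessage /
>>> > MY_TOPIC are the same names as in the code further down in this thread;
>>> > the rest is illustrative:
>>> >
>>> > // Partition i always goes through producers[i % 4], so each producer
>>> > // only ever sees 1024 / 4 = 256 distinct partitions.
>>> > int partition = getPartitionForMessage(message);
>>> > KafkaProducer producer = producers[partition % producers.length];
>>> > producer.send(new ProducerRecord(MY_TOPIC, partition, null, value), callback);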
>>> >
>>> > Thanks!
>>> >
>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <
>>> tstump...@ntent.com>
>>> > wrote:
>>> >
>>> > > Ah I thought it was restarting the broker that made things better :)
>>> > >
>>> > > Yeah I have no experience with the Java client so can't really help
>>> > there.
>>> > >
>>> > > Good luck!
>>> > >
>>> > > -----Original Message-----
>>> > > From: Rajiv Kurian [ra...@signalfuse.com]
>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
>>> > > To: users@kafka.apache.org [users@kafka.apache.org]
>>> > > Subject: Re: Trying to figure out kafka latency issues
>>> > >
>>> > > I'll take a look at the GC profile of the brokers. Right now I keep tabs
>>> > > on the CPU, Messages in, Bytes in, Bytes out, free memory (on the machine,
>>> > > not the JVM heap) and free disk space on the broker. I'll need to take a
>>> > > look at the JVM metrics too. What seemed strange is that going from 8 -> 512
>>> > > partitions increases the latency, but going from 512 -> 8 does not decrease
>>> > > it. I have to restart the producer (but not the broker) for the end to end
>>> > > latency to go down. That made it seem that the fault was probably with the
>>> > > producer and not the broker; only restarting the producer made things
>>> > > better. I'll do more extensive measurement on the broker.
>>> > >
>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <
>>> tstump...@ntent.com>
>>> > > wrote:
>>> > > >
>>> > > > Did you see my response and have you checked the server logs
>>> especially
>>> > > > the GC logs? It still sounds like you are running out of memory on
>>> the
>>> > > > broker. What is your max heap memory and are you thrashing once you
>>> > start
>>> > > > writing to all those partitions?
>>> > > >
>>> > > > You have measured very thoroughly from an external point of view; I think
>>> > > > now you'll have to start measuring the internal metrics. Maybe someone
>>> > > > else will have ideas on what jmx values to watch.
>>> > > >
>>> > > > Best,
>>> > > > Thunder
>>> > > >
>>> > > >
>>> > > > -----Original Message-----
>>> > > > From: Rajiv Kurian [ra...@signalfuse.com]
>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
>>> > > > To: users@kafka.apache.org [users@kafka.apache.org]
>>> > > > Subject: Re: Trying to figure out kafka latency issues
>>> > > >
>>> > > > Some more work tells me that the end to end latency numbers vary with
>>> > > > the number of partitions I am writing to. I did an experiment where,
>>> > > > based on a run time flag, I dynamically select how many of the *1024
>>> > > > partitions* I write to. Say I decide I'll write to at most 256
>>> > > > partitions: I mod whatever partition I would actually write to by 256.
>>> > > > Basically the number of partitions for this topic on the broker remains
>>> > > > the same at *1024* partitions, but the number of partitions my producers
>>> > > > write to changes dynamically based on a run time flag. So something like
>>> > > > this:
>>> > > >
>>> > > > int partition = getPartitionForMessage(message);
>>> > > > // This flag can be updated without bringing the application down -
>>> > > > // just a volatile read of some number set externally.
>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
>>> > > > int moddedPartition = partition % maxPartitionsToWriteTo;
>>> > > > // Send a message to this Kafka partition.
>>> > > >
>>> > > > Here are some interesting things I've noticed:
>>> > > >
>>> > > > i) When I start my client and it *never writes* to more than *8
>>> > > > partitions* (same data rate but fewer partitions), the end to end *99th
>>> > > > latency is 300-350 ms*. Quite a bit of this (numbers in my previous
>>> > > > emails) is the latency from producer -> broker and the latency from
>>> > > > broker -> consumer. Still nowhere near as poor as the *20 - 30* seconds
>>> > > > I was seeing.
>>> > > >
>>> > > > ii) When I increase the maximum number of partitions, end to end latency
>>> > > > increases dramatically. At *256 partitions* the end to end *99th latency
>>> > > > is still 390 - 418 ms* - worse than the latency figures for *8*
>>> > > > partitions, but not by much. When I increase this number to *512
>>> > > > partitions*, the end to end *99th latency* becomes an intolerable *19-24
>>> > > > seconds*. At *1024* partitions the *99th latency is 25 - 30 seconds*.
>>> > > > A table of the numbers:
>>> > > >
>>> > > > Max number of partitions written to (out of 1024)    End to end latency
>>> > > > 8                                                     300 - 350 ms
>>> > > > 256                                                   390 - 418 ms
>>> > > > 512                                                   19 - 24 seconds
>>> > > > 1024                                                  25 - 30 seconds
>>> > > >
>>> > > > iii) Once I make the max number of partitions high enough, reducing it
>>> > > > doesn't help. For example, if I go up from *8* to *512* partitions, the
>>> > > > latency goes up. But while the producer is running, if I go down from
>>> > > > *512* to *8* partitions, the latency numbers do not come back down. My
>>> > > > guess is that the producer lazily creates some per-partition state and
>>> > > > this state is causing the latency. Once this state is created, writing
>>> > > > to fewer partitions doesn't seem to help. Only a restart of the producer
>>> > > > calms things down.
>>> > > >
>>> > > > So my current plan is to reduce the number of partitions on the topic,
>>> > > > but there seems to be something deeper going on for the latency numbers
>>> > > > to be so poor to begin with and then get so much worse (non-linearly)
>>> > > > with additional partitions.
>>> > > >
>>> > > > Thanks!
>>> > > >
>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <
>>> ra...@signalfuse.com>
>>> > > > wrote:
>>> > > > >
>>> > > > > I've done some more measurements. I've also started measuring the
>>> > > latency
>>> > > > > between when I ask my producer to send a message and when I get
>>> an
>>> > > > > acknowledgement via the callback. Here is my code:
>>> > > > >
>>> > > > > // This function is called on every producer once every 30 seconds.
>>> > > > > public void addLagMarkers(final Histogram enqueueLag) {
>>> > > > >     final int numberOfPartitions = 1024;
>>> > > > >     final long timeOfEnqueue = System.currentTimeMillis();
>>> > > > >     final Callback callback = new Callback() {
>>> > > > >         @Override
>>> > > > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
>>> > > > >             if (metadata != null) {
>>> > > > >                 // The difference between the ack time from the broker
>>> > > > >                 // and the enqueue time.
>>> > > > >                 final long timeOfAck = System.currentTimeMillis();
>>> > > > >                 final long lag = timeOfAck - timeOfEnqueue;
>>> > > > >                 enqueueLag.update(lag);
>>> > > > >             }
>>> > > > >         }
>>> > > > >     };
>>> > > > >     for (int i = 0; i < numberOfPartitions; i++) {
>>> > > > >         try {
>>> > > > >             // 10 bytes -> short version + long timestamp. This message
>>> > > > >             // is later used by the consumers to measure lag.
>>> > > > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
>>> > > > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
>>> > > > >             kafkaProducer.send(record, callback);
>>> > > > >         } catch (Exception e) {
>>> > > > >             // We just dropped a lag marker.
>>> > > > >         }
>>> > > > >     }
>>> > > > > }
>>> > > > >
>>> > > > > The *99th* on this lag is about *350 - 400* ms. It's not stellar, but it
>>> > > > > doesn't account for the *20-30 second 99th* I see on the end to end lag.
>>> > > > > I am consuming in a tight loop on the consumers (using the SimpleConsumer)
>>> > > > > with minimal processing and a *99th fetch time* of *130-140* ms, so I
>>> > > > > don't think that should be a problem either. Completely baffled.
>>> > > > >
>>> > > > >
>>> > > > > Thanks!
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <
>>> ra...@signalfuse.com>
>>> > > > > wrote:
>>> > > > >>
>>> > > > >>
>>> > > > >>
>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <
>>> ra...@signalfuse.com
>>> > >
>>> > > > >> wrote:
>>> > > > >>>
>>> > > > >>> I am trying to replace a Thrift peer-to-peer API with kafka for a
>>> > > > >>> particular workflow. I am finding the 99th percentile latency to be
>>> > > > >>> unacceptable at this time. This entire workload runs in an Amazon VPC.
>>> > > > >>> I'd greatly appreciate it if someone has any insights on why I am
>>> > > > >>> seeing such poor numbers. Here are some details and measurements:
>>> > > > >>>
>>> > > > >>> i) I have a single topic with 1024 partitions that I am
>>> writing to
>>> > > from
>>> > > > >>> six clients using the kafka 0.8.2 beta kafka producer.
>>> > > > >>>
>>> > > > >>> ii) I have 3 brokers, each on a c3.2xlarge machine on ec2. Each of
>>> > > > >>> those machines has 8 virtual cpus, 15 GB of memory and 2 * 80 GB SSDs.
>>> > > > >>> The broker -> partitions mapping was decided by Kafka when I created
>>> > > > >>> the topic.
>>> > > > >>>
>>> > > > >>> iii) I write about 22 thousand messages per second from across
>>> the
>>> > 6
>>> > > > >>> clients. This number was calculated using distributed
>>> counters. I
>>> > > just
>>> > > > >>> increment a distributed counter in the callback from my
>>> enqueue job
>>> > > if
>>> > > > the
>>> > > > >>> metadata returned is not null. I also increment a dropped-messages
>>> > > > >>> counter if the callback has a non-null exception or if there
>>> was an
>>> > > > >>> exception in the synchronous send call. The number of dropped
>>> > > messages
>>> > > > is
>>> > > > >>> pretty much always zero. Out of the 6 clients 3 are
>>> responsible for
>>> > > > 95% of
>>> > > > >>> the traffic. Messages are very tiny and have null keys  and 27
>>> byte
>>> > > > values
>>> > > > >>> (2 byte version and 25 byte payload). Again these messages are
>>> > > written
>>> > > > >>> using the kafka 0.8.2 client.
>>> > > > >>>
>>> > > > >>> iv) I have 3 consumer processes consuming only from this topic.
>>> > Each
>>> > > > >>> consumer process is assigned a disjoint set of the 1024
>>> partitions
>>> > by
>>> > > > an
>>> > > > >>> eternal arbiter. Each consumer process then creates a mapping
>>> from
>>> > > > brokers
>>> > > > >>> -> partitions it has been assigned. It then starts one fetcher
>>> > thread
>>> > > > per
>>> > > > >>> broker. Each thread queries the broker (using the
>>> SimpleConsumer)
>>> > it
>>> > > > has
>>> > > > >>> been assigned for partitions such that partitions =
>>> (partitions on
>>> > > > broker)
>>> > > > >>> *∩ *(partitions assigned to the process by the arbiter). So in
>>> > effect
>>> > > > >>> there are 9 consumer threads across 3 consumer processes that
>>> > query a
>>> > > > >>> disjoint set of partitions for the single topic, and amongst themselves
>>> > > > >>> they manage to consume from every partition. So each thread has a run
>>> loop
>>> > > > where it
>>> > > > >>> asks for messages, consumes them then asks for more messages.
>>> When
>>> > a
>>> > > > run
>>> > > > >>> loop starts, it starts by querying for the latest message in a
>>> > > > partition
>>> > > > >>> (i.e. discards any previous backlog) and then maintains a map of
>>> > > > partition
>>> > > > >>> -> nextOffsetToRequest in memory to make sure that it consumes
>>> > > > messages in
>>> > > > >>> order.
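>>> > > > >>>
>>> > > > >>> The "start from latest" lookup is just the standard SimpleConsumer offset
>>> > > > >>> request, roughly the sketch below (consumer and clientName are the obvious
>>> > > > >>> fields; the rest follows the usual 0.8 SimpleConsumer example):
>>> > > > >>>
>>> > > > >>> // Ask the broker for the latest offset of one partition so we skip any
>>> > > > >>> // backlog and only consume new messages.
>>> > > > >>> // (imports: java.util.*, kafka.common.TopicAndPartition,
>>> > > > >>> //  kafka.api.PartitionOffsetRequestInfo, kafka.javaapi.OffsetResponse,
>>> > > > >>> //  kafka.javaapi.consumer.SimpleConsumer)
>>> > > > >>> long latestOffset(SimpleConsumer consumer, String topic, int partitionId,
>>> > > > >>>                   String clientName) {
>>> > > > >>>   TopicAndPartition tp = new TopicAndPartition(topic, partitionId);
>>> > > > >>>   Map<TopicAndPartition, PartitionOffsetRequestInfo> info =
>>> > > > >>>       new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
>>> > > > >>>   info.put(tp, new PartitionOffsetRequestInfo(
>>> > > > >>>       kafka.api.OffsetRequest.LatestTime(), 1));
>>> > > > >>>   kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>>> > > > >>>       info, kafka.api.OffsetRequest.CurrentVersion(), clientName);
>>> > > > >>>   OffsetResponse response = consumer.getOffsetsBefore(request);
>>> > > > >>>   return response.offsets(topic, partitionId)[0];
>>> > > > >>> }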
>>> > > > >>>
>>> > > > >> Edit: Meant "external" arbiter.
>>> > > > >>
>>> > > > >>>
>>> > > > >>> v) Consumption is really simple. Each message is put on a non-blocking,
>>> > > > >>> efficient ring buffer. If the ring buffer is full the message
>>> is
>>> > > > dropped. I
>>> > > > >>> measure the mean time between fetches and it is the same as the
>>> > time
>>> > > > for
>>> > > > >>> fetches up to a ms, meaning no matter how many messages are
>>> > dequeued,
>>> > > > it
>>> > > > >>> takes almost no time to process them. At the end of processing
>>> a
>>> > > fetch
>>> > > > >>> request I increment another distributed counter that counts the
>>> > > number
>>> > > > of
>>> > > > >>> messages processed. This counter tells me that on average I am
>>> > > > consuming
>>> > > > >>> the same number of messages/sec that I enqueue on the producers
>>> > i.e.
>>> > > > around
>>> > > > >>> 22 thousand messages/sec.
>>> > > > >>>
>>> > > > >>> vi) The 99th percentile of the number of messages fetched per
>>> fetch
>>> > > > >>> request is about 500 messages.  The 99th percentile of the
>>> time it
>>> > > > takes to
>>> > > > >>> fetch a batch is about 130 - 140 ms. I played around with the
>>> buffer
>>> > > and
>>> > > > >>> maxWait settings on the SimpleConsumer and attempting to
>>> consume
>>> > more
>>> > > > >>> messages was leading the 99th percentile of the fetch time to
>>> > balloon
>>> > > > up,
>>> > > > >>> so I am consuming in smaller batches right now.
>>> > > > >>>
>>> > > > >>> vii) Every 30 seconds odd each of the producers inserts a trace
>>> > > message
>>> > > > >>> into each partition (so 1024 messages per producer every 30
>>> > seconds).
>>> > > > Each
>>> > > > >>> message only contains the System.currentTimeMillis() at the
>>> time of
>>> > > > >>> enqueueing. It is about 9 bytes long. The consumer in step (v)
>>> > always
>>> > > > >>> checks to see if a message it dequeued is of this trace type
>>> > (instead
>>> > > > of
>>> > > > >>> the regular message type). This is a simple check on the first
>>> 2
>>> > byte
>>> > > > >>> version that each message buffer contains. If it is a trace
>>> message
>>> > > it
>>> > > > >>> reads its payload as a long and uses the difference between
>>> the
>>> > > system
>>> > > > >>> time on the consumer and this payload and updates a histogram
>>> with
>>> > > this
>>> > > > >>> difference value. Ignoring NTP skew (which I've measured to be
>>> in
>>> > the
>>> > > > order
>>> > > > >>> of milliseconds) this is the lag between when a message was
>>> > enqueued
>>> > > > on the
>>> > > > >>> producer and when it was read on the consumer.
>>> > > > >>>
>>> > > > >> Edit: Trace messages are 10 bytes long - 2 byte version + 8 byte long
>>> > > > >> for the timestamp.
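>>> > > > >> For concreteness, the (de)serialization is roughly the sketch below (the
>>> > > > >> value shown for the version constant is illustrative, not the real one):
>>> > > > >>
>>> > > > >> // LagMarker sketch: a 2-byte version followed by the 8-byte enqueue time.
>>> > > > >> // (import java.nio.ByteBuffer)
>>> > > > >> static final short TRACE_VERSION = 2;  // illustrative value
>>> > > > >>
>>> > > > >> static byte[] serialize(long timeOfEnqueue) {
>>> > > > >>   ByteBuffer buffer = ByteBuffer.allocate(10);
>>> > > > >>   buffer.putShort(TRACE_VERSION);
>>> > > > >>   buffer.putLong(timeOfEnqueue);
>>> > > > >>   return buffer.array();
>>> > > > >> }
>>> > > > >>
>>> > > > >> static long getEnqueueTime(byte[] payload) {
>>> > > > >>   ByteBuffer buffer = ByteBuffer.wrap(payload);
>>> > > > >>   buffer.getShort();  // skip the version
>>> > > > >>   return buffer.getLong();
>>> > > > >> }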
>>> > > > >>
>>> > > > >> So the pseudocode for every consumer thread (one per broker per consumer
>>> > > > >> process; 9 in total across 3 consumers) is:
>>> > > > >>>
>>> > > > >>> void run() {
>>> > > > >>>   while (running) {
>>> > > > >>>     // This assignment is done by an external arbiter.
>>> > > > >>>     FetchRequest fetchRequest =
>>> > > > >>>         buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
>>> > > > >>>
>>> > > > >>>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
>>> > > > >>>
>>> > > > >>>     // The 99th of this is also 130-140 ms, i.e. the same as the time
>>> > > > >>>     // between fetches.
>>> > > > >>>     FetchResponse fetchResponse = fetchResponse(fetchRequest);
>>> > > > >>>
>>> > > > >>>     processData(fetchResponse);  // The 99th on this is 2-3 ms.
>>> > > > >>>   }
>>> > > > >>> }
>>> > > > >>>
>>> > > > >>> void processData(FetchResponse response) {
>>> > > > >>>   // The 99th on this timer is 2-3 ms.
>>> > > > >>>   try (Timer.Context _ = processWorkerDataTimer.time()) {
>>> > > > >>>     for (Message message : response) {
>>> > > > >>>       if (typeOf(message) == NORMAL_DATA) {
>>> > > > >>>         // Never blocks; drops the message if the ring buffer is full.
>>> > > > >>>         enqueueOnNonBlockingRingBuffer(message);
>>> > > > >>>       } else if (typeOf(message) == TRACE_DATA) {
>>> > > > >>>         long timeOfEnqueue = getEnqueueTime(message);
>>> > > > >>>         long currentTime = System.currentTimeMillis();
>>> > > > >>>         long difference = currentTime - timeOfEnqueue;
>>> > > > >>>         lagHistogram.update(difference);
>>> > > > >>>       }
>>> > > > >>>     }
>>> > > > >>>   }
>>> > > > >>> }
>>> > > > >>>
>>> > > > >>> viii) So the kicker is that this lag is really, really high:
>>> > > > >>> Median Lag: *200 ms*. This already seems pretty high, and I could even
>>> > > > >>> discount some of it through NTP skew.
>>> > > > >>> Mean Lag: *2 - 4 seconds*! Notice that the mean is bigger than the
>>> > > > >>> median, which tells us the distribution has a long tail.
>>> > > > >>> 99th Lag: *20 seconds*!!
>>> > > > >>>
>>> > > > >>> I can't quite figure out the source of the lag. Here are some
>>> other
>>> > > > >>> things of note:
>>> > > > >>>
>>> > > > >>> 1) The brokers in general are at about 40-50% CPU, but I see occasional
>>> > > > >>> spikes to 80-95%. This could probably be causing issues. I am wondering
>>> > > > >>> if this is down to the high number of partitions I have and how each
>>> > > > >>> partition ends up receiving not that many messages: 22k messages spread
>>> > > > >>> across 1024 partitions = only 21-odd messages/sec/partition. But each
>>> > > > >>> broker should be receiving about 7500 messages/sec. It could also be
>>> > > > >>> down to the nature of these messages being tiny. I imagine that the
>>> > > > >>> kafka wire protocol overhead per message is probably close to the 25
>>> > > > >>> byte message payload itself. The broker also has to do a few CRC32
>>> > > > >>> calculations etc. But again, given that these are c3.2xlarge machines
>>> > > > >>> and the number of messages is so small, I'd expect it to do better. And
>>> > > > >>> reading this article
>>> > > > >>> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
>>> > > > >>> about 2 million messages/sec across three brokers makes it seem like
>>> > > > >>> 22k 25-byte messages should be very easy. So my biggest suspicion right
>>> > > > >>> now is that the large number of partitions is somehow responsible for
>>> > > > >>> this latency.
>>> > > > >>>
>>> > > > >>> 2) On the consumer, given that the 99th time between fetches == the
>>> > > > >>> 99th time of a fetch, I don't see how I could do any better. It's
>>> > > > >>> pretty close to just getting batches of data, iterating through them
>>> > > > >>> and dropping them on the floor. My requests on the consumer are built
>>> > > > >>> like this:
>>> > > > >>>
>>> > > > >>>         // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
>>> > > > >>>         FetchRequestBuilder builder = new FetchRequestBuilder()
>>> > > > >>>                 .clientId(clientName)
>>> > > > >>>                 .maxWait(100)
>>> > > > >>>                 .minBytes(1000);
>>> > > > >>>
>>> > > > >>>         // Add a fetch request for every partition.
>>> > > > >>>         for (PartitionMetadata partition : partitions) {
>>> > > > >>>             int partitionId = partition.partitionId();
>>> > > > >>>             // This map maintains the next offset to be read per partition.
>>> > > > >>>             long offset = readOffsets.get(partition);
>>> > > > >>>             // Up to 3000 bytes per partition per fetch request.
>>> > > > >>>             builder.addFetch(MY_TOPIC, partitionId, offset, 3000);
>>> > > > >>>         }
>>> > > > >>>
>>> > > > >>> 3) On the producer side I use the kafka beta producer. I've
>>> played
>>> > > > >>> around with many settings but none of them seem to have made a
>>> > > > difference.
>>> > > > >>> Here are my current settings:
>>> > > > >>>
>>> > > > >>> ProducerConfig values:
>>> > > > >>>
>>> > > > >>>        block.on.buffer.full = false
>>> > > > >>>
>>> > > > >>>         retry.backoff.ms = 100
>>> > > > >>>
>>> > > > >>>         buffer.memory = 83886080  // Kind of big - could it be adding lag?
>>> > > > >>>
>>> > > > >>>         batch.size = 40500  // This is also kind of big - could it be adding lag?
>>> > > > >>>
>>> > > > >>>         metrics.sample.window.ms = 30000
>>> > > > >>>
>>> > > > >>>         metadata.max.age.ms = 300000
>>> > > > >>>
>>> > > > >>>         receive.buffer.bytes = 32768
>>> > > > >>>
>>> > > > >>>         timeout.ms = 30000
>>> > > > >>>
>>> > > > >>>         max.in.flight.requests.per.connection = 5
>>> > > > >>>
>>> > > > >>>         metric.reporters = []
>>> > > > >>>
>>> > > > >>>         bootstrap.servers = [broker1, broker2, broker3]
>>> > > > >>>
>>> > > > >>>         client.id =
>>> > > > >>>
>>> > > > >>>         compression.type = none
>>> > > > >>>
>>> > > > >>>         retries = 0
>>> > > > >>>
>>> > > > >>>         max.request.size = 1048576
>>> > > > >>>
>>> > > > >>>         send.buffer.bytes = 131072
>>> > > > >>>
>>> > > > >>>         acks = 1
>>> > > > >>>
>>> > > > >>>         reconnect.backoff.ms = 10
>>> > > > >>>
>>> > > > >>>         linger.ms = 250  // Based on this config I'd imagine that the lag should be bounded to about 250 ms overall.
>>> > > > >>>
>>> > > > >>>         metrics.num.samples = 2
>>> > > > >>>
>>> > > > >>>         metadata.fetch.timeout.ms = 60000
>>> > > > >>>
>>> > > > >>> I know this is a lot of information. I just wanted to provide
>>> as
>>> > much
>>> > > > >>> context as possible.
>>> > > > >>>
>>> > > > >>> Thanks in advance!
>>> > > > >>>
>>> > > > >>>
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
