My broker logs say acks = 1, unless this refers to some other ack:

Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 858424;
ClientId: ; *RequiredAcks: 1*; AckTimeoutMs: 30000 ms; TopicAndPartition:

I'll try to get a stand-alone test going and narrow down the problem.
Thanks for helping!
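Since the lag markers described further down the thread don't need Kafka to be validated, the stand-alone test can start from just the marker encoding. A Kafka-free sketch of the 10-byte marker (2-byte version + 8-byte timestamp, per the thread); the exact field layout and class name here are assumptions:

```java
import java.nio.ByteBuffer;

public class LagMarkerSketch {
    static final short VERSION = 1; // hypothetical version value

    // 10 bytes: 2-byte version followed by the 8-byte enqueue timestamp.
    static byte[] serialize(long timeOfEnqueue) {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putShort(VERSION);
        buf.putLong(timeOfEnqueue);
        return buf.array();
    }

    // Consumer side: recover the timestamp to compute end-to-end lag.
    static long deserializeTimestamp(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        buf.getShort(); // skip version
        return buf.getLong();
    }

    public static void main(String[] args) {
        byte[] marker = serialize(1419983624507L);
        System.out.println(marker.length);                // 10
        System.out.println(deserializeTimestamp(marker)); // 1419983624507
    }
}
```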

On Tue, Dec 30, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Yes, the fetch request blocks until there is data for you, so you should
> see high remote time there.
>
> For the producer you should only see remote time if you have acks=-1. This
> represents the time it takes the message to replicate to other in-sync
> replicas.
>
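For reference, the ack level Jay mentions is a plain producer config on the 0.8.2 Java producer. A minimal sketch; the key names "acks" and "timeout.ms" are my best recollection of the 0.8.2 config names and the broker address is illustrative, so treat them as assumptions:

```java
import java.util.Properties;

public class AckConfigSketch {
    // Build producer properties for a given ack level:
    // "1"  -> leader-only ack (produce requests show no remote time)
    // "-1" -> wait for all in-sync replicas (remote time = replication)
    static Properties producerProps(String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // illustrative address
        props.put("acks", acks);
        // Matches the 30000 ms AckTimeoutMs visible in the broker log above.
        props.put("timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("-1").getProperty("acks")); // -1
    }
}
```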
> I agree if there are no slow requests on the broker during an interval
> where you saw a long pause, this points to something happening on the
> client side. It might be worth enabling debug or trace logging on the
> client to see what is happening when the pauses occur.
>
> Alternately if you can find a reproducible test case we can turn into a
> JIRA someone else may be willing to dive in.
>
> -Jay
>
> On Tue, Dec 30, 2014 at 4:37 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Got it. I had to enable request logging to see the logs. Here are two
> > files with the timings I got from the period I had logging enabled:
> > Producer Requests :
> >
> >
> https://docs.google.com/spreadsheets/d/1spVFWsf7T8ZmwM0JMYnkBoyYufRhHav50sueFA9ztP8/edit?usp=sharing
> >
> > Fetch Requests:
> >
> >
> https://docs.google.com/spreadsheets/d/1PMGslvlttNQOd1uFQDkgPOY31nSK9TZYFy7oa75QaWA/edit?usp=sharing
> >
> > Things I noted:
> > 1) The producer request totalTime barely goes above 100 ms, and when it
> > does, it is always due to a high remoteTime. Given it is barely ever
> > above 100 ms, this doesn't look that bad. Maybe I need to keep logging
> > on for longer to get more samples. The overall time on the producer
> > does seem a bit on the high side, though it definitely doesn't account
> > for the 20-30 second end-to-end time I saw.
> >
> > 2) The fetch request totalTime does frequently reach about 100 ms. It
> > is also always due to a high remoteTime, which is always in the
> > ballpark of 100 ms. I specify a maxWait of 100 ms on the client when
> > making my fetch requests, so it might be this 100 ms that I am seeing.
> > My guess is that the buffer sizes I specified don't get filled all the
> > time, so the maxWait timeout is hit and we end up waiting 100 ms on the
> > broker. Again, I don't see how the end-to-end time could be so poor.
> >
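The maxWait theory above can be stated as a simple rule: the broker answers a fetch as soon as enough bytes are available, otherwise it parks the request until maxWait expires. A toy model of that rule (not actual broker code; names and numbers are illustrative):

```java
public class FetchWaitModel {
    // Toy model: remote time is ~0 when minBytes is already available,
    // otherwise the fetch is held for the full maxWait.
    static long expectedRemoteTimeMs(int availableBytes, int minBytes, long maxWaitMs) {
        return availableBytes >= minBytes ? 0 : maxWaitMs;
    }

    public static void main(String[] args) {
        // A trickle of small messages never fills the requested buffer,
        // so each fetch waits the full 100 ms maxWait.
        System.out.println(expectedRemoteTimeMs(512, 64 * 1024, 100));        // 100
        // Plenty of buffered data: the fetch returns immediately.
        System.out.println(expectedRemoteTimeMs(128 * 1024, 64 * 1024, 100)); // 0
    }
}
```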
> > From these timings (maybe I don't have enough samples) it seems like
> > the problem is not in the brokers. Does that seem like a legitimate
> > conclusion? I might have to measure for a longer period to get the
> > tail. I am planning on extracting the producer and consumer code to get
> > a self-contained load test going. I'll do the same end-to-end lag
> > measurements and see if it's my environment that is adding this lag
> > somehow.
> >
> > Thanks!
> >
> >
> > On Tue, Dec 30, 2014 at 11:58 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > The produce to the local log is the time taken to write to the local
> > > filesystem. If you see that spike up then that is the culprit.
> > >
> > > What I was referring to is the line in kafka-request.log that looks
> > > something like:
> > > "Completed request:%s from client
> > > %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
> > > This breaks down the total request time and how much was spent on
> > > local I/O etc. If totalTime is always small, the server is not to
> > > blame. If it spikes, then the question is where that time is going,
> > > and this will answer that.
> > >
> > > You actually raise a good point about that other log message. It says
> > > "4 bytes written to log xyz", but what it is actually logging is the
> > > number of messages, not the number of bytes, so that is quite
> > > misleading and a bug.
> > >
> > > -Jay
> > >
> > > On Mon, Dec 29, 2014 at 6:37 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > Thanks Jay.
> > > >
> > > > I just used JMX to change the log level on the broker and checked
> > > > the logs. I am still not sure which line tells me exactly how much
> > > > time a producer request took to process. I see log lines of this
> > > > format:
> > > >
> > > > 2014-12-30T02:13:44.507Z DEBUG [kafka-request-handler-0            ]
> > > > [kafka.server.KafkaApis              ]: [KafkaApi-11] Produce to
> > > > local log in 5 ms
> > > >
> > > > Is this what I should be looking out for? I see producer requests
> > > > of the form "2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0
> > > > ] [kafka.server.KafkaApis              ]: [KafkaApi-11] Handling
> > > > request: Name: ProducerRequest; Version: 0; CorrelationId: 36308;
> > > > more stuff", but I can't quite tell when each request completed. So
> > > > right now I see that requests are logged frequently on the brokers
> > > > (multiple times per second), but since I don't know when they
> > > > finish, I can't tell the total time.
> > > >
> > > > Some other things that I found kind of odd are:
> > > >
> > > > 1) The producer requests seem to have an ack timeout of 30000 ms. I
> > > > don't think I set this on the producer. I don't know if this could
> > > > have anything to do with the latency problem.
> > > >
> > > > 2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0            ]
> > > > [kafka.server.KafkaApis              ]: [KafkaApi-11] Handling
> > > > request: Name: ProducerRequest; Version: 0; CorrelationId: 36308;
> > > > ClientId: ; RequiredAcks: 1; *AckTimeoutMs: 30000 ms*; rest of the
> > > > stuff.
> > > >
> > > > 2) I see a bunch of small writes to my [topic, partition] logs. My
> > > > messages are at a minimum 27 bytes, so maybe this is something else.
> > > > Again, I don't know if this is a problem:
> > > >
> > > > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4            ]
> > > > [kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written
> > > > to log MY.TOPIC-400 beginning at offset 19221923 and ending at
> > > > offset 19221923
> > > >
> > > > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4            ]
> > > > [kafka.server.KafkaApis              ]: [KafkaApi-11] 4 bytes written
> > > > to log MY.TOPIC-208 beginning at offset 29438019 and ending at
> > > > offset 29438022
> > > >
> > > > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3            ]
> > > > [kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written
> > > > to log MY.TOPIC-163 beginning at offset 14821977 and ending at
> > > > offset 14821977
> > > >
> > > > 2014-12-30T02:13:44.500Z TRACE [kafka-request-handler-1            ]
> > > > [kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written
> > > > to log MY.TOPIC118 beginning at offset 19321510 and ending at
> > > > offset 19321510
> > > >
> > > > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3            ]
> > > > [kafka.server.KafkaApis              ]: [KafkaApi-11] 3 bytes written
> > > > to log MY.TOPIC-463 beginning at offset 28777627 and ending at
> > > > offset 28777629
> > > >
> > > > On Mon, Dec 29, 2014 at 5:43 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > Hey Rajiv,
> > > > >
> > > > > Yes, if you uncomment the line
> > > > >   #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
> > > > > in the example log4j.properties file, that will enable logging of
> > > > > each request, including the time taken processing the request.
> > > > > This is the first step for diagnosing latency spikes since this
> > > > > will rule out the server. You may also want to enable DEBUG or
> > > > > TRACE on the producer and see what is happening when those spikes
> > > > > occur.
> > > > >
> > > > > The JMX stats show that at least the producer is not exhausting
> > > > > the I/O thread (it is 92% idle) and doesn't seem to be waiting on
> > > > > memory, which is somewhat surprising to me (the NaN is an artifact
> > > > > of how we compute that stat: no allocations took place in the time
> > > > > period measured, so it is kind of 0/0).
> > > > >
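The 0/0 artifact Jay describes follows directly from IEEE double arithmetic; a quick illustration (the metric here is a stand-in, not the producer's actual computation):

```java
public class NanRatioDemo {
    // A ratio metric computed over a window with no samples divides
    // zero accumulated wait time by zero measured time.
    static double waitRatio(double waitedNanos, double measuredNanos) {
        return waitedNanos / measuredNanos;
    }

    public static void main(String[] args) {
        System.out.println(Double.isNaN(waitRatio(0.0, 0.0))); // true
        System.out.println(waitRatio(50.0, 100.0));            // 0.5
    }
}
```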
> > > > > -Jay
> > > > >
> > > > > On Mon, Dec 29, 2014 at 4:54 PM, Rajiv Kurian <
> ra...@signalfuse.com>
> > > > > wrote:
> > > > >
> > > > > > In case the attachments don't work out here is an imgur link -
> > > > > > http://imgur.com/NslGpT3,Uw6HFow#0
> > > > > >
> > > > > > On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <
> > ra...@signalfuse.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Never mind about (2). I see these stats are already being
> > > > > > > output by the kafka producer. I've attached a couple of
> > > > > > > screenshots (couldn't copy/paste from jvisualvm). Do any of
> > > > > > > these strike you as odd? The bufferpool-wait-ratio sadly shows
> > > > > > > up as a NaN.
> > > > > > >
> > > > > > > I still don't know how to figure out (1).
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <
> > > ra...@signalfuse.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Jay,
> > > > > > >>
> > > > > > >> Re (1) - I am not sure how to do this. Actually, I am not
> > > > > > >> sure what this means. Is this the time every write/fetch
> > > > > > >> request is received on the broker? Do I need to enable some
> > > > > > >> specific log level for this to show up? It doesn't show up in
> > > > > > >> the usual log. Is this information also available via JMX
> > > > > > >> somehow?
> > > > > > >> Re (2) - Are you saying that I should instrument the
> > > > > > >> "percentage of time waiting for buffer space" stat myself? If
> > > > > > >> so, how do I do that? Or are these stats already output to
> > > > > > >> JMX by the kafka producer code? This seems like it's in the
> > > > > > >> internals of the kafka producer client code.
> > > > > > >>
> > > > > > >> Thanks again!
> > > > > > >>
> > > > > > >>
> > > > > > >> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <
> > > > ra...@signalfuse.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> Thanks Jay. Will check (1) and (2) and get back to you. The
> > > > > > >>> test is not stand-alone now, and it might be a bit of work
> > > > > > >>> to extract it into a stand-alone executable, so it might
> > > > > > >>> take me some time to get that going.
> > > > > > >>>
> > > > > > >>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io
> >
> > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hey Rajiv,
> > > > > > >>>>
> > > > > > >>>> This sounds like a bug. The more info you can help us get,
> > > > > > >>>> the easier it is to fix. Things that would help:
> > > > > > >>>> 1. Can you check if the request log on the servers shows
> > > > > > >>>> latency spikes (in which case it is a server problem)?
> > > > > > >>>> 2. It would also be worth getting the JMX stats on the
> > > > > > >>>> producer, as they will show things like what percentage of
> > > > > > >>>> time it is waiting for buffer space etc.
> > > > > > >>>>
> > > > > > >>>> If your test is reasonably stand-alone, it would be great
> > > > > > >>>> to file a JIRA and attach the test code and the findings
> > > > > > >>>> you already have so someone can dig into what is going on.
> > > > > > >>>>
> > > > > > >>>> -Jay
> > > > > > >>>>
> > > > > > >>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <
> > > > ra...@signalfuse.com
> > > > > >
> > > > > > >>>> wrote:
> > > > > > >>>>
> > > > > > >>>> > Hi all,
> > > > > > >>>> >
> > > > > > >>>> > Bumping this up in case someone has any ideas. I did yet
> > > > > > >>>> > another experiment where I create 4 producers and stripe
> > > > > > >>>> > the send requests across them so that any one producer
> > > > > > >>>> > only sees 256 partitions instead of the entire 1024. This
> > > > > > >>>> > seems to have helped a bit: though I still see a crazy
> > > > > > >>>> > high 99th (25-30 seconds), the median, mean, 75th and
> > > > > > >>>> > 95th percentiles have all gone down.
> > > > > > >>>> >
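The striping described above reduces to a routing function from partition to producer. A Kafka-free sketch; the contiguous-block assignment is an assumption, since the thread doesn't say how the striping was done:

```java
public class ProducerStriping {
    static final int NUM_PRODUCERS = 4;
    static final int TOTAL_PARTITIONS = 1024;

    // Route each partition to one of 4 producers in contiguous blocks of
    // 256, so any single producer only ever sees 256 partitions.
    static int producerIndex(int partition) {
        return partition / (TOTAL_PARTITIONS / NUM_PRODUCERS);
    }

    public static void main(String[] args) {
        System.out.println(producerIndex(0));    // 0
        System.out.println(producerIndex(255));  // 0
        System.out.println(producerIndex(256));  // 1
        System.out.println(producerIndex(1023)); // 3
    }
}
```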
> > > > > > >>>> > Thanks!
> > > > > > >>>> >
> > > > > > >>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <
> > > > > > >>>> tstump...@ntent.com>
> > > > > > >>>> > wrote:
> > > > > > >>>> >
> > > > > > >>>> > > Ah, I thought it was restarting the broker that made
> > > > > > >>>> > > things better :)
> > > > > > >>>> > >
> > > > > > >>>> > > Yeah, I have no experience with the Java client so I
> > > > > > >>>> > > can't really help there.
> > > > > > >>>> > >
> > > > > > >>>> > > Good luck!
> > > > > > >>>> > >
> > > > > > >>>> > > -----Original Message-----
> > > > > > >>>> > > From: Rajiv Kurian [ra...@signalfuse.com]
> > > > > > >>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
> > > > > > >>>> > > To: users@kafka.apache.org [users@kafka.apache.org]
> > > > > > >>>> > > Subject: Re: Trying to figure out kafka latency issues
> > > > > > >>>> > >
> > > > > > >>>> > > I'll take a look at the GC profile of the brokers.
> > > > > > >>>> > > Right now I keep a tab on the CPU, messages in, bytes
> > > > > > >>>> > > in, bytes out, free memory (on the machine, not the JVM
> > > > > > >>>> > > heap) and free disk space on the broker. I'll need to
> > > > > > >>>> > > take a look at the JVM metrics too. What seemed strange
> > > > > > >>>> > > is that going from 8 -> 512 partitions increases the
> > > > > > >>>> > > latency, but going from 512 -> 8 does not decrease it.
> > > > > > >>>> > > I have to restart the producer (but not the broker) for
> > > > > > >>>> > > the end-to-end latency to go down. That made it seem
> > > > > > >>>> > > that the fault was probably with the producer and not
> > > > > > >>>> > > the broker; only restarting the producer made things
> > > > > > >>>> > > better. I'll do more extensive measurement on the
> > > > > > >>>> > > broker.
> > > > > > >>>> > >
> > > > > > >>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <
> > > > > > >>>> tstump...@ntent.com>
> > > > > > >>>> > > wrote:
> > > > > > >>>> > > >
> > > > > > >>>> > > > Did you see my response, and have you checked the
> > > > > > >>>> > > > server logs, especially the GC logs? It still sounds
> > > > > > >>>> > > > like you are running out of memory on the broker.
> > > > > > >>>> > > > What is your max heap memory, and are you thrashing
> > > > > > >>>> > > > once you start writing to all those partitions?
> > > > > > >>>> > > >
> > > > > > >>>> > > > You have measured very thoroughly from an external
> > > > > > >>>> > > > point of view; I think now you'll have to start
> > > > > > >>>> > > > measuring the internal metrics. Maybe someone else
> > > > > > >>>> > > > will have ideas on which JMX values to watch.
> > > > > > >>>> > > >
> > > > > > >>>> > > > Best,
> > > > > > >>>> > > > Thunder
> > > > > > >>>> > > >
> > > > > > >>>> > > >
> > > > > > >>>> > > > -----Original Message-----
> > > > > > >>>> > > > From: Rajiv Kurian [ra...@signalfuse.com]
> > > > > > >>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
> > > > > > >>>> > > > To: users@kafka.apache.org [users@kafka.apache.org]
> > > > > > >>>> > > > Subject: Re: Trying to figure out kafka latency issues
> > > > > > >>>> > > >
> > > > > > >>>> > > > Some more work tells me that the end-to-end latency
> > > > > > >>>> > > > numbers vary with the number of partitions I am
> > > > > > >>>> > > > writing to. I did an experiment where, based on a
> > > > > > >>>> > > > run-time flag, I dynamically select how many of the
> > > > > > >>>> > > > *1024 partitions* I write to. So say I decide I'll
> > > > > > >>>> > > > write to at most 256 partitions: I mod whatever
> > > > > > >>>> > > > partition I would actually write to by 256.
> > > > > > >>>> > > > Basically, the number of partitions for this topic on
> > > > > > >>>> > > > the broker remains the same at *1024* partitions, but
> > > > > > >>>> > > > the number of partitions my producers write to
> > > > > > >>>> > > > changes dynamically based on the run-time flag. So
> > > > > > >>>> > > > something like this:
> > > > > > >>>> > > >
> > > > > > >>>> > > > int partition = getPartitionForMessage(message);
> > > > > > >>>> > > > // This flag can be updated without bringing the
> > > > > > >>>> > > > // application down - just a volatile read of some
> > > > > > >>>> > > > // number set externally.
> > > > > > >>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
> > > > > > >>>> > > > int moddedPartition = partition % maxPartitionsToWriteTo;
> > > > > > >>>> > > > // Send a message to this Kafka partition.
> > > > > > >>>> > > >
> > > > > > >>>> > > > Here are some interesting things I've noticed:
> > > > > > >>>> > > >
> > > > > > >>>> > > > i) When I start my client and it *never writes* to
> > > > > > >>>> > > > more than *8 partitions* (same data rate but fewer
> > > > > > >>>> > > > partitions), the end-to-end *99th latency is 300-350
> > > > > > >>>> > > > ms*. Quite a bit of this (numbers in my previous
> > > > > > >>>> > > > emails) is the latency from producer -> broker and
> > > > > > >>>> > > > from broker -> consumer. Still nowhere near as poor
> > > > > > >>>> > > > as the *20 - 30* seconds I was seeing.
> > > > > > >>>> > > >
> > > > > > >>>> > > > ii) When I increase the maximum number of partitions,
> > > > > > >>>> > > > end-to-end latency increases dramatically. At *256
> > > > > > >>>> > > > partitions* the end-to-end *99th latency is still
> > > > > > >>>> > > > 390 - 418 ms*, worse than the latency figures for *8*
> > > > > > >>>> > > > partitions, but not by much. When I increase this
> > > > > > >>>> > > > number to *512 partitions*, the end-to-end *99th
> > > > > > >>>> > > > latency* becomes an intolerable *19-24 seconds*. At
> > > > > > >>>> > > > *1024* partitions the *99th latency is at 25 - 30
> > > > > > >>>> > > > seconds*.
> > > > > > >>>> > > > A table of the numbers:
> > > > > > >>>> > > >
> > > > > > >>>> > > > Max partitions written to (out of 1024) | End-to-end 99th latency
> > > > > > >>>> > > > 8                                       | 300 - 350 ms
> > > > > > >>>> > > > 256                                     | 390 - 418 ms
> > > > > > >>>> > > > 512                                     | 19 - 24 seconds
> > > > > > >>>> > > > 1024                                    | 25 - 30 seconds
> > > > > > >>>> > > >
> > > > > > >>>> > > > iii) Once I make the max number of partitions high
> > > > > > >>>> > > > enough, reducing it doesn't help. For example, if I
> > > > > > >>>> > > > go up from *8* to *512* partitions, the latency goes
> > > > > > >>>> > > > up, but if I then go down from *512* to *8*
> > > > > > >>>> > > > partitions while the producer is running, the latency
> > > > > > >>>> > > > numbers don't come back down. My guess is that the
> > > > > > >>>> > > > producer is lazily creating some state per partition
> > > > > > >>>> > > > and this state is causing the latency. Once this
> > > > > > >>>> > > > state is created, writing to fewer partitions doesn't
> > > > > > >>>> > > > seem to help; only a restart of the producer calms
> > > > > > >>>> > > > things down.
> > > > > > >>>> > > >
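The lazy per-partition state suspected above would behave like the toy below: state allocated on first write to a partition and never torn down, so shrinking the active set later reclaims nothing. This only illustrates the hypothesis; it is not the real producer's internals:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LazyPartitionState {
    // Per-partition buffers, created lazily on first send and retained
    // for the lifetime of the producer.
    private final Map<Integer, Deque<byte[]>> perPartition = new ConcurrentHashMap<>();

    void send(int partition, byte[] payload) {
        perPartition.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(payload);
    }

    int trackedPartitions() {
        return perPartition.size();
    }

    public static void main(String[] args) {
        LazyPartitionState producer = new LazyPartitionState();
        for (int p = 0; p < 512; p++) producer.send(p, new byte[27]); // warm up 512
        for (int p = 0; p < 8; p++) producer.send(p, new byte[27]);   // then use only 8
        System.out.println(producer.trackedPartitions()); // 512: state never shrinks
    }
}
```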
> > > > > > >>>> > > > So my current plan is to reduce the number of
> > > > > > >>>> > > > partitions on the topic, but there seems to be
> > > > > > >>>> > > > something deeper going on for the latency numbers to
> > > > > > >>>> > > > be so poor to begin with and then suffer so much more
> > > > > > >>>> > > > (non-linearly) with additional partitions.
> > > > > > >>>> > > > Thanks!
> > > > > > >>>> > > >
> > > > > > >>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <
> > > > > > >>>> ra...@signalfuse.com>
> > > > > > >>>> > > > wrote:
> > > > > > >>>> > > > >
> > > > > > >>>> > > > > I've done some more measurements. I've also started
> > > > > > >>>> > > > > measuring the latency between when I ask my
> > > > > > >>>> > > > > producer to send a message and when I get an
> > > > > > >>>> > > > > acknowledgement via the callback. Here is my code:
> > > > > > >>>> > > > >
> > > > > > >>>> > > > > // This function is called on every producer once every 30 seconds.
> > > > > > >>>> > > > > public void addLagMarkers(final Histogram enqueueLag) {
> > > > > > >>>> > > > >     final int numberOfPartitions = 1024;
> > > > > > >>>> > > > >     final long timeOfEnqueue = System.currentTimeMillis();
> > > > > > >>>> > > > >     final Callback callback = new Callback() {
> > > > > > >>>> > > > >         @Override
> > > > > > >>>> > > > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
> > > > > > >>>> > > > >             if (metadata != null) {
> > > > > > >>>> > > > >                 // The difference between ack time from broker and enqueue time.
> > > > > > >>>> > > > >                 final long timeOfAck = System.currentTimeMillis();
> > > > > > >>>> > > > >                 final long lag = timeOfAck - timeOfEnqueue;
> > > > > > >>>> > > > >                 enqueueLag.update(lag);
> > > > > > >>>> > > > >             }
> > > > > > >>>> > > > >         }
> > > > > > >>>> > > > >     };
> > > > > > >>>> > > > >     for (int i = 0; i < numberOfPartitions; i++) {
> > > > > > >>>> > > > >         try {
> > > > > > >>>> > > > >             // 10 bytes -> short version + long timestamp.
> > > > > > >>>> > > > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
> > > > > > >>>> > > > >             // This message is later used by the consumers to measure lag.
> > > > > > >>>> > > > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
> > > > > > >>>> > > > >             kafkaProducer.send(record, callback);
> > > > > > >>>> > > > >         } catch (Exception e) {
> > > > > > >>>> > > > >             // We just dropped a lag marker.
> > > > > > >>>> > > > >         }
> > > > > > >>>> > > > >     }
> > > > > > >>>> > > > > }
> > > > > > >>>> > > > >
> > > > > > >>>> > > > > The *99th* on this lag is about *350 - 400* ms.
> > > > > > >>>> > > > > It's not stellar, but doesn't account for the
> > > > > > >>>> > > > > *20-30 second 99th* I see on the end-to-end lag. I
> > > > > > >>>> > > > > am consuming in a tight loop on the consumers
> > > > > > >>>> > > > > (using the SimpleConsumer) with minimal processing
> > > > > > >>>> > > > > and a *99th fetch time* of *130-140* ms, so I don't
> > > > > > >>>> > > > > think that should be a problem either. Completely
> > > > > > >>>> > > > > baffled.
> > > > > > >>>> > > > >
> > > > > > >>>> > > > >
> > > > > > >>>> > > > > Thanks!
> > > > > > >>>> > > > >
> > > > > > >>>> > > > >
> > > > > > >>>> > > > >
> > > > > > >>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <
> > > > > > >>>> ra...@signalfuse.com>
> > > > > > >>>> > > > > wrote:
> > > > > > >>>> > > > >>
> > > > > > >>>> > > > >>
> > > > > > >>>> > > > >>
> > > > > > >>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <
> > > > > > >>>> ra...@signalfuse.com
> > > > > > >>>> > >
> > > > > > >>>> > > > >> wrote:
> > > > > > >>>> > > > >>>
> > > > > > >>>> > > > >>> I am trying to replace a Thrift peer-to-peer API
> > > > > > >>>> > > > >>> with kafka for a particular work flow. I am
> > > > > > >>>> > > > >>> finding the 99th percentile latency to be
> > > > > > >>>> > > > >>> unacceptable at this time. This entire work load
> > > > > > >>>> > > > >>> runs in an Amazon VPC. I'd greatly appreciate it
> > > > > > >>>> > > > >>> if someone has any insights on why I am seeing
> > > > > > >>>> > > > >>> such poor numbers. Here are some details and
> > > > > > >>>> > > > >>> measurements taken:
> > > > > > >>>> > > > >>>
> > > > > > >>>> > > > >>> i) I have a single topic with 1024 partitions
> > > > > > >>>> > > > >>> that I am writing to from six clients using the
> > > > > > >>>> > > > >>> kafka 0.8.2 beta producer.
> > > > > > >>>> > > > >>>
> > > > > > >>>> > > > >>> ii) I have 3 brokers, each on a c3.2xlarge
> > > > > > >>>> > > > >>> machine on EC2. Each of those machines has 8
> > > > > > >>>> > > > >>> virtual CPUs, 15 GB of memory and 2 x 80 GB SSDs.
> > > > > > >>>> > > > >>> The broker -> partition mapping was decided by
> > > > > > >>>> > > > >>> Kafka when I created the topic.
> > > > > > >>>> > > > >>>
> > > > > > >>>> > > > >>> iii) I write about 22 thousand messages per
> > > > > > >>>> > > > >>> second from across the 6 clients. This number was
> > > > > > >>>> > > > >>> calculated using distributed counters. I just
> > > > > > >>>> > > > >>> increment a distributed counter in the callback
> > > > > > >>>> > > > >>> from my enqueue job if the metadata returned is
> > > > > > >>>> > > > >>> not null. I also increment a dropped messages
> > > > > > >>>> > > > >>> counter if the callback has a non-null exception
> > > > > > >>>> > > > >>> or if there was an exception in the synchronous
> > > > > > >>>> > > > >>> send call. The number of dropped messages is
> > > > > > >>>> > > > >>> pretty much always zero. Of the 6 clients, 3 are
> > > > > > >>>> > > > >>> responsible for 95% of the traffic. Messages are
> > > > > > >>>> > > > >>> very tiny, with null keys and 27 byte values (2
> > > > > > >>>> > > > >>> byte version and 25 byte payload). Again, these
> > > > > > >>>> > > > >>> messages are written using the kafka 0.8.2
> > > > > > >>>> > > > >>> client.
> > > > > > >>>> > > > >>>
> > > > > > >>>> > > > >>> iv) I have 3 consumer processes consuming only
> > > > > > >>>> > > > >>> from this topic. Each consumer process is
> > > > > > >>>> > > > >>> assigned a disjoint subset of the 1024 partitions
> > > > > > >>>> > > > >>> by an external arbiter. Each consumer process
> > > > > > >>>> > > > >>> then creates a mapping from brokers -> partitions
> > > > > > >>>> > > > >>> it has been assigned, and starts one fetcher
> > > > > > >>>> > > > >>> thread per broker. Each thread queries its broker
> > > > > > >>>> > > > >>> (using the SimpleConsumer) for partitions such
> > > > > > >>>> > > > >>> that partitions = (partitions on broker) *∩*
> > > > > > >>>> > > > >>> (partitions assigned to the process by the
> > > > > > >>>> > > > >>> arbiter). So in effect there are 9 consumer
> > > > > > >>>> > > > >>> threads across 3 consumer processes that query a
> > > > > > >>>> > > > >>> disjoint set of partitions for the single topic,
> > > > > > >>>> > > > >>> and amongst themselves they consume every
> > > > > > >>>> > > > >>> partition. Each thread has a run loop where it
> > > > > > >>>> > > > >>> asks for messages, consumes them, then asks for
> > > > > > >>>> > > > >>> more messages. When a run loop starts, it begins
> > > > > > >>>> > > > >>> by querying for the latest message in a partition
> > > > > > >>>> > > > >>> (i.e. discards any previous backlog) and then
> > > > > > >>>> > > > >>> maintains a map of partition ->
> > > > > > >>>> > > > >>> nextOffsetToRequest in memory to make sure that
> > > > > > >>>> > > > >>> it consumes messages in order.
> > > > > > >>>> > > > >>>
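The per-thread bookkeeping in (iv) boils down to an offset map; a Kafka-free sketch (class and method names are illustrative, not from the actual consumer code):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // partition -> next offset to ask the broker for, as described in (iv).
    private final Map<Integer, Long> nextOffsetToRequest = new HashMap<>();

    // On startup, discard any backlog and begin at the latest offset.
    void start(int partition, long latestOffset) {
        nextOffsetToRequest.put(partition, latestOffset);
    }

    // After each fetch, advance by the number of messages returned so the
    // partition is consumed strictly in order.
    long onBatch(int partition, int messagesReturned) {
        long next = nextOffsetToRequest.get(partition) + messagesReturned;
        nextOffsetToRequest.put(partition, next);
        return next;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.start(400, 19221923L);
        System.out.println(tracker.onBatch(400, 500)); // 19222423
    }
}
```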
> > > > > > >>>> > > > >> Edit: Meant external arbiter.
> > > > > > >>>> > > > >>
> > > > > > >>>> > > > >>>
> > > > > > >>>> > > > >>> v) Consumption is really simple. Each message is
> > > > > > >>>> > > > >>> put on a non-blocking, efficient ring buffer. If
> > > > > > >>>> > > > >>> the ring buffer is full, the message is dropped.
> > > > > > >>>> > > > >>> I measure the mean time between fetches and it is
> > > > > > >>>> > > > >>> the same as the time for fetches to within a ms,
> > > > > > >>>> > > > >>> meaning no matter how many messages are dequeued,
> > > > > > >>>> > > > >>> it takes almost no time to process them. At the
> > > > > > >>>> > > > >>> end of processing a fetch request I increment
> > > > > > >>>> > > > >>> another distributed counter that counts the
> > > > > > >>>> > > > >>> number of messages processed. This counter tells
> > > > > > >>>> > > > >>> me that on average I am consuming the same number
> > > > > > >>>> > > > >>> of messages/sec that I enqueue on the producers,
> > > > > > >>>> > > > >>> i.e. around 22 thousand messages/sec.
> > > > > > >>>> > > > >>>
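The drop-if-full hand-off described above can be sketched with a bounded queue. The thread doesn't show the actual ring buffer implementation, so this is only an illustration: `ArrayBlockingQueue.offer` gives the same never-block, drop-when-full semantics (the capacity here is an arbitrary assumption).

```java
import java.util.concurrent.ArrayBlockingQueue;

public class DropIfFullBuffer {
    // Bounded buffer standing in for the consumer's ring buffer
    // (capacity of 4 is an assumption for demonstration).
    private final ArrayBlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(4);
    private long dropped = 0;

    // Never blocks: offer() returns false immediately when the buffer is full,
    // in which case the message is dropped, matching the behavior above.
    public boolean enqueue(byte[] message) {
        boolean accepted = buffer.offer(message);
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    public long droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        DropIfFullBuffer b = new DropIfFullBuffer();
        for (int i = 0; i < 6; i++) {
            b.enqueue(new byte[] { (byte) i });
        }
        // Capacity 4, 6 offers: the last 2 offers are dropped.
        System.out.println(b.droppedCount());  // 2
    }
}
```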
> vi) The 99th percentile of the number of messages fetched per fetch
> request is about 500 messages. The 99th percentile of the time it takes to
> fetch a batch is about 130-140 ms. I played around with the buffer and
> maxWait settings on the SimpleConsumer, and attempting to consume more
> messages caused the 99th percentile of the fetch time to balloon up, so I
> am consuming in smaller batches right now.
> > > > > > >>>> > > > >>>
> vii) Every 30 seconds or so each of the producers inserts a trace message
> into each partition (so 1024 messages per producer every 30 seconds). Each
> message only contains the System.currentTimeMillis() at the time of
> enqueueing. It is about 9 bytes long. The consumer in step (v) always
> checks to see if a message it dequeued is of this trace type (instead of
> the regular message type). This is a simple check on the 2-byte version
> that each message buffer starts with. If it is a trace message it reads
> its payload as a long and uses the difference between the system time on
> the consumer and this payload to update a histogram with this difference
> value. Ignoring NTP skew (which I've measured to be on the order of
> milliseconds), this is the lag between when a message was enqueued on the
> producer and when it was read on the consumer.
>
> Edit: Trace messages are 10 bytes long - 2-byte version + 8-byte long for
> the timestamp.
> > > > > > >>>> > > > >>
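The 10-byte layout described in the edit (2-byte version + 8-byte timestamp) can be sketched with a plain `ByteBuffer`. The actual version value used to tag trace messages isn't given in the thread, so `TRACE_VERSION` here is a hypothetical placeholder:

```java
import java.nio.ByteBuffer;

public class TraceMessage {
    // Hypothetical version tag for trace messages; the real value
    // isn't stated in the thread.
    static final short TRACE_VERSION = 2;

    // 2-byte version + 8-byte timestamp = 10 bytes, as described above.
    static byte[] encode(long enqueueTimeMillis) {
        return ByteBuffer.allocate(10)
                .putShort(TRACE_VERSION)
                .putLong(enqueueTimeMillis)
                .array();
    }

    // The consumer's cheap check on the first 2 bytes of the buffer.
    static boolean isTrace(byte[] payload) {
        return ByteBuffer.wrap(payload).getShort() == TRACE_VERSION;
    }

    // Reads the producer-side System.currentTimeMillis() out of the payload.
    static long enqueueTime(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        buf.getShort();        // Skip the version.
        return buf.getLong();  // The enqueue timestamp.
    }

    public static void main(String[] args) {
        byte[] msg = encode(1419984000000L);
        System.out.println(msg.length);        // 10
        System.out.println(isTrace(msg));      // true
        System.out.println(enqueueTime(msg));  // 1419984000000
    }
}
```

On the consumer, the lag is then just `System.currentTimeMillis() - enqueueTime(payload)`, which is what feeds the histogram.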
> So the pseudocode for every consumer thread (one per broker per consumer
> process, 9 in total across 3 consumer processes) is:
>
> void run() {
>   while (running) {
>     // This assignment is done by an external arbiter.
>     FetchRequest fetchRequest = buildFetchRequest(
>         partitionsAssignedToThisProcessThatFallOnThisBroker);
>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
>     // The 99th of this is 130-140 ms, the same as the time between fetches.
>     FetchResponse fetchResponse = fetchResponse(fetchRequest);
>     processData(fetchResponse);  // The 99th on this is 2-3 ms.
>   }
> }
> > > > > > >>>> > > > >>>
> void processData(FetchResponse response) {
>   try (Timer.Context _ = processWorkerDataTimer.time()) {  // The 99th on this is 2-3 ms.
>     for (Message message : response) {
>       if (typeOf(message) == NORMAL_DATA) {
>         // Never blocks; drops the message if the ring buffer is full.
>         enqueueOnNonBlockingRingBuffer(message);
>       } else if (typeOf(message) == TRACE_DATA) {
>         long timeOfEnqueue = getEnqueueTime(message);
>         long currentTime = System.currentTimeMillis();
>         long difference = currentTime - timeOfEnqueue;
>         lagHistogram.update(difference);
>       }
>     }
>   }
> }
>
> viii) So the kicker is that this lag is really, really high:
> Median lag: *200 ms*. This already seems pretty high, and I could even
> discount some of it through NTP skew.
> Mean lag: *2-4 seconds*! Also notice how the mean is bigger than the
> median, telling us that the distribution has a big tail.
> 99th lag: *20 seconds*!!
>
> I can't quite figure out the source of the lag. Here are some other
> things of note:
>
> 1) The brokers in general are at about 40-50% CPU, but I see occasional
> spikes to 80-95%. This could be causing issues. I am wondering if this is
> down to the high number of partitions I have and how each partition ends
> up receiving not very many messages: 22k messages/sec spread across 1024
> partitions = only about 21 messages/sec/partition. But each broker should
> be receiving about 7,500 messages/sec. It could also be down to these
> messages being tiny: I imagine that the Kafka wire protocol overhead is
> probably close to the 25-byte message payload itself. It also has to do a
> few CRC32 calculations etc. But given that these are C3.2x machines and
> the message rate is so small, I'd expect them to do better. And reading
> this article
> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
> about 2 million messages/sec across three brokers makes it seem like 22k
> 25-byte messages/sec should be very easy. So my biggest suspicion right
> now is that the large number of partitions is somehow responsible for
> this latency.
>
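As a trivial check of the per-partition and per-broker arithmetic quoted above (the even-spread assumption is mine; the thread only states aggregate rates):

```java
public class Rates {
    public static void main(String[] args) {
        double totalMsgsPerSec = 22_000;
        int partitions = 1024;
        int brokers = 3;
        // ~21.5 messages/sec/partition: each partition sees very few messages.
        System.out.println(totalMsgsPerSec / partitions);
        // ~7,300 messages/sec/broker, assuming messages spread evenly.
        System.out.println(totalMsgsPerSec / brokers);
    }
}
```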
> 2) On the consumer, given that the 99th percentile of the time between
> fetches == the 99th percentile of the fetch time, I don't see how I could
> do any better. It's pretty close to just getting batches of data,
> iterating through them and dropping them on the floor. My requests on the
> consumer are built like this:
>
>         // Max wait of 100 ms, or at least 1000 bytes, whichever happens first.
>         FetchRequestBuilder builder = new FetchRequestBuilder()
>                 .clientId(clientName)
>                 .maxWait(100).minBytes(1000);
>
>         // Add a fetch request for every partition.
>         for (PartitionMetadata partition : partitions) {
>             int partitionId = partition.partitionId();
>             // This map maintains the next offset to be read per partition.
>             long offset = readOffsets.get(partition);
>             // Up to 3000 bytes per partition per fetch request.
>             builder.addFetch(MY_TOPIC, partitionId, offset, 3000);
>         }
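The `readOffsets` bookkeeping that the loop above depends on isn't shown in the thread. A minimal, Kafka-free sketch of how such a next-offset map might be advanced (class and method names are hypothetical; the real code seeds each partition from the latest offset rather than 0):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // partitionId -> next offset to request; plays the role of readOffsets above.
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    public long nextOffset(int partitionId) {
        // Default of 0 is a simplification; the thread's consumer starts
        // from the latest offset in the partition instead.
        return nextOffsets.getOrDefault(partitionId, 0L);
    }

    // After consuming a message at `offset`, the next fetch should ask for
    // `offset + 1`, which keeps per-partition consumption in order.
    public void markConsumed(int partitionId, long offset) {
        nextOffsets.merge(partitionId, offset + 1, Math::max);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.markConsumed(7, 41);
        System.out.println(tracker.nextOffset(7));  // 42
    }
}
```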
> 3) On the producer side I use the Kafka beta producer. I've played around
> with many settings but none of them seem to have made a difference. Here
> are my current settings:
>
> ProducerConfig values:
>
>         block.on.buffer.full = false
>         retry.backoff.ms = 100
>         buffer.memory = 83886080  // Kind of big - could it be adding lag?
>         batch.size = 40500  // This is also kind of big - could it be adding lag?
>         metrics.sample.window.ms = 30000
>         metadata.max.age.ms = 300000
>         receive.buffer.bytes = 32768
>         timeout.ms = 30000
>         max.in.flight.requests.per.connection = 5
>         metric.reporters = []
>         bootstrap.servers = [broker1, broker2, broker3]
>         client.id =
>         compression.type = none
>         retries = 0
>         max.request.size = 1048576
>         send.buffer.bytes = 131072
>         acks = 1
>         reconnect.backoff.ms = 10
>         linger.ms = 250  // Based on this config I'd imagine the lag should be bounded at about 250 ms overall.
>         metrics.num.samples = 2
>         metadata.fetch.timeout.ms = 60000
>
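Wiring the settings above into a `Properties` object, as the beta producer's constructor accepts, would look roughly like this. A sketch only, not the exact code from the thread; the broker ports are assumptions, and only the settings most relevant to latency are shown:

```java
import java.util.Properties;

public class ProducerSettings {
    static Properties build() {
        Properties props = new Properties();
        // Ports are assumptions; the thread only names broker1/2/3.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("acks", "1");          // Leader-only ack, matching the logged RequiredAcks: 1.
        props.put("linger.ms", "250");   // Should bound batching delay to ~250 ms.
        props.put("batch.size", "40500");
        props.put("buffer.memory", "83886080");
        props.put("block.on.buffer.full", "false");
        props.put("retries", "0");
        props.put("compression.type", "none");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("acks"));       // 1
        System.out.println(p.getProperty("linger.ms"));  // 250
    }
}
```

With `acks=1`, remote time on the broker should indeed be near zero, which is consistent with Jay's point that only `acks=-1` waits on replication.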
> I know this is a lot of information. I just wanted to provide as much
> context as possible.
>
> Thanks in advance!