a few extra points:

1. broker-per-disk might also incur more client <--> broker sockets: if
every producer / consumer "talks" to >1 partition, there's a very good
chance that partitions that were co-located on a single 10-disk broker
would now be split between several single-disk broker processes on the same
machine. it's hard to put an exact multiplier on this, but it's likely >x1
(see the rough sketch after this list). sockets are a limited resource at
the OS level and each one incurs some memory cost (kernel buffers)

2. there's a memory overhead to spinning up a JVM (compiled code and byte
code objects etc). if we assume this overhead is ~300 MB (order of
magnitude, specifics vary) then spinning up 10 JVMs would cost you ~3 GB of
RAM. not a ton, but non-negligible.

3. there would also be some overhead downstream of kafka in any management
/ monitoring / log aggregation system. likely less than x10 though.

4. (related to above) - added complexity of administration with more
running instances.
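
to make point 1 concrete, here's a rough back-of-envelope sketch of the
socket math in java (every number here is a made-up assumption, not a
measurement):

    // hypothetical connection-count estimate; all values are illustrative
    public class SocketEstimate {
        public static void main(String[] args) {
            int clients = 1000;        // producers + consumers, assumed
            int disksPerMachine = 10;  // JBOD box with 10 disks
            // one big broker: a client needs at most one connection per
            // machine, however many partitions it touches there
            long oneBigBroker = clients;
            // broker-per-disk, worst case: a client whose partitions were
            // co-located now talks to up to 10 broker processes
            long brokerPerDisk = (long) clients * disksPerMachine;
            System.out.printf("sockets per machine: %d vs up to %d%n",
                    oneBigBroker, brokerPerDisk);
        }
    }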

is anyone actually running kafka with anywhere near 100 GB heaps? i thought
the point was to rely on the kernel page cache to do the disk buffering...
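
(fwiw, the stock kafka-server-start.sh defaults to a fairly small heap --
something like KAFKA_HEAP_OPTS="-Xmx1G -Xms1G", if i remember right --
precisely because the page cache does the heavy lifting.)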

On Thu, Jan 26, 2017 at 11:00 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Colin,
>
> Thanks much for the comment. Please see my comments inline.
>
> On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org> wrote:
>
> > On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
> > > Hey Colin,
> > >
> > > Good point! Yeah, we have actually considered and tested this solution,
> > > which we call one-broker-per-disk. It would work and should require no
> > > major change in Kafka as compared to this JBOD KIP, so it would be a
> > > good short-term solution.
> > >
> > > But it has a few drawbacks which make it less desirable in the long
> > > term. Assume we have 10 disks on a machine. Here are the problems:
> >
> > Hi Dong,
> >
> > Thanks for the thoughtful reply.
> >
> > >
> > > 1) Our stress test results show that one-broker-per-disk has 15% lower
> > > throughput.
> > >
> > > 2) The controller would need to send 10X as many LeaderAndIsrRequest,
> > > MetadataUpdateRequest and StopReplicaRequest RPCs. This increases the
> > > burden on the controller, which can become the performance bottleneck.
> >
> > Maybe I'm misunderstanding something, but there would not be 10x as many
> > StopReplicaRequest RPCs, would there?  The other requests would increase
> > 10x, but from a pretty low base, right?  We are not reassigning
> > partitions all the time, I hope (or else we have bigger problems...)
> >
>
> I think the controller will group StopReplicaRequests per broker and send
> only one StopReplicaRequest to each broker during controlled shutdown.
> Anyway, we don't have to worry about this if we agree that the other
> requests will increase by 10X. One MetadataUpdateRequest has to be sent to
> each broker in the cluster every time there is a leadership change; with
> one broker per disk, a 100-machine cluster would see 1000 of those requests
> per change instead of 100. I am not sure this is a real problem, but in
> theory it makes the overhead O(number of brokers), which may be a concern
> in the future. Ideally we should avoid it.
>
>
> >
> > >
> > > 3) Less efficient use of physical resources on the machine. The number
> > > of sockets on each machine will increase by 10X. The number of
> > > connections between any two machines will increase by 100X.
> > >
> > > 4) Less efficient management of memory and quotas.
> > >
> > > 5) Rebalancing between disks/brokers on the same machine will be less
> > > efficient and less flexible. A broker has to read data from another
> > > broker on the same machine via a socket. It also makes automatic load
> > > balancing between disks on the same machine harder in the future.
> > >
> > > I will put this and the explanation in the rejected alternatives
> > > section. I have a few questions:
> > >
> > > - Can you explain why this solution helps avoid the scalability
> > > bottleneck? I actually think it will exacerbate the scalability problem
> > > due to 2) above.
> > > - Why can we push more RPCs with this solution?
> >
> > To really answer this question we'd have to take a deep dive into the
> > locking of the broker and figure out how effectively it can parallelize
> > truly independent requests.  Almost every multithreaded process is going
> > to have shared state, like shared queues or shared sockets, that is
> > going to make scaling less than linear when you add disks or processors.
> >  (And clearly, another option is to improve that scalability, rather
> > than going multi-process!)
> >
>
> Yeah, I also think it is better to improve scalability inside the Kafka
> code if possible. I am not sure we currently have any scalability issue
> inside Kafka that can only be fixed by going multi-process.
>
>
> >
> > > - It is true that a garbage collection in one broker would not affect
> > > others. But that is after every broker only uses 1/10 of the memory.
> Can
> > > we be sure that this will actually help performance?
> >
> > The big question is, how much memory do Kafka brokers use now, and how
> > much will they use in the future?  Our experience in HDFS was that once
> > you start getting more than 100-200GB Java heap sizes, full GCs start
> > taking minutes to finish when using the standard JVMs.  That alone is a
> > good reason to go multi-process or consider storing more things off the
> > Java heap.
> >
>
> I see. Now I agree one-broker-per-disk should be more efficient in terms of
> GC, since each broker probably needs less than 1/10 of the memory available
> on a typical machine nowadays. I will remove this from the reasons for
> rejection.
>
>
> >
> > Disk failure is the "easy" case.  The "hard" case, which is
> > unfortunately also the much more common case, is disk misbehavior.
> > Towards the end of their lives, disks tend to start slowing down
> > unpredictably.  Requests that would have completed immediately before
> > start taking 20, 100, or 500 milliseconds.  Some files may be readable
> > and other files may not be.  System calls hang, sometimes forever, and
> > the Java process can't abort them, because the hang is in the kernel.
> > It is not fun when threads are stuck in "D state"
> > (http://stackoverflow.com/questions/20423521/process-perminantly-stuck-on-d-state).
> > Even kill -9 cannot abort the thread then.  Fortunately, this is rare.
> >
>
> I agree it is a harder problem and it is rare. We probably don't have to
> worry about it in this KIP since this issue is orthogonal to whether or not
> we use JBOD.
>
>
> >
> > Another approach we should consider is for Kafka to implement its own
> > storage layer that would stripe across multiple disks.  This wouldn't
> > have to be done at the block level, but could be done at the file level.
> >  We could use consistent hashing to determine which disks a file should
> > end up on, for example.
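> >
> > A minimal sketch of the consistent hashing part could look like the
> > following (purely illustrative; none of these names exist in Kafka
> > today):
> >
> >     import java.util.List;
> >     import java.util.SortedMap;
> >     import java.util.TreeMap;
> >
> >     // Hypothetical: map each file to a log directory via a hash ring,
> >     // so adding or removing a disk only relocates a small fraction of
> >     // the files.
> >     class DiskRing {
> >         private final SortedMap<Integer, String> ring = new TreeMap<>();
> >
> >         DiskRing(List<String> logDirs, int virtualNodes) {
> >             for (String dir : logDirs)
> >                 for (int v = 0; v < virtualNodes; v++)
> >                     ring.put((dir + "#" + v).hashCode(), dir);
> >         }
> >
> >         // Walk clockwise from the file's hash to the next disk entry.
> >         String dirFor(String fileName) {
> >             SortedMap<Integer, String> tail = ring.tailMap(fileName.hashCode());
> >             int key = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
> >             return ring.get(key);
> >         }
> >     }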
> >
>
> Are you suggesting that we should distribute logs, or log segments, across
> the disks of a broker? I am not sure I fully understand this approach. My
> gut feeling is that this would be a drastic solution requiring non-trivial
> design. While it may be useful to Kafka, I would prefer not to discuss it
> in detail in this thread unless you believe it is strictly superior to the
> design in this KIP in terms of solving our use case.
>
>
> > best,
> > Colin
> >
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the writeup!  It's very interesting.
> > > >
> > > > I apologize in advance if this has been discussed somewhere else, but
> > > > I am curious if you have considered the solution of running multiple
> > > > brokers per node.  Clearly there is a memory overhead with this
> > > > solution because of the fixed cost of starting multiple JVMs.
> > > > However, running multiple JVMs would help avoid scalability
> > > > bottlenecks.  You could probably push more RPCs per second, for
> > > > example.  A garbage collection in one broker would not affect the
> > > > others.  It would be interesting to see this considered in the
> > > > "alternate designs" section, even if you end up deciding it's not the
> > > > way to go.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
> > > > > Hi all,
> > > > >
> > > > > We created KIP-112: Handle disk failure for JBOD. Please find the
> > > > > KIP wiki at
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD
> > > > >
> > > > > This KIP is related to KIP-113
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
> > > > > Support replicas movement between log directories. They are needed
> > > > > in order to support JBOD in Kafka. Please help review the KIP. Your
> > > > > feedback is appreciated!
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > >
> >
>
