Hey Colin,

Thanks much for the comment. Please see me comment inline.

On Thu, Jan 26, 2017 at 10:15 AM, Colin McCabe <cmcc...@apache.org> wrote:

> On Wed, Jan 25, 2017, at 13:50, Dong Lin wrote:
> > Hey Colin,
> >
> > Good point! Yeah we have actually considered and tested this solution,
> > which we call one-broker-per-disk. It would work and should require no
> > major change in Kafka as compared to this JBOD KIP. So it would be a good
> > short term solution.
> >
> > But it has a few drawbacks which makes it less desirable in the long
> > term.
> > Assume we have 10 disks on a machine. Here are the problems:
>
> Hi Dong,
>
> Thanks for the thoughtful reply.
>
> >
> > 1) Our stress test result shows that one-broker-per-disk has 15% lower
> > throughput
> >
> > 2) Controller would need to send 10X as many LeaderAndIsrRequest,
> > MetadataUpdateRequest and StopReplicaRequest. This increases the burden
> > on
> > controller which can be the performance bottleneck.
>
> Maybe I'm misunderstanding something, but there would not be 10x as many
> StopReplicaRequest RPCs, would there?  The other requests would increase
> 10x, but from a pretty low base, right?  We are not reassigning
> partitions all the time, I hope (or else we have bigger problems...)
>

I think the controller will group StopReplicaRequest per broker and send
only one StopReplicaRequest to a broker during controlled shutdown. Anyway,
we don't have to worry about this if we agree that other requests will
increase by 10X. One MetadataRequest to send to each broker in the cluster
every time there is leadership change. I am not sure this is a real
problem. But in theory this makes the overhead complexity O(number of
broker) and may be a concern in the future. Ideally we should avoid it.


>
> >
> > 3) Less efficient use of physical resource on the machine. The number of
> > socket on each machine will increase by 10X. The number of connection
> > between any two machine will increase by 100X.
> >
> > 4) Less efficient way to management memory and quota.
> >
> > 5) Rebalance between disks/brokers on the same machine will less
> > efficient
> > and less flexible. Broker has to read data from another broker on the
> > same
> > machine via socket. It is also harder to do automatic load balance
> > between
> > disks on the same machine in the future.
> >
> > I will put this and the explanation in the rejected alternative section.
> > I
> > have a few questions:
> >
> > - Can you explain why this solution can help avoid scalability
> > bottleneck?
> > I actually think it will exacerbate the scalability problem due the 2)
> > above.
> > - Why can we push more RPC with this solution?
>
> To really answer this question we'd have to take a deep dive into the
> locking of the broker and figure out how effectively it can parallelize
> truly independent requests.  Almost every multithreaded process is going
> to have shared state, like shared queues or shared sockets, that is
> going to make scaling less than linear when you add disks or processors.
>  (And clearly, another option is to improve that scalability, rather
> than going multi-process!)
>

Yeah I also think it is better to improve scalability inside kafka code if
possible. I am not sure we currently have any scalability issue inside
Kafka that can not be removed without using multi-process.


>
> > - It is true that a garbage collection in one broker would not affect
> > others. But that is after every broker only uses 1/10 of the memory. Can
> > we be sure that this will actually help performance?
>
> The big question is, how much memory do Kafka brokers use now, and how
> much will they use in the future?  Our experience in HDFS was that once
> you start getting more than 100-200GB Java heap sizes, full GCs start
> taking minutes to finish when using the standard JVMs.  That alone is a
> good reason to go multi-process or consider storing more things off the
> Java heap.
>

I see. Now I agree one-broker-per-disk should be more efficient in terms of
GC since each broker probably needs less than 1/10 of the memory available
on a typical machine nowadays. I will remove this from the reason of
rejection.


>
> Disk failure is the "easy" case.  The "hard" case, which is
> unfortunately also the much more common case, is disk misbehavior.
> Towards the end of their lives, disks tend to start slowing down
> unpredictably.  Requests that would have completed immediately before
> start taking 20, 100 500 milliseconds.  Some files may be readable and
> other files may not be.  System calls hang, sometimes forever, and the
> Java process can't abort them, because the hang is in the kernel.  It is
> not fun when threads are stuck in "D state"
> http://stackoverflow.com/questions/20423521/process-perminan
> tly-stuck-on-d-state
> .  Even kill -9 cannot abort the thread then.  Fortunately, this is
> rare.
>

I agree it is a harder problem and it is rare. We probably don't have to
worry about it in this KIP since this issue is orthogonal to whether or not
we use JBOD.


>
> Another approach we should consider is for Kafka to implement its own
> storage layer that would stripe across multiple disks.  This wouldn't
> have to be done at the block level, but could be done at the file level.
>  We could use consistent hashing to determine which disks a file should
> end up on, for example.
>

Are you suggesting that we should distribute log, or log segment, across
disks of brokers? I am not sure if I fully understand this approach. My gut
feel is that this would be a drastic solution that would require
non-trivial design. While this may be useful to Kafka, I would prefer not
to discuss this in detail in this thread unless you believe it is strictly
superior to the design in this KIP in terms of solving our use-case.


> best,
> Colin
>
> >
> > Thanks,
> > Dong
> >
> > On Wed, Jan 25, 2017 at 11:34 AM, Colin McCabe <cmcc...@apache.org>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the writeup!  It's very interesting.
> > >
> > > I apologize in advance if this has been discussed somewhere else.  But
> I
> > > am curious if you have considered the solution of running multiple
> > > brokers per node.  Clearly there is a memory overhead with this
> solution
> > > because of the fixed cost of starting multiple JVMs.  However, running
> > > multiple JVMs would help avoid scalability bottlenecks.  You could
> > > probably push more RPCs per second, for example.  A garbage collection
> > > in one broker would not affect the others.  It would be interesting to
> > > see this considered in the "alternate designs" design, even if you end
> > > up deciding it's not the way to go.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Thu, Jan 12, 2017, at 10:46, Dong Lin wrote:
> > > > Hi all,
> > > >
> > > > We created KIP-112: Handle disk failure for JBOD. Please find the KIP
> > > > wiki
> > > > in the link https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 112%3A+Handle+disk+failure+for+JBOD.
> > > >
> > > > This KIP is related to KIP-113
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 113%3A+Support+replicas+movement+between+log+directories>:
> > > > Support replicas movement between log directories. They are needed in
> > > > order
> > > > to support JBOD in Kafka. Please help review the KIP. You feedback is
> > > > appreciated!
> > > >
> > > > Thanks,
> > > > Dong
> > >
>

Reply via email to