It's reasonable to make the behavior of random producers customizable
through a pluggable partitioner. So, if one doesn't care about # of socket
connections, one can choose to select a random partition on every send. If
one does have many producers, one can choose to periodically select a
random partition. To support this, the partitioner api needs to be changed
though.

Instead of
  def partition(key: T, numPartitions: Int): Int

we probably need something like the following:
  def partition(key: T, numPartitions: Int, availablePartitionList:
List[Int], isNewBatch: boolean, isRefreshMetadata: boolean): Int

availablePartitionList: allows us to select only partitions that are
available.
isNewBatch: allows us to select the same partition for all messages in a
given batch in the async mode.
isRefreshMedatadata: allows us to implement the policy of switching to a
random partition periodically.

This will make the partitioner api a bit more complicated. However, it does
provide enough information for customization.

Thanks,

Jun



On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <crypt...@gmail.com> wrote:

> Sounds good, I will create a JIRA and upload a patch.
>
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop
> ********************************************/
>
>
> On Sep 17, 2013, at 1:19 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> > I agree that minimizing the number of producer connections (while
> > being a good thing) is really required in very large production
> > deployments, and the net-effect of the existing change is
> > counter-intuitive to users who expect an immediate even distribution
> > across _all_ partitions of the topic.
> >
> > However, I don't think it is a hack because it is almost exactly the
> > same behavior as 0.7 in one of its modes. The 0.7 producer (which I
> > think was even more confusing) had three modes:
> > i) ZK send
> > ii) Config send(a): static list of broker1:port1,broker2:port2,etc.
> > iii) Config send(b): static list of a hardwareVIP:VIPport
> >
> > (i) and (ii) would achieve even distribution. (iii) would effectively
> > select one broker and distribute to partitions on that broker within
> > each reconnect interval. (iii) is very similar to what we now do in
> > 0.8. (Although we stick to one partition during each metadata refresh
> > interval that can be changed to stick to one broker and distribute
> > across partitions on that broker).
> >
> > At the same time, I agree with Joe's suggestion that we should keep
> > the more intuitive pre-KAFKA-1017 behavior as the default and move the
> > change in KAFKA-1017 to a more specific partitioner implementation.
> >
> > Joel
> >
> >
> > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >> Let me ask another question which I think is more objective. Let's say
> 100
> >> random, smart infrastructure specialists try Kafka, of these 100 how
> many
> >> do you believe will
> >> 1. Say that this behavior is what they expected to happen?
> >> 2. Be happy with this behavior?
> >> I am not being facetious I am genuinely looking for a numerical
> estimate. I
> >> am trying to figure out if nobody thought about this or if my estimate
> is
> >> just really different. For what it is worth my estimate is 0 and 5
> >> respectively.
> >>
> >> This would be fine expect that we changed it from the good behavior to
> the
> >> bad behavior to fix an issue that probably only we have.
> >>
> >> -Jay
> >>
> >>
> >> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>
> >>> I just took a look at this change. I agree with Joe, not to put to
> fine a
> >>> point on it, but this is a confusing hack.
> >>>
> >>> Jun, I don't think wanting to minimizing the number of TCP connections
> is
> >>> going to be a very common need for people with less than 10k
> producers. I
> >>> also don't think people are going to get very good load balancing out
> of
> >>> this because most people don't have a ton of producers. I think
> instead we
> >>> will spend the next year explaining this behavior which 99% of people
> will
> >>> think is a bug (because it is crazy, non-intuitive, and breaks their
> usage).
> >>>
> >>> Why was this done by adding special default behavior in the null key
> case
> >>> instead of as a partitioner? The argument that the partitioner
> interface
> >>> doesn't have sufficient information to choose a partition is not a good
> >>> argument for hacking in changes to the default, it is an argument for *
> >>> improving* the partitioner interface.
> >>>
> >>> The whole point of a partitioner interface is to make it possible to
> plug
> >>> in non-standard behavior like this, right?
> >>>
> >>> -Jay
> >>>
> >>>
> >>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>
> >>>> Joe,
> >>>>
> >>>> Thanks for bringing this up. I want to clarify this a bit.
> >>>>
> >>>> 1. Currently, the producer side logic is that if the partitioning key
> is
> >>>> not provided (i.e., it is null), the partitioner won't be called. We
> did
> >>>> that because we want to select a random and "available" partition to
> send
> >>>> messages so that if some partitions are temporarily unavailable
> (because
> >>>> of
> >>>> broker failures), messages can still be sent to other partitions.
> Doing
> >>>> this in the partitioner is difficult since the partitioner doesn't
> know
> >>>> which partitions are currently available (the DefaultEventHandler
> does).
> >>>>
> >>>> 2. As Joel said, the common use case in production is that there are
> many
> >>>> more producers than #partitions in a topic. In this case, sticking to
> a
> >>>> partition for a few minutes is not going to cause too much imbalance
> in
> >>>> the
> >>>> partitions and has the benefit of reducing the # of socket
> connections. My
> >>>> feeling is that this will benefit most production users. In fact, if
> one
> >>>> uses a hardware load balancer for producing data in 0.7, it behaves in
> >>>> exactly the same way (a producer will stick to a broker until the
> >>>> reconnect
> >>>> interval is reached).
> >>>>
> >>>> 3. It is true that If one is testing a topic with more than one
> partition
> >>>> (which is not the default value), this behavior can be a bit weird.
> >>>> However, I think it can be mitigated by running multiple test producer
> >>>> instances.
> >>>>
> >>>> 4. Someone reported in the mailing list that all data shows in only
> one
> >>>> partition after a few weeks. This is clearly not the expected
> behavior. We
> >>>> can take a closer look to see if this is real issue.
> >>>>
> >>>> Do you think these address your concerns?
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>>
> >>>>
> >>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <crypt...@gmail.com>
> wrote:
> >>>>
> >>>>> How about creating a new class called RandomRefreshPartioner and copy
> >>>> the
> >>>>> DefaultPartitioner code to it and then revert the DefaultPartitioner
> >>>> code.
> >>>>> I appreciate this is a one time burden for folks using the existing
> >>>>> 0.8-beta1 bumping into KAFKA-1017 in production having to switch to
> the
> >>>>> RandomRefreshPartioner and when folks deploy to production will have
> to
> >>>>> consider this property change.
> >>>>>
> >>>>> I make this suggestion keeping in mind the new folks that on board
> with
> >>>>> Kafka and when everyone is in development and testing mode for the
> first
> >>>>> time their experience would be as expected from how it would work in
> >>>>> production this way.  In dev/test when first using Kafka they won't
> >>>> have so
> >>>>> many producers for partitions but would look to parallelize their
> >>>> consumers
> >>>>> IMHO.
> >>>>>
> >>>>> The random broker change sounds like maybe a bigger change now this
> late
> >>>>> in the release cycle if we can accommodate folks trying Kafka for the
> >>>> first
> >>>>> time and through their development and testing along with full blown
> >>>>> production deploys.
> >>>>>
> >>>>> /*******************************************
> >>>>> Joe Stein
> >>>>> Founder, Principal Consultant
> >>>>> Big Data Open Source Security LLC
> >>>>> http://www.stealth.ly
> >>>>> Twitter: @allthingshadoop
> >>>>> ********************************************/
> >>>>>
> >>>>>
> >>>>> On Sep 14, 2013, at 8:17 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks for bringing this up - it is definitely an important point
> to
> >>>>>>> discuss. The underlying issue of KAFKA-1017 was uncovered to some
> >>>>> degree by
> >>>>>>> the fact that in our deployment we did not significantly increase
> the
> >>>>> total
> >>>>>>> number of partitions over 0.7 - i.e., in 0.7 we had say four
> >>>> partitions
> >>>>> per
> >>>>>>> broker, now we are using (say) eight partitions across the cluster.
> >>>> So
> >>>>> with
> >>>>>>> random partitioning every producer would end up connecting to
> nearly
> >>>>> every
> >>>>>>> broker (unlike 0.7 in which we would connect to only one broker
> >>>> within
> >>>>> each
> >>>>>>> reconnect interval). In a production-scale deployment that causes
> the
> >>>>> high
> >>>>>>> number of connections that KAFKA-1017 addresses.
> >>>>>>>
> >>>>>>> You are right that the fix of sticking to one partition over the
> >>>>> metadata
> >>>>>>> refresh interval goes against true consumer parallelism, but this
> >>>> would
> >>>>> be
> >>>>>>> the case only if there are few producers. If you have a sizable
> >>>> number
> >>>>> of
> >>>>>>> producers on average all partitions would get uniform volumes of
> >>>> data.
> >>>>>>>
> >>>>>>> One tweak to KAFKA-1017 that I think is reasonable would be instead
> >>>> of
> >>>>>>> sticking to a random partition, stick to a random broker and send
> to
> >>>>> random
> >>>>>>> partitions within that broker. This would make the behavior closer
> to
> >>>>> 0.7
> >>>>>>> wrt number of connections and random partitioning provided the
> >>>> number of
> >>>>>>> partitions per broker is high enough, which is why I mentioned the
> >>>>>>> partition count (in our usage) in 0.7 vs 0.8 above. Thoughts?
> >>>>>>>
> >>>>>>> Joel
> >>>>>>>
> >>>>>>>
> >>>>>>> On Friday, September 13, 2013, Joe Stein wrote:
> >>>>>>>>
> >>>>>>>> First, let me apologize for not realizing/noticing this until
> today.
> >>>>> One
> >>>>>>>> reason I left my last company was not being paid to work on Kafka
> >>>> nor
> >>>>>>> being
> >>>>>>> able to afford any time for a while to work on it. Now in my new
> gig
> >>>>> (just
> >>>>>>> wrapped up my first week, woo hoo) while I am still not "paid to
> >>>> work on
> >>>>>>> Kafka" I can afford some more time for it now and maybe in 6
> months I
> >>>>> will
> >>>>>>> be able to hire folks to work on Kafka (with more and more time for
> >>>>> myself
> >>>>>>> to work on it too) while we also work on client projects
> (especially
> >>>>> Kafka
> >>>>>>> based ones).
> >>>>>>>
> >>>>>>> So, I understand about the changes that were made to fix open file
> >>>>> handles
> >>>>>>> and make the random pinning be timed based (with a very large
> default
> >>>>>>> time).  Got all that.
> >>>>>>>
> >>>>>>> But, doesn't this completely negate what has been communicated to
> the
> >>>>>>> community for a very long time and the expectation they have? I
> >>>> think it
> >>>>>>> does.
> >>>>>>>
> >>>>>>> The expected functionality for random partitioning is that "This
> can
> >>>> be
> >>>>>>> done in a round-robin fashion simply to balance load" and that the
> >>>>>>> "producer" does it for you.
> >>>>>>>
> >>>>>>> Isn't a primary use case for partitions to paralyze consumers? If
> so
> >>>>> then
> >>>>>>> the expectation would be that all consumers would be getting in
> >>>> parallel
> >>>>>>> equally in a "round robin fashion" the data that was produced for
> the
> >>>>>>> topic... simply to balance load...with the producer handling it and
> >>>> with
> >>>>>>> the client application not having to-do anything. This randomness
> >>>>> occurring
> >>>>>>> every 10 minutes can't balance load.
> >>>>>>>
> >>>>>>> If users are going to work around this anyways (as I would honestly
> >>>> do
> >>>>> too)
> >>>>>>> doing a pseudo semantic random key and essentially forcing real
> >>>>> randomness
> >>>>>>> to simply balance load to my consumers running in parallel would we
> >>>>> still
> >>>>>>> end up hitting the KAFKA-1017 problem anyways? If not then why
> can't
> >>>> we
> >>>>>>> just give users the functionality and put back the 3 lines of code
> 1)
> >>>>>>> if(key == null) 2)  random.nextInt(numPartitions) 3) else ... If we
> >>>>> would
> >>>>>>> bump into KAFKA-1017 by working around it then we have not really
> >>>> solved
> >>>>>>> the root cause problem and removing expected functionality for a
> >>>> corner
> >>>>>>> case that might have other work arounds and/or code changes to
> solve
> >>>> it
> >>>>>>> another way or am I still not getting something?
> >>>>>>>
> >>>>>>> Also, I was looking at testRandomPartitioner in AsyncProducerTest
> >>>> and I
> >>>>>>> don't see how this would ever fail, the assertion is always for
> >>>>> partitionId
> >>>>>>> == 0 and it should be checking that data is going to different
> >>>>> partitions
> >>>>>>> for a topic, right?
> >>>>>>>
> >>>>>>> Let me know, I think this is an important discussion and even if it
> >>>>> ends up
> >>>>>>> as telling the community to only use one partition that is all you
> >>>> need
> >>>>> and
> >>>>>>> partitions become our super columns (Apache Cassandra joke, its
> >>>> funny)
> >>>>> then
> >>>>>>> we manage and support it and that is just how it is but if
> partitions
> >>>>> are a
> >>>>>>> good thing and having multiple consumers scale in parrelel for a
> >>>> single
> >>>>>>> topic also good then we have to manage and support that.
> >>>>>>>
> >>>>>>> /*******************************************
> >>>>>>> Joe Stein
> >>>>>>> Founder, Principal Consultant
> >>>>>>> Big Data Open Source Security LLC
> >>>>>>> http://www.stealth.ly
> >>>>>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> >>>>>>> ********************************************/
> >>>>>>>
> >>>>>
> >>>>
> >>>
> >>>
>

Reply via email to