It's reasonable to make the random partitioning behavior of the producer customizable through a pluggable partitioner. So, if one doesn't care about the number of socket connections, one can choose to select a random partition on every send. If one does have many producers, one can choose to periodically switch to a new random partition. To support this, though, the partitioner API needs to change.
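For example, the first policy is trivial against the current interface (a rough Scala sketch only; the trait shape follows the signature quoted below, and the class name is made up):

    import java.util.Random

    // Current interface, per the signature discussed below.
    trait Partitioner[T] {
      def partition(key: T, numPartitions: Int): Int
    }

    // Policy 1: pick a fresh random partition on every send. Best balance,
    // but each producer ends up connecting to almost every broker.
    class RandomPerSendPartitioner[T] extends Partitioner[T] {
      private val random = new Random
      def partition(key: T, numPartitions: Int): Int =
        random.nextInt(numPartitions)
    }

The periodic policy, however, needs to know when metadata is refreshed and which partitions are currently available, and the current signature exposes neither.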
Instead of

    def partition(key: T, numPartitions: Int): Int

we probably need something like the following:

    def partition(key: T, numPartitions: Int, availablePartitionList: List[Int], isNewBatch: Boolean, isRefreshMetadata: Boolean): Int

availablePartitionList: allows us to select only partitions that are available.
isNewBatch: allows us to select the same partition for all messages in a given batch in async mode.
isRefreshMetadata: allows us to implement the policy of periodically switching to a random partition.

This will make the partitioner API a bit more complicated. However, it does provide enough information for customization.
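As a rough sketch (only the signature above is as proposed; the class name and all other details are illustrative), a partitioner implementing the periodic-random policy against this interface might look like:

    import java.util.Random

    // Proposed interface, as sketched above.
    trait Partitioner[T] {
      def partition(key: T, numPartitions: Int, availablePartitionList: List[Int],
                    isNewBatch: Boolean, isRefreshMetadata: Boolean): Int
    }

    // Stick to one available partition; switch to a new random available
    // partition when metadata is refreshed (or the current one disappears).
    // isNewBatch could similarly be used to pin a whole async batch together.
    class PeriodicRandomPartitioner[T] extends Partitioner[T] {
      private val random = new Random
      private var current = -1

      def partition(key: T, numPartitions: Int, availablePartitionList: List[Int],
                    isNewBatch: Boolean, isRefreshMetadata: Boolean): Int = {
        // Assumes availablePartitionList is non-empty.
        if (current < 0 || isRefreshMetadata || !availablePartitionList.contains(current))
          current = availablePartitionList(random.nextInt(availablePartitionList.size))
        current
      }
    }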
Thanks,

Jun

On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <crypt...@gmail.com> wrote:

> Sounds good, I will create a JIRA and upload a patch.
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop
> ********************************************/
>
> On Sep 17, 2013, at 1:19 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> > I agree that minimizing the number of producer connections (while a good thing) is really only required in very large production deployments, and the net effect of the existing change is counter-intuitive to users who expect an immediate even distribution across _all_ partitions of the topic.
> >
> > However, I don't think it is a hack, because it is almost exactly the same behavior as 0.7 in one of its modes. The 0.7 producer (which I think was even more confusing) had three modes:
> > i) ZK send
> > ii) Config send (a): static list of broker1:port1,broker2:port2,etc.
> > iii) Config send (b): static list of a hardwareVIP:VIPport
> >
> > (i) and (ii) would achieve even distribution. (iii) would effectively select one broker and distribute to partitions on that broker within each reconnect interval. (iii) is very similar to what we now do in 0.8. (Although we stick to one partition during each metadata refresh interval, that could be changed to stick to one broker and distribute across the partitions on that broker.)
> >
> > At the same time, I agree with Joe's suggestion that we should keep the more intuitive pre-KAFKA-1017 behavior as the default and move the change in KAFKA-1017 to a more specific partitioner implementation.
> >
> > Joel
> >
> > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >> Let me ask another question which I think is more objective. Let's say 100 random, smart infrastructure specialists try Kafka. Of these 100, how many do you believe will
> >> 1. Say that this behavior is what they expected to happen?
> >> 2. Be happy with this behavior?
> >> I am not being facetious; I am genuinely looking for a numerical estimate. I am trying to figure out if nobody thought about this or if my estimate is just really different. For what it is worth, my estimates are 0 and 5 respectively.
> >>
> >> This would be fine except that we changed it from the good behavior to the bad behavior to fix an issue that probably only we have.
> >>
> >> -Jay
> >>
> >> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>
> >>> I just took a look at this change. I agree with Joe: not to put too fine a point on it, but this is a confusing hack.
> >>>
> >>> Jun, I don't think wanting to minimize the number of TCP connections is going to be a very common need for people with fewer than 10k producers. I also don't think people are going to get very good load balancing out of this because most people don't have a ton of producers. I think instead we will spend the next year explaining this behavior, which 99% of people will think is a bug (because it is crazy, non-intuitive, and breaks their usage).
> >>>
> >>> Why was this done by adding special default behavior in the null key case instead of as a partitioner? The argument that the partitioner interface doesn't have sufficient information to choose a partition is not a good argument for hacking in changes to the default; it is an argument for *improving* the partitioner interface.
> >>>
> >>> The whole point of a partitioner interface is to make it possible to plug in non-standard behavior like this, right?
> >>>
> >>> -Jay
> >>>
> >>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>
> >>>> Joe,
> >>>>
> >>>> Thanks for bringing this up. I want to clarify this a bit.
> >>>>
> >>>> 1. Currently, the producer-side logic is that if the partitioning key is not provided (i.e., it is null), the partitioner won't be called. We did that because we want to select a random and "available" partition to send messages to, so that if some partitions are temporarily unavailable (because of broker failures), messages can still be sent to other partitions. Doing this in the partitioner is difficult since the partitioner doesn't know which partitions are currently available (the DefaultEventHandler does).
> >>>>
> >>>> 2. As Joel said, the common use case in production is that there are many more producers than #partitions in a topic. In this case, sticking to a partition for a few minutes is not going to cause too much imbalance in the partitions and has the benefit of reducing the # of socket connections. My feeling is that this will benefit most production users. In fact, if one uses a hardware load balancer for producing data in 0.7, it behaves in exactly the same way (a producer will stick to a broker until the reconnect interval is reached).
> >>>>
> >>>> 3. It is true that if one is testing a topic with more than one partition (which is not the default), this behavior can be a bit weird. However, I think it can be mitigated by running multiple test producer instances.
> >>>>
> >>>> 4. Someone reported on the mailing list that all data shows up in only one partition after a few weeks. This is clearly not the expected behavior. We can take a closer look to see if this is a real issue.
> >>>>
> >>>> Do you think these address your concerns?
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <crypt...@gmail.com> wrote:
> >>>>
> >>>>> How about creating a new class called RandomRefreshPartitioner, copying the DefaultPartitioner code to it, and then reverting the DefaultPartitioner code?
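A rough sketch of what that RandomRefreshPartitioner might look like against the current interface (the time-based refresh and all names here are a guess at the intent, not the actual KAFKA-1017 patch; the ten-minute default mirrors the metadata refresh interval mentioned later in the thread):

    import java.util.Random

    // Same shape as the current Partitioner interface.
    trait Partitioner[T] {
      def partition(key: T, numPartitions: Int): Int
    }

    // Keep the KAFKA-1017 sticky behavior available as an opt-in
    // partitioner that re-rolls its random partition once per refresh
    // interval, so the default can revert to pre-KAFKA-1017 behavior.
    class RandomRefreshPartitioner[T](refreshIntervalMs: Long = 10 * 60 * 1000L)
        extends Partitioner[T] {
      private val random = new Random
      private var current = -1
      private var lastRefreshMs = 0L

      def partition(key: T, numPartitions: Int): Int = {
        val now = System.currentTimeMillis
        if (current < 0 || now - lastRefreshMs >= refreshIntervalMs) {
          current = random.nextInt(numPartitions)
          lastRefreshMs = now
        }
        current % numPartitions // stay in range if the partition count changed
      }
    }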
> >>>>> I appreciate this is a one-time burden for folks using the existing 0.8-beta1 who bump into KAFKA-1017 in production, since they would have to switch to the RandomRefreshPartitioner, and folks deploying to production will have to consider this property change.
> >>>>>
> >>>>> I make this suggestion keeping in mind the new folks that come on board with Kafka: this way, when everyone is in development and testing mode for the first time, their experience would match what they expect from how it would work in production. In dev/test, when first using Kafka, they won't have that many producers per partition but would look to parallelize their consumers, IMHO.
> >>>>>
> >>>>> The random-broker change sounds like maybe too big a change this late in the release cycle, if we can otherwise accommodate folks trying Kafka for the first time, through their development and testing, along with full-blown production deploys.
> >>>>>
> >>>>> /*******************************************
> >>>>>  Joe Stein
> >>>>>  Founder, Principal Consultant
> >>>>>  Big Data Open Source Security LLC
> >>>>>  http://www.stealth.ly
> >>>>>  Twitter: @allthingshadoop
> >>>>> ********************************************/
> >>>>>
> >>>>> On Sep 14, 2013, at 8:17 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks for bringing this up - it is definitely an important point to discuss. The underlying issue of KAFKA-1017 was uncovered to some degree by the fact that in our deployment we did not significantly increase the total number of partitions over 0.7 - i.e., in 0.7 we had (say) four partitions per broker, and now we are using (say) eight partitions across the cluster. So with random partitioning every producer would end up connecting to nearly every broker (unlike 0.7, in which we would connect to only one broker within each reconnect interval). In a production-scale deployment that causes the high number of connections that KAFKA-1017 addresses.
> >>>>>>
> >>>>>> You are right that the fix of sticking to one partition over the metadata refresh interval goes against true consumer parallelism, but this would be the case only if there are few producers. If you have a sizable number of producers, on average all partitions would get uniform volumes of data.
> >>>>>>
> >>>>>> One tweak to KAFKA-1017 that I think is reasonable would be, instead of sticking to a random partition, to stick to a random broker and send to random partitions within that broker. This would make the behavior closer to 0.7 wrt number of connections and random partitioning, provided the number of partitions per broker is high enough, which is why I mentioned the partition count (in our usage) in 0.7 vs 0.8 above. Thoughts?
> >>>>>>
> >>>>>> Joel
> >>>>>>
> >>>>>> On Friday, September 13, 2013, Joe Stein wrote:
> >>>>>>>
> >>>>>>> First, let me apologize for not realizing/noticing this until today. One reason I left my last company was not being paid to work on Kafka nor being able to afford any time for a while to work on it.
> >>>>>>> Now in my new gig (just wrapped up my first week, woo hoo), while I am still not "paid to work on Kafka," I can afford some more time for it now, and maybe in 6 months I will be able to hire folks to work on Kafka (with more and more time for myself to work on it too) while we also work on client projects (especially Kafka-based ones).
> >>>>>>>
> >>>>>>> So, I understand about the changes that were made to fix open file handles and make the random pinning be time-based (with a very large default time). Got all that.
> >>>>>>>
> >>>>>>> But doesn't this completely negate what has been communicated to the community for a very long time, and the expectation they have? I think it does.
> >>>>>>>
> >>>>>>> The expected functionality for random partitioning is that "This can be done in a round-robin fashion simply to balance load" and that the "producer" does it for you.
> >>>>>>>
> >>>>>>> Isn't a primary use case for partitions to parallelize consumers? If so, then the expectation would be that all consumers would be getting, in parallel and equally, in a "round robin fashion," the data that was produced for the topic... simply to balance load... with the producer handling it and with the client application not having to do anything. This randomness occurring every 10 minutes can't balance load.
> >>>>>>>
> >>>>>>> If users are going to work around this anyway (as I honestly would too) by using a pseudo-random key and essentially forcing real randomness simply to balance load to my consumers running in parallel, would we still end up hitting the KAFKA-1017 problem anyway? If not, then why can't we just give users the functionality and put back the 3 lines of code: 1) if (key == null) 2) random.nextInt(numPartitions) 3) else ... (sketched below). If we would bump into KAFKA-1017 by working around it, then we have not really solved the root-cause problem, and we are removing expected functionality for a corner case that might have other workarounds and/or code changes to solve it another way. Or am I still not getting something?
> >>>>>>>
> >>>>>>> Also, I was looking at testRandomPartitioner in AsyncProducerTest and I don't see how it would ever fail: the assertion is always partitionId == 0, when it should be checking that data is going to different partitions for a topic, right?
> >>>>>>>
> >>>>>>> Let me know. I think this is an important discussion, and even if it ends up as telling the community that one partition is all you need and partitions become our super columns (Apache Cassandra joke, it's funny), then we manage and support it and that is just how it is. But if partitions are a good thing, and having multiple consumers scale in parallel for a single topic is also good, then we have to manage and support that.
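The three lines Joe refers to above would look roughly like this in the send path (a sketch of the pre-KAFKA-1017 default, not the exact 0.8 source; choosePartition and the enclosing object are made-up names):

    import java.util.Random

    // Sketch: a null key gets a fresh random partition on every send;
    // a non-null key goes through the configured partitioner.
    object PreKafka1017Sketch {
      private val random = new Random

      def choosePartition(key: AnyRef, numPartitions: Int,
                          partition: (AnyRef, Int) => Int): Int =
        if (key == null) random.nextInt(numPartitions)
        else partition(key, numPartitions)
    }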
> >>>>>>>
> >>>>>>> /*******************************************
> >>>>>>>  Joe Stein
> >>>>>>>  Founder, Principal Consultant
> >>>>>>>  Big Data Open Source Security LLC
> >>>>>>>  http://www.stealth.ly
> >>>>>>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> >>>>>>> ********************************************/