However, currently, if key is null, the partitioner is not even called. Do you want to change DefaultEventHandler too?
This also doesn't allow the partitioner to select a random and available partition, which in my opinion is more important than making partitions perfectly evenly balanced.

Thanks,

Jun

On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <crypt...@gmail.com> wrote:

> What I was proposing was twofold:
>
> 1) revert the DefaultPartitioner class
>
> then
>
> 2) create a new partitioner that folks could use (like at LinkedIn you
> would use this partitioner instead) in ProducerConfig
>
> private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
>   private val random = new java.util.Random
>
>   def partition(key: T, numPartitions: Int): Int = {
>     Utils.abs(key.hashCode) % numPartitions
>   }
> }
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
>
> On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Joe,
> >
> > Not sure I fully understand your proposal. Do you want to put the random
> > partitioning selection logic (for messages without a key) in the
> > partitioner without changing the partitioner api? That's difficult. The
> > issue is that in the current partitioner api, we don't know which
> > partitions are available. For example, if we have replication factor 1 on a
> > topic and a broker is down, the best thing to do for the random partitioner
> > is to select an available partition at random (assuming more than 1
> > partition is created for the topic).
> >
> > Another option is to revert the logic in the random partitioning selection
> > logic in DefaultEventHandler to select a random partition per batch of
> > events (instead of sticking with a random partition for some configured
> > amount of time). This is doable, but I am not sure if it's that critical.
> > Since this is one of the two possible behaviors in 0.7, it's hard to say
> > whether people will be surprised by that. Preserving both behaviors in 0.7
> > will require changing the partitioner api. This is more work and I agree
> > it's better to do this post 0.8.0 final.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <crypt...@gmail.com> wrote:
> >
> > > Jun, can we hold this extra change over for 0.8.1 and just go with
> > > reverting where we were before for the default with a new partitioner for
> > > meta refresh and support both?
> > >
> > > I am not sure I entirely understand why someone would need the extra
> > > functionality you are talking about which sounds cool though... adding it
> > > to the API (especially now) without people using it may just make folks ask
> > > more questions and maybe not use it ... IDK ... but in any case we can work
> > > on buttoning up 0.8 and shipping just the change for two partitioners
> > > https://issues.apache.org/jira/browse/KAFKA-1067 and circling back if we
> > > wanted on this extra item (including the discussion) to 0.8.1 or greater?
> > > I am always of the mind of reduce complexity unless that complexity is in
> > > fact better than not having it.
> > >
> > > On Sun, Sep 22, 2013 at 8:56 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > It's reasonable to make the behavior of random producers customizable
> > > > through a pluggable partitioner. So, if one doesn't care about # of socket
> > > > connections, one can choose to select a random partition on every send. If
> > > > one does have many producers, one can choose to periodically select a
> > > > random partition. To support this, the partitioner api needs to be changed
> > > > though.
> > > >
> > > > Instead of
> > > >   def partition(key: T, numPartitions: Int): Int
> > > >
> > > > we probably need something like the following:
> > > >   def partition(key: T, numPartitions: Int, availablePartitionList:
> > > >     List[Int], isNewBatch: Boolean, isRefreshMetadata: Boolean): Int
> > > >
> > > > availablePartitionList: allows us to select only partitions that are
> > > > available.
> > > > isNewBatch: allows us to select the same partition for all messages in a
> > > > given batch in the async mode.
> > > > isRefreshMetadata: allows us to implement the policy of switching to a
> > > > random partition periodically.
> > > >
> > > > This will make the partitioner api a bit more complicated. However, it does
> > > > provide enough information for customization.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <crypt...@gmail.com> wrote:
> > > >
> > > > > Sounds good, I will create a JIRA and upload a patch.
> > > > >
> > > > >
> > > > > /*******************************************
> > > > >  Joe Stein
> > > > >  Founder, Principal Consultant
> > > > >  Big Data Open Source Security LLC
> > > > >  http://www.stealth.ly
> > > > >  Twitter: @allthingshadoop
> > > > > ********************************************/
> > > > >
> > > > >
> > > > > On Sep 17, 2013, at 1:19 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > >
> > > > > > I agree that minimizing the number of producer connections (while
> > > > > > being a good thing) is really required in very large production
> > > > > > deployments, and the net-effect of the existing change is
> > > > > > counter-intuitive to users who expect an immediate even distribution
> > > > > > across _all_ partitions of the topic.
> > > > > >
> > > > > > However, I don't think it is a hack because it is almost exactly the
> > > > > > same behavior as 0.7 in one of its modes.
> > > > > > The 0.7 producer (which I
> > > > > > think was even more confusing) had three modes:
> > > > > > i) ZK send
> > > > > > ii) Config send(a): static list of broker1:port1,broker2:port2,etc.
> > > > > > iii) Config send(b): static list of a hardwareVIP:VIPport
> > > > > >
> > > > > > (i) and (ii) would achieve even distribution. (iii) would effectively
> > > > > > select one broker and distribute to partitions on that broker within
> > > > > > each reconnect interval. (iii) is very similar to what we now do in
> > > > > > 0.8. (Although we stick to one partition during each metadata refresh
> > > > > > interval, that can be changed to stick to one broker and distribute
> > > > > > across partitions on that broker).
> > > > > >
> > > > > > At the same time, I agree with Joe's suggestion that we should keep
> > > > > > the more intuitive pre-KAFKA-1017 behavior as the default and move the
> > > > > > change in KAFKA-1017 to a more specific partitioner implementation.
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > >
> > > > > > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >> Let me ask another question which I think is more objective. Let's say 100
> > > > > >> random, smart infrastructure specialists try Kafka, of these 100 how many
> > > > > >> do you believe will
> > > > > >> 1. Say that this behavior is what they expected to happen?
> > > > > >> 2. Be happy with this behavior?
> > > > > >> I am not being facetious; I am genuinely looking for a numerical estimate. I
> > > > > >> am trying to figure out if nobody thought about this or if my estimate is
> > > > > >> just really different. For what it is worth my estimate is 0 and 5
> > > > > >> respectively.
> > > > > >>
> > > > > >> This would be fine except that we changed it from the good behavior to the
> > > > > >> bad behavior to fix an issue that probably only we have.
> > > > > >>
> > > > > >> -Jay
> > > > > >>
> > > > > >>
> > > > > >> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >>
> > > > > >>> I just took a look at this change. I agree with Joe, not to put too fine a
> > > > > >>> point on it, but this is a confusing hack.
> > > > > >>>
> > > > > >>> Jun, I don't think wanting to minimize the number of TCP connections is
> > > > > >>> going to be a very common need for people with less than 10k producers. I
> > > > > >>> also don't think people are going to get very good load balancing out of
> > > > > >>> this because most people don't have a ton of producers. I think instead we
> > > > > >>> will spend the next year explaining this behavior which 99% of people will
> > > > > >>> think is a bug (because it is crazy, non-intuitive, and breaks their usage).
> > > > > >>>
> > > > > >>> Why was this done by adding special default behavior in the null key case
> > > > > >>> instead of as a partitioner? The argument that the partitioner interface
> > > > > >>> doesn't have sufficient information to choose a partition is not a good
> > > > > >>> argument for hacking in changes to the default, it is an argument for
> > > > > >>> *improving* the partitioner interface.
> > > > > >>>
> > > > > >>> The whole point of a partitioner interface is to make it possible to plug
> > > > > >>> in non-standard behavior like this, right?
> > > > > >>>
> > > > > >>> -Jay
> > > > > >>>
> > > > > >>>
> > > > > >>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Joe,
> > > > > >>>>
> > > > > >>>> Thanks for bringing this up. I want to clarify this a bit.
> > > > > >>>>
> > > > > >>>> 1. Currently, the producer side logic is that if the partitioning key is
> > > > > >>>> not provided (i.e., it is null), the partitioner won't be called. We did
> > > > > >>>> that because we want to select a random and "available" partition to send
> > > > > >>>> messages so that if some partitions are temporarily unavailable (because of
> > > > > >>>> broker failures), messages can still be sent to other partitions. Doing
> > > > > >>>> this in the partitioner is difficult since the partitioner doesn't know
> > > > > >>>> which partitions are currently available (the DefaultEventHandler does).
> > > > > >>>>
> > > > > >>>> 2. As Joel said, the common use case in production is that there are many
> > > > > >>>> more producers than #partitions in a topic. In this case, sticking to a
> > > > > >>>> partition for a few minutes is not going to cause too much imbalance in the
> > > > > >>>> partitions and has the benefit of reducing the # of socket connections. My
> > > > > >>>> feeling is that this will benefit most production users. In fact, if one
> > > > > >>>> uses a hardware load balancer for producing data in 0.7, it behaves in
> > > > > >>>> exactly the same way (a producer will stick to a broker until the reconnect
> > > > > >>>> interval is reached).
> > > > > >>>>
> > > > > >>>> 3. It is true that if one is testing a topic with more than one partition
> > > > > >>>> (which is not the default value), this behavior can be a bit weird.
> > > > > >>>> However, I think it can be mitigated by running multiple test producer
> > > > > >>>> instances.
> > > > > >>>>
> > > > > >>>> 4. Someone reported in the mailing list that all data shows in only one
> > > > > >>>> partition after a few weeks. This is clearly not the expected behavior. We
> > > > > >>>> can take a closer look to see if this is a real issue.
> > > > > >>>>
> > > > > >>>> Do you think these address your concerns?
> > > > > >>>>
> > > > > >>>> Thanks,
> > > > > >>>>
> > > > > >>>> Jun
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>>> How about creating a new class called RandomRefreshPartioner, copying the
> > > > > >>>>> DefaultPartitioner code to it, and then reverting the DefaultPartitioner code?
> > > > > >>>>> I appreciate this is a one time burden for folks using the existing
> > > > > >>>>> 0.8-beta1 bumping into KAFKA-1017 in production having to switch to the
> > > > > >>>>> RandomRefreshPartioner and when folks deploy to production will have to
> > > > > >>>>> consider this property change.
> > > > > >>>>>
> > > > > >>>>> I make this suggestion keeping in mind the new folks that on board with
> > > > > >>>>> Kafka and when everyone is in development and testing mode for the first
> > > > > >>>>> time their experience would be as expected from how it would work in
> > > > > >>>>> production this way.
> > > > > >>>>> In dev/test when first using Kafka they won't have so
> > > > > >>>>> many producers for partitions but would look to parallelize their consumers
> > > > > >>>>> IMHO.
> > > > > >>>>>
> > > > > >>>>> The random broker change sounds like maybe a bigger change now this late
> > > > > >>>>> in the release cycle if we can accommodate folks trying Kafka for the first
> > > > > >>>>> time and through their development and testing along with full blown
> > > > > >>>>> production deploys.
> > > > > >>>>>
> > > > > >>>>> /*******************************************
> > > > > >>>>>  Joe Stein
> > > > > >>>>>  Founder, Principal Consultant
> > > > > >>>>>  Big Data Open Source Security LLC
> > > > > >>>>>  http://www.stealth.ly
> > > > > >>>>>  Twitter: @allthingshadoop
> > > > > >>>>> ********************************************/
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On Sep 14, 2013, at 8:17 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks for bringing this up - it is definitely an important point to
> > > > > >>>>>>> discuss. The underlying issue of KAFKA-1017 was uncovered to some degree by
> > > > > >>>>>>> the fact that in our deployment we did not significantly increase the total
> > > > > >>>>>>> number of partitions over 0.7 - i.e., in 0.7 we had say four partitions per
> > > > > >>>>>>> broker, now we are using (say) eight partitions across the cluster. So with
> > > > > >>>>>>> random partitioning every producer would end up connecting to nearly every
> > > > > >>>>>>> broker (unlike 0.7 in which we would connect to only one broker within each
> > > > > >>>>>>> reconnect interval).
> > > > > >>>>>>> In a production-scale deployment that causes the high
> > > > > >>>>>>> number of connections that KAFKA-1017 addresses.
> > > > > >>>>>>>
> > > > > >>>>>>> You are right that the fix of sticking to one partition over the metadata
> > > > > >>>>>>> refresh interval goes against true consumer parallelism, but this would be
> > > > > >>>>>>> the case only if there are few producers. If you have a sizable number of
> > > > > >>>>>>> producers on average all partitions would get uniform volumes of data.
> > > > > >>>>>>>
> > > > > >>>>>>> One tweak to KAFKA-1017 that I think is reasonable would be instead of
> > > > > >>>>>>> sticking to a random partition, stick to a random broker and send to random
> > > > > >>>>>>> partitions within that broker. This would make the behavior closer to 0.7
> > > > > >>>>>>> wrt number of connections and random partitioning provided the number of
> > > > > >>>>>>> partitions per broker is high enough, which is why I mentioned the
> > > > > >>>>>>> partition count (in our usage) in 0.7 vs 0.8 above. Thoughts?
> > > > > >>>>>>>
> > > > > >>>>>>> Joel
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> On Friday, September 13, 2013, Joe Stein wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>> First, let me apologize for not realizing/noticing this until today. One
> > > > > >>>>>>>> reason I left my last company was not being paid to work on Kafka nor being
> > > > > >>>>>>> able to afford any time for a while to work on it.
> > > > > >>>>>>> Now in my new gig (just
> > > > > >>>>>>> wrapped up my first week, woo hoo) while I am still not "paid to work on
> > > > > >>>>>>> Kafka" I can afford some more time for it now and maybe in 6 months I will
> > > > > >>>>>>> be able to hire folks to work on Kafka (with more and more time for myself
> > > > > >>>>>>> to work on it too) while we also work on client projects (especially Kafka
> > > > > >>>>>>> based ones).
> > > > > >>>>>>>
> > > > > >>>>>>> So, I understand about the changes that were made to fix open file handles
> > > > > >>>>>>> and make the random pinning be time based (with a very large default
> > > > > >>>>>>> time). Got all that.
> > > > > >>>>>>>
> > > > > >>>>>>> But, doesn't this completely negate what has been communicated to the
> > > > > >>>>>>> community for a very long time and the expectation they have? I think it
> > > > > >>>>>>> does.
> > > > > >>>>>>>
> > > > > >>>>>>> The expected functionality for random partitioning is that "This can be
> > > > > >>>>>>> done in a round-robin fashion simply to balance load" and that the
> > > > > >>>>>>> "producer" does it for you.
> > > > > >>>>>>>
> > > > > >>>>>>> Isn't a primary use case for partitions to parallelize consumers? If so then
> > > > > >>>>>>> the expectation would be that all consumers would be getting in parallel
> > > > > >>>>>>> equally in a "round robin fashion" the data that was produced for the
> > > > > >>>>>>> topic... simply to balance load...with the producer handling it and with
> > > > > >>>>>>> the client application not having to do anything.
> > > > > >>>>>>> This randomness occurring
> > > > > >>>>>>> every 10 minutes can't balance load.
> > > > > >>>>>>>
> > > > > >>>>>>> If users are going to work around this anyways (as I would honestly do too)
> > > > > >>>>>>> doing a pseudo semantic random key and essentially forcing real randomness
> > > > > >>>>>>> to simply balance load to my consumers running in parallel would we still
> > > > > >>>>>>> end up hitting the KAFKA-1017 problem anyways? If not then why can't we
> > > > > >>>>>>> just give users the functionality and put back the 3 lines of code 1)
> > > > > >>>>>>> if(key == null) 2) random.nextInt(numPartitions) 3) else ... If we would
> > > > > >>>>>>> bump into KAFKA-1017 by working around it then we have not really solved
> > > > > >>>>>>> the root cause problem and are removing expected functionality for a corner
> > > > > >>>>>>> case that might have other work arounds and/or code changes to solve it
> > > > > >>>>>>> another way or am I still not getting something?
> > > > > >>>>>>>
> > > > > >>>>>>> Also, I was looking at testRandomPartitioner in AsyncProducerTest and I
> > > > > >>>>>>> don't see how this would ever fail, the assertion is always for partitionId
> > > > > >>>>>>> == 0 and it should be checking that data is going to different partitions
> > > > > >>>>>>> for a topic, right?
> > > > > >>>>>>>
> > > > > >>>>>>> Let me know, I think this is an important discussion and even if it ends up
> > > > > >>>>>>> as telling the community to only use one partition that is all you need and
> > > > > >>>>>>> partitions become our super columns (Apache Cassandra joke, it's funny) then
> > > > > >>>>>>> we manage and support it and that is just how it is but if partitions are a
> > > > > >>>>>>> good thing and having multiple consumers scale in parallel for a single
> > > > > >>>>>>> topic also good then we have to manage and support that.
> > > > > >>>>>>>
> > > > > >>>>>>> /*******************************************
> > > > > >>>>>>>  Joe Stein
> > > > > >>>>>>>  Founder, Principal Consultant
> > > > > >>>>>>>  Big Data Open Source Security LLC
> > > > > >>>>>>>  http://www.stealth.ly
> > > > > >>>>>>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > > >>>>>>> ********************************************/
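
For reference, here is a minimal sketch of how the extended partitioner signature from Jun's Sep 22 message in this thread might look, with the "stick to a random available partition until the next metadata refresh" policy written as a pluggable partitioner. Everything named here is an assumption for illustration only: the trait AvailabilityAwarePartitioner, the class StickyRandomPartitioner and the repickPerBatch flag are hypothetical and not part of Kafka 0.8. The sketch uses Seq[Int] where the message says List[Int], and it assumes the event handler would call the partitioner for null keys, which the thread says it currently does not.

```scala
import scala.util.Random

// Hypothetical trait mirroring the signature proposed in the thread;
// this is NOT the Kafka 0.8 Partitioner API.
trait AvailabilityAwarePartitioner[T] {
  def partition(key: T,
                numPartitions: Int,
                availablePartitions: Seq[Int],
                isNewBatch: Boolean,
                isRefreshMetadata: Boolean): Int
}

// Keyed messages are hashed across all partitions. Null-keyed messages stick
// to one randomly chosen available partition until a metadata refresh (or,
// optionally, until the next batch). Not thread-safe; illustration only.
class StickyRandomPartitioner[T](repickPerBatch: Boolean = false,
                                 random: Random = new Random)
    extends AvailabilityAwarePartitioner[T] {

  // Partition currently used for null-keyed messages, if one has been picked.
  private var sticky: Option[Int] = None

  def partition(key: T,
                numPartitions: Int,
                availablePartitions: Seq[Int],
                isNewBatch: Boolean,
                isRefreshMetadata: Boolean): Int = {
    if (key != null) {
      // Mask the sign bit so the result is never negative.
      (key.hashCode & 0x7fffffff) % numPartitions
    } else {
      // Re-pick on refresh, on a new batch (if configured), or when the
      // current choice is missing or no longer available.
      val stale = isRefreshMetadata ||
        (repickPerBatch && isNewBatch) ||
        !sticky.exists(p => availablePartitions.contains(p))
      if (stale) {
        require(availablePartitions.nonEmpty, "no available partitions")
        sticky = Some(availablePartitions(random.nextInt(availablePartitions.size)))
      }
      sticky.get
    }
  }
}
```

With repickPerBatch = true the same class gives the "random partition per batch" behavior mentioned in the Sep 27 message, so both policies discussed in the thread fit behind the one hypothetical interface.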