hmmm, yeah, no, I don't want to do that ... if we don't have to. What if the DefaultPartitioner code looked like this instead? =8^)
private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {

  def partition(key: T, numPartitions: Int): Int = {
    if (key == null) {
      import java.util.UUID
      Utils.abs(UUID.randomUUID.toString.hashCode) % numPartitions
    } else {
      Utils.abs(key.hashCode) % numPartitions
    }
  }
}

Again, the goal here is simplicity (often the initial, dev-side, up-and-running-out-of-the-box experience) so folks don't have to randomize the keys themselves to get this effect.

We would still have to also have the RandomMetaRefreshPartitioner class, right? So null keys there would wait for the timed metadata refresh for that use case, right?

private class RandomMetaRefreshPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {

  def partition(key: T, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}

On Fri, Sep 27, 2013 at 1:10 PM, Jun Rao <jun...@gmail.com> wrote:

> However, currently, if key is null, the partitioner is not even called. Do
> you want to change DefaultEventHandler too?
>
> This also doesn't allow the partitioner to select a random and available
> partition, which in my opinion is more important than making partitions
> perfectly evenly balanced.
>
> Thanks,
>
> Jun
>
>
> On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <crypt...@gmail.com> wrote:
>
> > What I was proposing was twofold:
> >
> > 1) revert the DefaultPartitioner class
> >
> > then
> >
> > 2) create a new partitioner that folks could use (like at LinkedIn you
> > would use this partitioner instead) in ProducerConfig
> >
> > private class RandomRefreshTimPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
> >   private val random = new java.util.Random
> >
> >   def partition(key: T, numPartitions: Int): Int = {
> >     Utils.abs(key.hashCode) % numPartitions
> >   }
> > }
> >
> > /*******************************************
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> >
> > On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Joe,
> > >
> > > Not sure I fully understand your proposal. Do you want to put the random
> > > partitioning selection logic (for messages without a key) in the
> > > partitioner without changing the partitioner api? That's difficult. The
> > > issue is that in the current partitioner api, we don't know which
> > > partitions are available. For example, if we have replication factor 1 on a
> > > topic and a broker is down, the best thing to do for the random partitioner
> > > is to select an available partition at random (assuming more than 1
> > > partition is created for the topic).
> > >
> > > Another option is to revert the random partitioning selection logic in
> > > DefaultEventHandler to select a random partition per batch of events
> > > (instead of sticking with a random partition for some configured
> > > amount of time). This is doable, but I am not sure if it's that critical.
> > > Since this is one of the two possible behaviors in 0.7, it's hard to say
> > > whether people will be surprised by that. Preserving both behaviors in 0.7
> > > will require changing the partitioner api. This is more work and I agree
> > > it's better to do this post 0.8.0 final.
> > > > > > Thanks, > > > > > > Jun > > > > > > > > > > > > On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <crypt...@gmail.com> wrote: > > > > > > > Jun, can we hold this extra change over for 0.8.1 and just go with > > > > reverting where we were before for the default with a new partition > for > > > > meta refresh and support both? > > > > > > > > I am not sure I entirely understand why someone would need the extra > > > > functionality you are talking about which sounds cool though... > adding > > it > > > > to the API (especially now) without people using it may just make > folks > > > ask > > > > more questions and maybe not use it ... IDK ... but in any case we > can > > > work > > > > on buttoning up 0.8 and shipping just the change for two partitioners > > > > https://issues.apache.org/jira/browse/KAFKA-1067 and circling back > if > > we > > > > wanted on this extra item (including the discussion) to 0.8.1 or > > greater? > > > > I am always of the mind of reduce complexity unless that complexity > is > > > in > > > > fact better than not having it. > > > > > > > > On Sun, Sep 22, 2013 at 8:56 PM, Jun Rao <jun...@gmail.com> wrote: > > > > > > > > > It's reasonable to make the behavior of random producers > customizable > > > > > through a pluggable partitioner. So, if one doesn't care about # of > > > > socket > > > > > connections, one can choose to select a random partition on every > > send. > > > > If > > > > > one does have many producers, one can choose to periodically > select a > > > > > random partition. To support this, the partitioner api needs to be > > > > changed > > > > > though. > > > > > > > > > > Instead of > > > > > def partition(key: T, numPartitions: Int): Int > > > > > > > > > > we probably need something like the following: > > > > > def partition(key: T, numPartitions: Int, availablePartitionList: > > > > > List[Int], isNewBatch: boolean, isRefreshMetadata: boolean): Int > > > > > > > > > > availablePartitionList: allows us to select only partitions that > are > > > > > available. > > > > > isNewBatch: allows us to select the same partition for all messages > > in > > > a > > > > > given batch in the async mode. > > > > > isRefreshMedatadata: allows us to implement the policy of switching > > to > > > a > > > > > random partition periodically. > > > > > > > > > > This will make the partitioner api a bit more complicated. However, > > it > > > > does > > > > > provide enough information for customization. > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <crypt...@gmail.com> > > wrote: > > > > > > > > > > > Sounds good, I will create a JIRA and upload a patch. > > > > > > > > > > > > > > > > > > /******************************************* > > > > > > Joe Stein > > > > > > Founder, Principal Consultant > > > > > > Big Data Open Source Security LLC > > > > > > http://www.stealth.ly > > > > > > Twitter: @allthingshadoop > > > > > > ********************************************/ > > > > > > > > > > > > > > > > > > On Sep 17, 2013, at 1:19 PM, Joel Koshy <jjkosh...@gmail.com> > > wrote: > > > > > > > > > > > > > I agree that minimizing the number of producer connections > (while > > > > > > > being a good thing) is really required in very large production > > > > > > > deployments, and the net-effect of the existing change is > > > > > > > counter-intuitive to users who expect an immediate even > > > distribution > > > > > > > across _all_ partitions of the topic. 
> > > > > > > > > > > > > > However, I don't think it is a hack because it is almost > exactly > > > the > > > > > > > same behavior as 0.7 in one of its modes. The 0.7 producer > > (which I > > > > > > > think was even more confusing) had three modes: > > > > > > > i) ZK send > > > > > > > ii) Config send(a): static list of > > broker1:port1,broker2:port2,etc. > > > > > > > iii) Config send(b): static list of a hardwareVIP:VIPport > > > > > > > > > > > > > > (i) and (ii) would achieve even distribution. (iii) would > > > effectively > > > > > > > select one broker and distribute to partitions on that broker > > > within > > > > > > > each reconnect interval. (iii) is very similar to what we now > do > > in > > > > > > > 0.8. (Although we stick to one partition during each metadata > > > refresh > > > > > > > interval that can be changed to stick to one broker and > > distribute > > > > > > > across partitions on that broker). > > > > > > > > > > > > > > At the same time, I agree with Joe's suggestion that we should > > keep > > > > > > > the more intuitive pre-KAFKA-1017 behavior as the default and > > move > > > > the > > > > > > > change in KAFKA-1017 to a more specific partitioner > > implementation. > > > > > > > > > > > > > > Joel > > > > > > > > > > > > > > > > > > > > > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps < > jay.kr...@gmail.com> > > > > > wrote: > > > > > > >> Let me ask another question which I think is more objective. > > Let's > > > > say > > > > > > 100 > > > > > > >> random, smart infrastructure specialists try Kafka, of these > 100 > > > how > > > > > > many > > > > > > >> do you believe will > > > > > > >> 1. Say that this behavior is what they expected to happen? > > > > > > >> 2. Be happy with this behavior? > > > > > > >> I am not being facetious I am genuinely looking for a > numerical > > > > > > estimate. I > > > > > > >> am trying to figure out if nobody thought about this or if my > > > > estimate > > > > > > is > > > > > > >> just really different. For what it is worth my estimate is 0 > > and 5 > > > > > > >> respectively. > > > > > > >> > > > > > > >> This would be fine expect that we changed it from the good > > > behavior > > > > to > > > > > > the > > > > > > >> bad behavior to fix an issue that probably only we have. > > > > > > >> > > > > > > >> -Jay > > > > > > >> > > > > > > >> > > > > > > >> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps < > jay.kr...@gmail.com > > > > > > > > wrote: > > > > > > >> > > > > > > >>> I just took a look at this change. I agree with Joe, not to > put > > > to > > > > > > fine a > > > > > > >>> point on it, but this is a confusing hack. > > > > > > >>> > > > > > > >>> Jun, I don't think wanting to minimizing the number of TCP > > > > > connections > > > > > > is > > > > > > >>> going to be a very common need for people with less than 10k > > > > > > producers. I > > > > > > >>> also don't think people are going to get very good load > > balancing > > > > out > > > > > > of > > > > > > >>> this because most people don't have a ton of producers. I > think > > > > > > instead we > > > > > > >>> will spend the next year explaining this behavior which 99% > of > > > > people > > > > > > will > > > > > > >>> think is a bug (because it is crazy, non-intuitive, and > breaks > > > > their > > > > > > usage). > > > > > > >>> > > > > > > >>> Why was this done by adding special default behavior in the > > null > > > > key > > > > > > case > > > > > > >>> instead of as a partitioner? 
The argument that the > partitioner > > > > > > interface > > > > > > >>> doesn't have sufficient information to choose a partition is > > not > > > a > > > > > good > > > > > > >>> argument for hacking in changes to the default, it is an > > argument > > > > > for * > > > > > > >>> improving* the partitioner interface. > > > > > > >>> > > > > > > >>> The whole point of a partitioner interface is to make it > > possible > > > > to > > > > > > plug > > > > > > >>> in non-standard behavior like this, right? > > > > > > >>> > > > > > > >>> -Jay > > > > > > >>> > > > > > > >>> > > > > > > >>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <jun...@gmail.com> > > > wrote: > > > > > > >>> > > > > > > >>>> Joe, > > > > > > >>>> > > > > > > >>>> Thanks for bringing this up. I want to clarify this a bit. > > > > > > >>>> > > > > > > >>>> 1. Currently, the producer side logic is that if the > > > partitioning > > > > > key > > > > > > is > > > > > > >>>> not provided (i.e., it is null), the partitioner won't be > > > called. > > > > We > > > > > > did > > > > > > >>>> that because we want to select a random and "available" > > > partition > > > > to > > > > > > send > > > > > > >>>> messages so that if some partitions are temporarily > > unavailable > > > > > > (because > > > > > > >>>> of > > > > > > >>>> broker failures), messages can still be sent to other > > > partitions. > > > > > > Doing > > > > > > >>>> this in the partitioner is difficult since the partitioner > > > doesn't > > > > > > know > > > > > > >>>> which partitions are currently available (the > > > DefaultEventHandler > > > > > > does). > > > > > > >>>> > > > > > > >>>> 2. As Joel said, the common use case in production is that > > there > > > > are > > > > > > many > > > > > > >>>> more producers than #partitions in a topic. In this case, > > > sticking > > > > > to > > > > > > a > > > > > > >>>> partition for a few minutes is not going to cause too much > > > > imbalance > > > > > > in > > > > > > >>>> the > > > > > > >>>> partitions and has the benefit of reducing the # of socket > > > > > > connections. My > > > > > > >>>> feeling is that this will benefit most production users. In > > > fact, > > > > if > > > > > > one > > > > > > >>>> uses a hardware load balancer for producing data in 0.7, it > > > > behaves > > > > > in > > > > > > >>>> exactly the same way (a producer will stick to a broker > until > > > the > > > > > > >>>> reconnect > > > > > > >>>> interval is reached). > > > > > > >>>> > > > > > > >>>> 3. It is true that If one is testing a topic with more than > > one > > > > > > partition > > > > > > >>>> (which is not the default value), this behavior can be a bit > > > > weird. > > > > > > >>>> However, I think it can be mitigated by running multiple > test > > > > > producer > > > > > > >>>> instances. > > > > > > >>>> > > > > > > >>>> 4. Someone reported in the mailing list that all data shows > in > > > > only > > > > > > one > > > > > > >>>> partition after a few weeks. This is clearly not the > expected > > > > > > behavior. We > > > > > > >>>> can take a closer look to see if this is real issue. > > > > > > >>>> > > > > > > >>>> Do you think these address your concerns? 
> > > > > > >>>> > > > > > > >>>> Thanks, > > > > > > >>>> > > > > > > >>>> Jun > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein < > > crypt...@gmail.com > > > > > > > > > > wrote: > > > > > > >>>> > > > > > > >>>>> How about creating a new class called > RandomRefreshPartioner > > > and > > > > > copy > > > > > > >>>> the > > > > > > >>>>> DefaultPartitioner code to it and then revert the > > > > > DefaultPartitioner > > > > > > >>>> code. > > > > > > >>>>> I appreciate this is a one time burden for folks using the > > > > existing > > > > > > >>>>> 0.8-beta1 bumping into KAFKA-1017 in production having to > > > switch > > > > to > > > > > > the > > > > > > >>>>> RandomRefreshPartioner and when folks deploy to production > > will > > > > > have > > > > > > to > > > > > > >>>>> consider this property change. > > > > > > >>>>> > > > > > > >>>>> I make this suggestion keeping in mind the new folks that > on > > > > board > > > > > > with > > > > > > >>>>> Kafka and when everyone is in development and testing mode > > for > > > > the > > > > > > first > > > > > > >>>>> time their experience would be as expected from how it > would > > > work > > > > > in > > > > > > >>>>> production this way. In dev/test when first using Kafka > they > > > > won't > > > > > > >>>> have so > > > > > > >>>>> many producers for partitions but would look to parallelize > > > their > > > > > > >>>> consumers > > > > > > >>>>> IMHO. > > > > > > >>>>> > > > > > > >>>>> The random broker change sounds like maybe a bigger change > > now > > > > this > > > > > > late > > > > > > >>>>> in the release cycle if we can accommodate folks trying > Kafka > > > for > > > > > the > > > > > > >>>> first > > > > > > >>>>> time and through their development and testing along with > > full > > > > > blown > > > > > > >>>>> production deploys. > > > > > > >>>>> > > > > > > >>>>> /******************************************* > > > > > > >>>>> Joe Stein > > > > > > >>>>> Founder, Principal Consultant > > > > > > >>>>> Big Data Open Source Security LLC > > > > > > >>>>> http://www.stealth.ly > > > > > > >>>>> Twitter: @allthingshadoop > > > > > > >>>>> ********************************************/ > > > > > > >>>>> > > > > > > >>>>> > > > > > > >>>>> On Sep 14, 2013, at 8:17 AM, Joel Koshy < > jjkosh...@gmail.com > > > > > > > > wrote: > > > > > > >>>>> > > > > > > >>>>>>> > > > > > > >>>>>>> > > > > > > >>>>>>> Thanks for bringing this up - it is definitely an > important > > > > point > > > > > > to > > > > > > >>>>>>> discuss. The underlying issue of KAFKA-1017 was uncovered > > to > > > > some > > > > > > >>>>> degree by > > > > > > >>>>>>> the fact that in our deployment we did not significantly > > > > increase > > > > > > the > > > > > > >>>>> total > > > > > > >>>>>>> number of partitions over 0.7 - i.e., in 0.7 we had say > > four > > > > > > >>>> partitions > > > > > > >>>>> per > > > > > > >>>>>>> broker, now we are using (say) eight partitions across > the > > > > > cluster. > > > > > > >>>> So > > > > > > >>>>> with > > > > > > >>>>>>> random partitioning every producer would end up > connecting > > to > > > > > > nearly > > > > > > >>>>> every > > > > > > >>>>>>> broker (unlike 0.7 in which we would connect to only one > > > broker > > > > > > >>>> within > > > > > > >>>>> each > > > > > > >>>>>>> reconnect interval). 
In a production-scale deployment > that > > > > causes > > > > > > the > > > > > > >>>>> high > > > > > > >>>>>>> number of connections that KAFKA-1017 addresses. > > > > > > >>>>>>> > > > > > > >>>>>>> You are right that the fix of sticking to one partition > > over > > > > the > > > > > > >>>>> metadata > > > > > > >>>>>>> refresh interval goes against true consumer parallelism, > > but > > > > this > > > > > > >>>> would > > > > > > >>>>> be > > > > > > >>>>>>> the case only if there are few producers. If you have a > > > sizable > > > > > > >>>> number > > > > > > >>>>> of > > > > > > >>>>>>> producers on average all partitions would get uniform > > volumes > > > > of > > > > > > >>>> data. > > > > > > >>>>>>> > > > > > > >>>>>>> One tweak to KAFKA-1017 that I think is reasonable would > be > > > > > instead > > > > > > >>>> of > > > > > > >>>>>>> sticking to a random partition, stick to a random broker > > and > > > > send > > > > > > to > > > > > > >>>>> random > > > > > > >>>>>>> partitions within that broker. This would make the > behavior > > > > > closer > > > > > > to > > > > > > >>>>> 0.7 > > > > > > >>>>>>> wrt number of connections and random partitioning > provided > > > the > > > > > > >>>> number of > > > > > > >>>>>>> partitions per broker is high enough, which is why I > > > mentioned > > > > > the > > > > > > >>>>>>> partition count (in our usage) in 0.7 vs 0.8 above. > > Thoughts? > > > > > > >>>>>>> > > > > > > >>>>>>> Joel > > > > > > >>>>>>> > > > > > > >>>>>>> > > > > > > >>>>>>> On Friday, September 13, 2013, Joe Stein wrote: > > > > > > >>>>>>>> > > > > > > >>>>>>>> First, let me apologize for not realizing/noticing this > > > until > > > > > > today. > > > > > > >>>>> One > > > > > > >>>>>>>> reason I left my last company was not being paid to work > > on > > > > > Kafka > > > > > > >>>> nor > > > > > > >>>>>>> being > > > > > > >>>>>>> able to afford any time for a while to work on it. Now in > > my > > > > new > > > > > > gig > > > > > > >>>>> (just > > > > > > >>>>>>> wrapped up my first week, woo hoo) while I am still not > > "paid > > > > to > > > > > > >>>> work on > > > > > > >>>>>>> Kafka" I can afford some more time for it now and maybe > in > > 6 > > > > > > months I > > > > > > >>>>> will > > > > > > >>>>>>> be able to hire folks to work on Kafka (with more and > more > > > time > > > > > for > > > > > > >>>>> myself > > > > > > >>>>>>> to work on it too) while we also work on client projects > > > > > > (especially > > > > > > >>>>> Kafka > > > > > > >>>>>>> based ones). > > > > > > >>>>>>> > > > > > > >>>>>>> So, I understand about the changes that were made to fix > > open > > > > > file > > > > > > >>>>> handles > > > > > > >>>>>>> and make the random pinning be timed based (with a very > > large > > > > > > default > > > > > > >>>>>>> time). Got all that. > > > > > > >>>>>>> > > > > > > >>>>>>> But, doesn't this completely negate what has been > > > communicated > > > > to > > > > > > the > > > > > > >>>>>>> community for a very long time and the expectation they > > > have? I > > > > > > >>>> think it > > > > > > >>>>>>> does. > > > > > > >>>>>>> > > > > > > >>>>>>> The expected functionality for random partitioning is > that > > > > "This > > > > > > can > > > > > > >>>> be > > > > > > >>>>>>> done in a round-robin fashion simply to balance load" and > > > that > > > > > the > > > > > > >>>>>>> "producer" does it for you. > > > > > > >>>>>>> > > > > > > >>>>>>> Isn't a primary use case for partitions to paralyze > > > consumers? 
> > > > If > > > > > > so > > > > > > >>>>> then > > > > > > >>>>>>> the expectation would be that all consumers would be > > getting > > > in > > > > > > >>>> parallel > > > > > > >>>>>>> equally in a "round robin fashion" the data that was > > produced > > > > for > > > > > > the > > > > > > >>>>>>> topic... simply to balance load...with the producer > > handling > > > it > > > > > and > > > > > > >>>> with > > > > > > >>>>>>> the client application not having to-do anything. This > > > > randomness > > > > > > >>>>> occurring > > > > > > >>>>>>> every 10 minutes can't balance load. > > > > > > >>>>>>> > > > > > > >>>>>>> If users are going to work around this anyways (as I > would > > > > > honestly > > > > > > >>>> do > > > > > > >>>>> too) > > > > > > >>>>>>> doing a pseudo semantic random key and essentially > forcing > > > real > > > > > > >>>>> randomness > > > > > > >>>>>>> to simply balance load to my consumers running in > parallel > > > > would > > > > > we > > > > > > >>>>> still > > > > > > >>>>>>> end up hitting the KAFKA-1017 problem anyways? If not > then > > > why > > > > > > can't > > > > > > >>>> we > > > > > > >>>>>>> just give users the functionality and put back the 3 > lines > > of > > > > > code > > > > > > 1) > > > > > > >>>>>>> if(key == null) 2) random.nextInt(numPartitions) 3) else > > ... > > > > If > > > > > we > > > > > > >>>>> would > > > > > > >>>>>>> bump into KAFKA-1017 by working around it then we have > not > > > > really > > > > > > >>>> solved > > > > > > >>>>>>> the root cause problem and removing expected > functionality > > > for > > > > a > > > > > > >>>> corner > > > > > > >>>>>>> case that might have other work arounds and/or code > changes > > > to > > > > > > solve > > > > > > >>>> it > > > > > > >>>>>>> another way or am I still not getting something? > > > > > > >>>>>>> > > > > > > >>>>>>> Also, I was looking at testRandomPartitioner in > > > > AsyncProducerTest > > > > > > >>>> and I > > > > > > >>>>>>> don't see how this would ever fail, the assertion is > always > > > for > > > > > > >>>>> partitionId > > > > > > >>>>>>> == 0 and it should be checking that data is going to > > > different > > > > > > >>>>> partitions > > > > > > >>>>>>> for a topic, right? > > > > > > >>>>>>> > > > > > > >>>>>>> Let me know, I think this is an important discussion and > > even > > > > if > > > > > it > > > > > > >>>>> ends up > > > > > > >>>>>>> as telling the community to only use one partition that > is > > > all > > > > > you > > > > > > >>>> need > > > > > > >>>>> and > > > > > > >>>>>>> partitions become our super columns (Apache Cassandra > joke, > > > its > > > > > > >>>> funny) > > > > > > >>>>> then > > > > > > >>>>>>> we manage and support it and that is just how it is but > if > > > > > > partitions > > > > > > >>>>> are a > > > > > > >>>>>>> good thing and having multiple consumers scale in > parrelel > > > for > > > > a > > > > > > >>>> single > > > > > > >>>>>>> topic also good then we have to manage and support that. 
> > > > > > >>>>>>> > > > > > > >>>>>>> /******************************************* > > > > > > >>>>>>> Joe Stein > > > > > > >>>>>>> Founder, Principal Consultant > > > > > > >>>>>>> Big Data Open Source Security LLC > > > > > > >>>>>>> http://www.stealth.ly > > > > > > >>>>>>> Twitter: @allthingshadoop < > > > > > http://www.twitter.com/allthingshadoop> > > > > > > >>>>>>> ********************************************/ > > > > > > >>>>>>> > > > > > > >>>>> > > > > > > >>>> > > > > > > >>> > > > > > > >>> > > > > > > > > > > > > > > > > > > > > >