However, currently, if key is null, the partitioner is not even called. Do
you want to change DefaultEventHandler too?

This also doesn't allow the partitioner to select a random and available
partition, which in my opinion is more important than making partitions
perfectly evenly balanced.
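
To make the concern concrete, here is a minimal sketch of the dispatch
described above. It is not the actual DefaultEventHandler code: the names
PartitionChooser and HashPartitioner and the `available` parameter are made
up for illustration, and the Partitioner trait is only a stand-in for the
API quoted below.

```scala
import scala.util.Random

// Stand-in for the partitioner API discussed in this thread (assumption).
trait Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}

// Hash-based partitioner; masks the sign bit so the result is non-negative.
// It would throw a NullPointerException if it were ever given a null key.
class HashPartitioner[T] extends Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int =
    (key.hashCode & Int.MaxValue) % numPartitions
}

// Hypothetical sketch of the handler-side dispatch, not Kafka source code.
object PartitionChooser {
  private val random = new Random

  // `available` holds the ids of partitions assumed to be reachable right now.
  // Only the caller knows this list; the partitioner never sees it.
  def choose[T](key: T, numPartitions: Int, available: Seq[Int],
                partitioner: Partitioner[T]): Int =
    if (key == null) {
      // Null key: the partitioner is bypassed entirely.
      require(available.nonEmpty, "no partition is currently available")
      available(random.nextInt(available.size))
    } else {
      // Non-null key: the partitioner decides, without availability info.
      partitioner.partition(key, numPartitions)
    }
}
```

With available = Seq(1, 3), a null key always lands on partition 1 or 3,
while a non-null key goes through the partitioner, which cannot take
availability into account.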

Thanks,

Jun


On Fri, Sep 27, 2013 at 9:53 AM, Joe Stein <crypt...@gmail.com> wrote:

> What I was proposing was twofold:
>
> 1) revert the DefaultPartitioner class
>
> then
>
> 2) create a new partitioner that folks could use (like at LinkedIn you
> would use this partitioner instead) in ProducerConfig
>
> private class RandomRefreshTimPartitioner[T](props: VerifiableProperties =
> null) extends Partitioner[T] {
>   private val random = new java.util.Random
>
>   def partition(key: T, numPartitions: Int): Int = {
>     Utils.abs(key.hashCode) % numPartitions
>   }
> }
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
>
> On Fri, Sep 27, 2013 at 12:46 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Joe,
> >
> > Not sure I fully understand your proposal. Do you want to put the random
> > partition selection logic (for messages without a key) in the
> > partitioner without changing the partitioner API? That's difficult. The
> > issue is that with the current partitioner API, we don't know which
> > partitions are available. For example, if we have replication factor 1
> > on a topic and a broker is down, the best thing for the random
> > partitioner to do is to select an available partition at random
> > (assuming more than 1 partition is created for the topic).
> >
> > Another option is to revert the random partition selection logic in
> > DefaultEventHandler so that it selects a random partition per batch of
> > events (instead of sticking with a random partition for some configured
> > amount of time). This is doable, but I am not sure if it's that critical.
> > Since this is one of the two possible behaviors in 0.7, it's hard to say
> > whether people will be surprised by that. Preserving both 0.7 behaviors
> > will require changing the partitioner API. This is more work and I agree
> > it's better to do this post 0.8.0 final.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Fri, Sep 27, 2013 at 9:24 AM, Joe Stein <crypt...@gmail.com> wrote:
> >
> > > Jun, can we hold this extra change over for 0.8.1 and just go with
> > > reverting to where we were before for the default, with a new
> > > partitioner for the metadata refresh behavior, and support both?
> > >
> > > I am not sure I entirely understand why someone would need the extra
> > > functionality you are talking about, which sounds cool though... Adding
> > > it to the API (especially now) without people using it may just make
> > > folks ask more questions and maybe not use it ... IDK ... but in any
> > > case, can we work on buttoning up 0.8 and shipping just the change for
> > > two partitioners (https://issues.apache.org/jira/browse/KAFKA-1067),
> > > and circle back on this extra item (including the discussion) in 0.8.1
> > > or later? I am always of the mind to reduce complexity unless that
> > > complexity is in fact better than not having it.
> > >
> > > On Sun, Sep 22, 2013 at 8:56 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > It's reasonable to make the behavior of random producers customizable
> > > > through a pluggable partitioner. So, if one doesn't care about # of
> > > > socket connections, one can choose to select a random partition on
> > > > every send. If one does have many producers, one can choose to
> > > > periodically select a random partition. To support this, the
> > > > partitioner API needs to be changed though.
> > > >
> > > > Instead of
> > > >   def partition(key: T, numPartitions: Int): Int
> > > >
> > > > we probably need something like the following:
> > > >   def partition(key: T, numPartitions: Int,
> > > >                 availablePartitionList: List[Int],
> > > >                 isNewBatch: Boolean, isRefreshMetadata: Boolean): Int
> > > >
> > > > availablePartitionList: allows us to select only partitions that are
> > > > available.
> > > > isNewBatch: allows us to select the same partition for all messages
> > > > in a given batch in the async mode.
> > > > isRefreshMetadata: allows us to implement the policy of switching to
> > > > a random partition periodically.
> > > >
> > > > This will make the partitioner API a bit more complicated. However,
> > > > it does provide enough information for customization.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Wed, Sep 18, 2013 at 4:23 PM, Joe Stein <crypt...@gmail.com>
> wrote:
> > > >
> > > > > Sounds good, I will create a JIRA and upload a patch.
> > > > >
> > > > >
> > > > > /*******************************************
> > > > >  Joe Stein
> > > > >  Founder, Principal Consultant
> > > > >  Big Data Open Source Security LLC
> > > > >  http://www.stealth.ly
> > > > >  Twitter: @allthingshadoop
> > > > > ********************************************/
> > > > >
> > > > >
> > > > > On Sep 17, 2013, at 1:19 PM, Joel Koshy <jjkosh...@gmail.com>
> wrote:
> > > > >
> > > > > > > I agree that minimizing the number of producer connections (while
> > > > > > > being a good thing) is really required only in very large
> > > > > > > production deployments, and the net effect of the existing change
> > > > > > > is counter-intuitive to users who expect an immediate even
> > > > > > > distribution across _all_ partitions of the topic.
> > > > > >
> > > > > > > However, I don't think it is a hack because it is almost exactly
> > > > > > > the same behavior as 0.7 in one of its modes. The 0.7 producer
> > > > > > > (which I think was even more confusing) had three modes:
> > > > > > > i) ZK send
> > > > > > > ii) Config send(a): static list of broker1:port1,broker2:port2,etc.
> > > > > > > iii) Config send(b): static list of a hardwareVIP:VIPport
> > > > > > >
> > > > > > > (i) and (ii) would achieve even distribution. (iii) would
> > > > > > > effectively select one broker and distribute to partitions on that
> > > > > > > broker within each reconnect interval. (iii) is very similar to
> > > > > > > what we now do in 0.8. (Although we stick to one partition during
> > > > > > > each metadata refresh interval, that can be changed to stick to one
> > > > > > > broker and distribute across partitions on that broker.)
> > > > > >
> > > > > > At the same time, I agree with Joe's suggestion that we should
> keep
> > > > > > the more intuitive pre-KAFKA-1017 behavior as the default and
> move
> > > the
> > > > > > change in KAFKA-1017 to a more specific partitioner
> implementation.
> > > > > >
> > > > > > Joel
> > > > > >
> > > > > >
> > > > > > On Sun, Sep 15, 2013 at 8:44 AM, Jay Kreps <jay.kr...@gmail.com>
> > > > wrote:
> > > > > >> Let me ask another question which I think is more objective.
> Let's
> > > say
> > > > > 100
> > > > > >> random, smart infrastructure specialists try Kafka, of these 100
> > how
> > > > > many
> > > > > >> do you believe will
> > > > > >> 1. Say that this behavior is what they expected to happen?
> > > > > >> 2. Be happy with this behavior?
> > > > > >> I am not being facetious I am genuinely looking for a numerical
> > > > > estimate. I
> > > > > >> am trying to figure out if nobody thought about this or if my
> > > estimate
> > > > > is
> > > > > >> just really different. For what it is worth my estimate is 0
> and 5
> > > > > >> respectively.
> > > > > >>
> > > > > > >> This would be fine except that we changed it from the good
> > > > > > >> behavior to the bad behavior to fix an issue that probably only
> > > > > > >> we have.
> > > > > >>
> > > > > >> -Jay
> > > > > >>
> > > > > >>
> > > > > >> On Sun, Sep 15, 2013 at 8:37 AM, Jay Kreps <jay.kr...@gmail.com
> >
> > > > wrote:
> > > > > >>
> > > > > > >>> I just took a look at this change. I agree with Joe: not to put
> > > > > > >>> too fine a point on it, but this is a confusing hack.
> > > > > > >>>
> > > > > > >>> Jun, I don't think wanting to minimize the number of TCP
> > > > > > >>> connections is going to be a very common need for people with
> > > > > > >>> fewer than 10k producers. I also don't think people are going to
> > > > > > >>> get very good load balancing out of this because most people
> > > > > > >>> don't have a ton of producers. I think instead we will spend the
> > > > > > >>> next year explaining this behavior, which 99% of people will
> > > > > > >>> think is a bug (because it is crazy, non-intuitive, and breaks
> > > > > > >>> their usage).
> > > > > > >>> Why was this done by adding special default behavior in the null
> > > > > > >>> key case instead of as a partitioner? The argument that the
> > > > > > >>> partitioner interface doesn't have sufficient information to
> > > > > > >>> choose a partition is not a good argument for hacking in changes
> > > > > > >>> to the default; it is an argument for *improving* the partitioner
> > > > > > >>> interface.
> > > > > >>>
> > > > > >>> The whole point of a partitioner interface is to make it
> possible
> > > to
> > > > > plug
> > > > > >>> in non-standard behavior like this, right?
> > > > > >>>
> > > > > >>> -Jay
> > > > > >>>
> > > > > >>>
> > > > > >>> On Sat, Sep 14, 2013 at 8:15 PM, Jun Rao <jun...@gmail.com>
> > wrote:
> > > > > >>>
> > > > > >>>> Joe,
> > > > > >>>>
> > > > > >>>> Thanks for bringing this up. I want to clarify this a bit.
> > > > > >>>>
> > > > > >>>> 1. Currently, the producer side logic is that if the
> > partitioning
> > > > key
> > > > > is
> > > > > >>>> not provided (i.e., it is null), the partitioner won't be
> > called.
> > > We
> > > > > did
> > > > > >>>> that because we want to select a random and "available"
> > partition
> > > to
> > > > > send
> > > > > >>>> messages so that if some partitions are temporarily
> unavailable
> > > > > (because
> > > > > >>>> of
> > > > > >>>> broker failures), messages can still be sent to other
> > partitions.
> > > > > Doing
> > > > > >>>> this in the partitioner is difficult since the partitioner
> > doesn't
> > > > > know
> > > > > >>>> which partitions are currently available (the
> > DefaultEventHandler
> > > > > does).
> > > > > >>>>
> > > > > >>>> 2. As Joel said, the common use case in production is that
> there
> > > are
> > > > > many
> > > > > >>>> more producers than #partitions in a topic. In this case,
> > sticking
> > > > to
> > > > > a
> > > > > >>>> partition for a few minutes is not going to cause too much
> > > imbalance
> > > > > in
> > > > > >>>> the
> > > > > >>>> partitions and has the benefit of reducing the # of socket
> > > > > connections. My
> > > > > >>>> feeling is that this will benefit most production users. In
> > fact,
> > > if
> > > > > one
> > > > > >>>> uses a hardware load balancer for producing data in 0.7, it
> > > behaves
> > > > in
> > > > > >>>> exactly the same way (a producer will stick to a broker until
> > the
> > > > > >>>> reconnect
> > > > > >>>> interval is reached).
> > > > > >>>>
> > > > > > >>>> 3. It is true that if one is testing a topic with more than one
> > > > > > >>>> partition (which is not the default value), this behavior can be
> > > > > > >>>> a bit weird. However, I think it can be mitigated by running
> > > > > > >>>> multiple test producer instances.
> > > > > > >>>>
> > > > > > >>>> 4. Someone reported on the mailing list that all data shows up in
> > > > > > >>>> only one partition after a few weeks. This is clearly not the
> > > > > > >>>> expected behavior. We can take a closer look to see if this is a
> > > > > > >>>> real issue.
> > > > > >>>>
> > > > > >>>> Do you think these address your concerns?
> > > > > >>>>
> > > > > >>>> Thanks,
> > > > > >>>>
> > > > > >>>> Jun
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Sat, Sep 14, 2013 at 11:18 AM, Joe Stein <
> crypt...@gmail.com
> > >
> > > > > wrote:
> > > > > >>>>
> > > > > >>>>> How about creating a new class called RandomRefreshPartioner
> > and
> > > > copy
> > > > > >>>> the
> > > > > >>>>> DefaultPartitioner code to it and then revert the
> > > > DefaultPartitioner
> > > > > >>>> code.
> > > > > >>>>> I appreciate this is a one time burden for folks using the
> > > existing
> > > > > >>>>> 0.8-beta1 bumping into KAFKA-1017 in production having to
> > switch
> > > to
> > > > > the
> > > > > >>>>> RandomRefreshPartioner and when folks deploy to production
> will
> > > > have
> > > > > to
> > > > > >>>>> consider this property change.
> > > > > >>>>>
> > > > > >>>>> I make this suggestion keeping in mind the new folks that on
> > > board
> > > > > with
> > > > > >>>>> Kafka and when everyone is in development and testing mode
> for
> > > the
> > > > > first
> > > > > >>>>> time their experience would be as expected from how it would
> > work
> > > > in
> > > > > >>>>> production this way.  In dev/test when first using Kafka they
> > > won't
> > > > > >>>> have so
> > > > > >>>>> many producers for partitions but would look to parallelize
> > their
> > > > > >>>> consumers
> > > > > >>>>> IMHO.
> > > > > >>>>>
> > > > > >>>>> The random broker change sounds like maybe a bigger change
> now
> > > this
> > > > > late
> > > > > >>>>> in the release cycle if we can accommodate folks trying Kafka
> > for
> > > > the
> > > > > >>>> first
> > > > > >>>>> time and through their development and testing along with
> full
> > > > blown
> > > > > >>>>> production deploys.
> > > > > >>>>>
> > > > > >>>>> /*******************************************
> > > > > >>>>> Joe Stein
> > > > > >>>>> Founder, Principal Consultant
> > > > > >>>>> Big Data Open Source Security LLC
> > > > > >>>>> http://www.stealth.ly
> > > > > >>>>> Twitter: @allthingshadoop
> > > > > >>>>> ********************************************/
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On Sep 14, 2013, at 8:17 AM, Joel Koshy <jjkosh...@gmail.com
> >
> > > > wrote:
> > > > > >>>>>
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks for bringing this up - it is definitely an important
> > > point
> > > > > to
> > > > > >>>>>>> discuss. The underlying issue of KAFKA-1017 was uncovered
> to
> > > some
> > > > > >>>>> degree by
> > > > > >>>>>>> the fact that in our deployment we did not significantly
> > > increase
> > > > > the
> > > > > >>>>> total
> > > > > >>>>>>> number of partitions over 0.7 - i.e., in 0.7 we had say
> four
> > > > > >>>> partitions
> > > > > >>>>> per
> > > > > >>>>>>> broker, now we are using (say) eight partitions across the
> > > > cluster.
> > > > > >>>> So
> > > > > >>>>> with
> > > > > >>>>>>> random partitioning every producer would end up connecting
> to
> > > > > nearly
> > > > > >>>>> every
> > > > > >>>>>>> broker (unlike 0.7 in which we would connect to only one
> > broker
> > > > > >>>> within
> > > > > >>>>> each
> > > > > >>>>>>> reconnect interval). In a production-scale deployment that
> > > causes
> > > > > the
> > > > > >>>>> high
> > > > > >>>>>>> number of connections that KAFKA-1017 addresses.
> > > > > >>>>>>>
> > > > > >>>>>>> You are right that the fix of sticking to one partition
> over
> > > the
> > > > > >>>>> metadata
> > > > > >>>>>>> refresh interval goes against true consumer parallelism,
> but
> > > this
> > > > > >>>> would
> > > > > >>>>> be
> > > > > >>>>>>> the case only if there are few producers. If you have a
> > sizable
> > > > > >>>> number
> > > > > >>>>> of
> > > > > >>>>>>> producers on average all partitions would get uniform
> volumes
> > > of
> > > > > >>>> data.
> > > > > >>>>>>>
> > > > > >>>>>>> One tweak to KAFKA-1017 that I think is reasonable would be
> > > > instead
> > > > > >>>> of
> > > > > >>>>>>> sticking to a random partition, stick to a random broker
> and
> > > send
> > > > > to
> > > > > >>>>> random
> > > > > >>>>>>> partitions within that broker. This would make the behavior
> > > > closer
> > > > > to
> > > > > >>>>> 0.7
> > > > > >>>>>>> wrt number of connections and random partitioning provided
> > the
> > > > > >>>> number of
> > > > > >>>>>>> partitions per broker is high enough, which is why I
> > mentioned
> > > > the
> > > > > >>>>>>> partition count (in our usage) in 0.7 vs 0.8 above.
> Thoughts?
> > > > > >>>>>>>
> > > > > >>>>>>> Joel
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> On Friday, September 13, 2013, Joe Stein wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>> First, let me apologize for not realizing/noticing this
> > until
> > > > > today.
> > > > > >>>>> One
> > > > > >>>>>>>> reason I left my last company was not being paid to work
> on
> > > > Kafka
> > > > > >>>> nor
> > > > > >>>>>>> being
> > > > > >>>>>>> able to afford any time for a while to work on it. Now in
> my
> > > new
> > > > > gig
> > > > > >>>>> (just
> > > > > >>>>>>> wrapped up my first week, woo hoo) while I am still not
> "paid
> > > to
> > > > > >>>> work on
> > > > > >>>>>>> Kafka" I can afford some more time for it now and maybe in
> 6
> > > > > months I
> > > > > >>>>> will
> > > > > >>>>>>> be able to hire folks to work on Kafka (with more and more
> > time
> > > > for
> > > > > >>>>> myself
> > > > > >>>>>>> to work on it too) while we also work on client projects
> > > > > (especially
> > > > > >>>>> Kafka
> > > > > >>>>>>> based ones).
> > > > > >>>>>>>
> > > > > > >>>>>>> So, I understand about the changes that were made to fix open
> > > > > > >>>>>>> file handles and make the random pinning be time based (with a
> > > > > > >>>>>>> very large default time).  Got all that.
> > > > > >>>>>>>
> > > > > >>>>>>> But, doesn't this completely negate what has been
> > communicated
> > > to
> > > > > the
> > > > > >>>>>>> community for a very long time and the expectation they
> > have? I
> > > > > >>>> think it
> > > > > >>>>>>> does.
> > > > > >>>>>>>
> > > > > >>>>>>> The expected functionality for random partitioning is that
> > > "This
> > > > > can
> > > > > >>>> be
> > > > > >>>>>>> done in a round-robin fashion simply to balance load" and
> > that
> > > > the
> > > > > >>>>>>> "producer" does it for you.
> > > > > >>>>>>>
> > > > > > >>>>>>> Isn't a primary use case for partitions to parallelize
> > > > > > >>>>>>> consumers? If so then the expectation would be that all
> > > > > > >>>>>>> consumers would be getting, in parallel and equally, in a
> > > > > > >>>>>>> "round robin fashion", the data that was produced for the
> > > > > > >>>>>>> topic... simply to balance load... with the producer handling
> > > > > > >>>>>>> it and with the client application not having to do anything.
> > > > > > >>>>>>> This randomness occurring every 10 minutes can't balance load.
> > > > > >>>>>>>
> > > > > >>>>>>> If users are going to work around this anyways (as I would
> > > > honestly
> > > > > >>>> do
> > > > > >>>>> too)
> > > > > >>>>>>> doing a pseudo semantic random key and essentially forcing
> > real
> > > > > >>>>> randomness
> > > > > >>>>>>> to simply balance load to my consumers running in parallel
> > > would
> > > > we
> > > > > >>>>> still
> > > > > >>>>>>> end up hitting the KAFKA-1017 problem anyways? If not then
> > why
> > > > > can't
> > > > > >>>> we
> > > > > >>>>>>> just give users the functionality and put back the 3 lines
> of
> > > > code
> > > > > 1)
> > > > > >>>>>>> if(key == null) 2)  random.nextInt(numPartitions) 3) else
> ...
> > > If
> > > > we
> > > > > >>>>> would
> > > > > >>>>>>> bump into KAFKA-1017 by working around it then we have not
> > > really
> > > > > >>>> solved
> > > > > >>>>>>> the root cause problem and removing expected functionality
> > for
> > > a
> > > > > >>>> corner
> > > > > >>>>>>> case that might have other work arounds and/or code changes
> > to
> > > > > solve
> > > > > >>>> it
> > > > > >>>>>>> another way or am I still not getting something?
> > > > > >>>>>>>
> > > > > >>>>>>> Also, I was looking at testRandomPartitioner in
> > > AsyncProducerTest
> > > > > >>>> and I
> > > > > >>>>>>> don't see how this would ever fail, the assertion is always
> > for
> > > > > >>>>> partitionId
> > > > > >>>>>>> == 0 and it should be checking that data is going to
> > different
> > > > > >>>>> partitions
> > > > > >>>>>>> for a topic, right?
> > > > > >>>>>>>
> > > > > > >>>>>>> Let me know. I think this is an important discussion, and even
> > > > > > >>>>>>> if it ends up as telling the community to only use one
> > > > > > >>>>>>> partition, that it is all you need, and partitions become our
> > > > > > >>>>>>> super columns (Apache Cassandra joke, it's funny), then we
> > > > > > >>>>>>> manage and support it and that is just how it is. But if
> > > > > > >>>>>>> partitions are a good thing, and having multiple consumers
> > > > > > >>>>>>> scale in parallel for a single topic is also good, then we
> > > > > > >>>>>>> have to manage and support that.
> > > > > >>>>>>>
> > > > > >>>>>>> /*******************************************
> > > > > >>>>>>> Joe Stein
> > > > > >>>>>>> Founder, Principal Consultant
> > > > > >>>>>>> Big Data Open Source Security LLC
> > > > > >>>>>>> http://www.stealth.ly
> > > > > >>>>>>> Twitter: @allthingshadoop <
> > > > http://www.twitter.com/allthingshadoop>
> > > > > >>>>>>> ********************************************/
> > > > > >>>>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>>
> > > > >
> > > >
> > >
> >
>
