Joel--

Yeah, we could theoretically retain a neutered Partitioner interface that
only had access to the byte[] key, not the original object (which we no
longer have). Ideally most partitioning should really happen based on the
byte[], not the original object, to retain multi-language compatibility,
but sometimes the object is useful.

I kind of think this is one of those things where, just for clarity,
doing either A or B is better than doing both A and B.
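
For concreteness, a byte[]-only interface could be as small as the
sketch below (the names, the topic parameter, and the Cluster metadata
argument are illustrative, not the posted API):

// Hypothetical byte[]-only partitioner: it never sees the original
// object, only the serialized key, so any language binding can
// implement the same contract.
public interface Partitioner {
    int partition(String topic, byte[] serializedKey, Cluster cluster);
}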

-Jay


On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Does it preclude those various implementations? i.e., it could become
> a producer config:
> default.partitioner.strategy="minimize-connections"/"roundrobin" - and
> so on; and implement those partitioners internally in the producer.
> Not as clear as a .class config, but it accomplishes the same effect,
> no?
>
> On Thu, Jan 30, 2014 at 4:14 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > One downside to the 1A proposal is that without a Partitioner interface
> > we can't really package up and provide common partitioner
> > implementations. Examples of these would be:
> > 1. HashPartitioner - The default hash partitioning
> > 2. RoundRobinPartitioner - Just round-robins over partitions
> > 3. ConnectionMinimizingPartitioner - Chooses partitions to minimize
> > the number of nodes you need to maintain TCP connections to.
> > 4. RangePartitioner - User provides break points that align partitions
> > to key ranges
> > 5. LocalityPartitioner - Prefers nodes on the same rack. This would be
> > nice for stream-processing use cases that read from one topic and
> > write to another. We would have to include rack information in our
> > metadata.
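> >
> > As an illustration, #2 could be as simple as the sketch below (it
> > assumes a Partitioner interface that receives the topic, the
> > serialized key, and a Cluster metadata object that can list a
> > topic's partitions -- all hypothetical):
> >
> > import java.util.concurrent.atomic.AtomicInteger;
> >
> > // Round-robin partitioner sketch: ignores the key entirely and
> > // cycles through the topic's partitions.
> > public class RoundRobinPartitioner implements Partitioner {
> >     private final AtomicInteger counter = new AtomicInteger(0);
> >
> >     public int partition(String topic, byte[] serializedKey,
> >                          Cluster cluster) {
> >         int numPartitions = cluster.partitionsForTopic(topic).size();
> >         // Mask the sign bit so the index stays non-negative when
> >         // the counter wraps past Integer.MAX_VALUE.
> >         return (counter.getAndIncrement() & Integer.MAX_VALUE)
> >                % numPartitions;
> >     }
> > }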
> >
> > Having this kind of functionality included is actually kind of nice.
> >
> > -Jay
> >
> >
> > On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> Clark and all,
> >>
> >> I thought a little bit about the serialization question. Here are the
> >> options I see and the pros and cons I can think of. I'd love to hear
> >> people's preferences if you have a strong one.
> >>
> >> One important consideration is that however the producer works will
> >> also need to be how the new consumer works (which we hope to write
> >> next). That is, if you put objects in, you should get objects out. So
> >> we need to think through both sides.
> >>
> >> Options:
> >>
> >> Option 0: What is in the existing scala code and the java code I
> >> posted--Serializer and Partitioner plugins provided by the user via
> >> config. Partitioner has a sane default, but Serializer needs to be
> >> specified in config.
> >>
> >> Pros: How it works today in the scala code.
> >> Cons: You have to adapt your serialization library of choice to our
> >> interfaces. The reflective class loading means a typo in the
> >> serializer name gives odd errors. Likewise there is little type
> >> safety--the ProducerRecord takes Object, and any type mismatch between
> >> the object provided and the serializer only shows up at runtime.
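> >>
> >> (For reference, the option 0 wiring looks roughly like this --
> >> property keys as in the current producer, class names made up:)
> >>
> >> Properties props = new Properties();
> >> props.put("metadata.broker.list", "broker1:9092,broker2:9092");
> >> // Both plugins are loaded reflectively; a typo in either class
> >> // name only fails at runtime, which is the weakness noted above.
> >> props.put("serializer.class", "com.example.MyAvroSerializer");
> >> props.put("partitioner.class", "com.example.MyPartitioner");
> >> Producer<String, MyEvent> producer =
> >>     new Producer<String, MyEvent>(new ProducerConfig(props));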
> >>
> >> Option 1: No plugins
> >>
> >> This would mean byte[] key, byte[] value, and partitioning done by
> >> the client by passing in a partition *number* directly.
> >>
> >> The problem with this is that it is tricky to compute the partition
> >> correctly and probably most people won't. We could add a getCluster()
> >> method to return the Cluster instance you should use for partitioning.
> >> But I suspect people would be lazy and not use that, and instead
> >> hard-code partitions, which would break if partitions were added or
> >> they hard-coded it wrong. In my experience 3 partitioning strategies
> >> cover like 99% of cases, so not having a default implementation for
> >> this makes the common case harder. Left to their own devices, people
> >> will use bad hash functions and get weird results.
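> >>
> >> (To make that concrete, under option 1 every caller would have to
> >> do something like the sketch below; getCluster() is the
> >> hypothetical method above, the ProducerRecord constructor with an
> >> explicit partition is the shape being discussed, and the hash is
> >> exactly the step people get wrong:)
> >>
> >> byte[] keyBytes = mySerializer.toBytes(key);     // user's own code
> >> byte[] valueBytes = mySerializer.toBytes(value); // user's own code
> >> int numPartitions =
> >>     producer.getCluster().partitionsForTopic("events").size();
> >> // A careless hashCode() can be negative or badly distributed; the
> >> // mask keeps the index non-negative (uses java.util.Arrays).
> >> int partition =
> >>     (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
> >> producer.send(new ProducerRecord("events", partition, keyBytes,
> >>                                  valueBytes));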
> >>
> >> Option 1A: Alternatively, we could partition by the key using the
> >> existing default partitioning strategy (which only uses the byte[]
> >> anyway), but instead of having a partitionKey we could have a
> >> numerical partition override and add the getCluster() method to get
> >> the cluster metadata. That would make custom partitioning possible but
> >> handle the common case simply.
> >>
> >> Option 2: Partitioner plugin remains, serializers go.
> >>
> >> The problem here is that the partitioner might lose access to the
> >> deserialized key, which would occasionally be useful for semantic
> >> partitioning schemes. The Partitioner could deserialize the key, but
> >> that would be inefficient and weird.
> >>
> >> This problem could be fixed by having key and value be byte[] but
> >> retaining partitionKey as an Object and passing it to the partitioner
> >> as is. Then if you have a partitioner which requires the deserialized
> >> key you would need to use this partition key. One weird side effect is
> >> that if you want to have a custom partition key BUT want to partition
> >> by the bytes of that key rather than the object value, you must write
> >> a custom partitioner and serialize it yourself.
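> >>
> >> (A sketch of what the option 2 plugin could look like, with the
> >> partitionKey passed through as an Object alongside the serialized
> >> bytes -- signature hypothetical:)
> >>
> >> public interface Partitioner {
> >>     // partitionKey is the caller-supplied Object, passed through
> >>     // unserialized; key and value are the already-serialized bytes.
> >>     int partition(Object partitionKey, byte[] key, byte[] value,
> >>                   Cluster cluster);
> >> }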
> >>
> >> Of these I think I prefer 1A but could be convinced of 0 since that is
> >> how it works now.
> >>
> >> Thoughts?
> >>
> >> -Jay
> >>
> >>
> >> On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com>
> >> wrote:
> >>
> >>> Jay - Thanks for the call for comments. Here's some initial input:
> >>>
> >>> - Make message serialization a client responsibility (making all
> >>> messages byte[]). Reflection-based loading makes it harder to use
> >>> generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up a
> >>> codec programmatically. Non-default partitioning should require an
> >>> explicit partition key.
> >>>
> >>> - I really like the fact that it will be native Java. Please consider
> >>> using native maven and not sbt, gradle, ivy, etc. as they don't
> >>> reliably play nice in the maven ecosystem. A jar without a well-formed
> >>> pom doesn't feel like a real artifact. The POMs generated by sbt et
> >>> al. are not well formed. Using maven will make builds and IDE
> >>> integration much smoother.
> >>>
> >>> - Look at Nick Telford's dropwizard-extras package in which he
> >>> defines some Jackson-compatible POJOs for loading configuration. Seems
> >>> like your client migration is similar. The config objects should have
> >>> constructors or factories that accept Map<String, String> and
> >>> Properties for ease of migration.
> >>>
> >>> - Would you consider using the org.apache.kafka package for the new
> >>> API? (quibble)
> >>>
> >>> - Why create your own futures rather than use
> >>> java.util.concurrent.Future<Long> or similar? Standard futures will
> >>> play nice with other reactive libs and things like J8's
> >>> CompletableFuture.
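> >>>
> >>> (For illustration, a send() that returned a standard future would
> >>> read like this fragment -- the Future<Long> signature is the
> >>> suggestion here, not the posted API:)
> >>>
> >>> java.util.concurrent.Future<Long> result = producer.send(record);
> >>> // get() blocks until completion and wraps any send failure in a
> >>> // checked ExecutionException, so it composes with standard
> >>> // executor and reactive machinery.
> >>> long offset = result.get();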
> >>>
> >>> Thanks again,
> >>> C
> >>>
> >>>
> >>>
> >>> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com>
> >>> wrote:
> >>>
> >>> > A couple comments:
> >>> >
> >>> > 1) Why does the config use a broker list instead of discovering the
> >>> > brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
> >>> >
> >>> > 2) It looks like broker connections are created on demand. I'm
> >>> > wondering if sometimes you might want to flush out config or network
> >>> > connectivity issues before pushing the first message through.
> >>> >
> >>> > Should there also be a KafkaProducer.connect() or .open() method, or
> >>> > connectAll()? I guess it would try to connect to all brokers in the
> >>> > BROKER_LIST_CONFIG.
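> >>> >
> >>> > (Something like this hypothetical shape -- neither method exists
> >>> > in the posted API; the point is just to fail fast on bad config:)
> >>> >
> >>> > KafkaProducer producer = new KafkaProducer(config);
> >>> > // Hypothetical: eagerly connect to every broker in the
> >>> > // configured list so unreachable hosts surface here rather
> >>> > // than on the first send().
> >>> > producer.connectAll();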
> >>> >
> >>> > HTH,
> >>> >
> >>> > Roger
> >>> >
> >>> >
> >>> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > As mentioned in a previous email, we are working on a
> >>> > > re-implementation of the producer. I would like to use this email
> >>> > > thread to discuss the details of the public API and the
> >>> > > configuration. I would love for us to be incredibly picky about
> >>> > > this public API now, so it is as good as possible and we don't
> >>> > > need to break it in the future.
> >>> > >
> >>> > > The best way to get a feel for the API is actually to take a look
> >>> > > at the javadoc; my hope is to get the API docs good enough that
> >>> > > they are self-explanatory:
> >>> > >
> >>> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> >>> > >
> >>> > > Please take a look at this API and give me any thoughts you may
> >>> > > have!
> >>> > >
> >>> > > It may also be reasonable to take a look at the configs:
> >>> > >
> >>> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> >>> > >
> >>> > > The actual code is posted here:
> >>> > > https://issues.apache.org/jira/browse/KAFKA-1227
> >>> > >
> >>> > > A few questions or comments to kick things off:
> >>> > > 1. We need to make a decision on whether serialization of the
> >>> > > user's key and value should be done by the user (with our API just
> >>> > > taking byte[]) or if we should take an object and allow the user
> >>> > > to configure a Serializer class which we instantiate via
> >>> > > reflection. We take the latter approach in the current producer,
> >>> > > and I have carried this through to this prototype. The tradeoff I
> >>> > > see is this: taking byte[] is actually simpler; the user can
> >>> > > directly do whatever serialization they like. The complication is
> >>> > > actually partitioning. Currently partitioning is done by a similar
> >>> > > plug-in API (Partitioner) which the user can implement and
> >>> > > configure to override how partitions are assigned. If we take
> >>> > > byte[] as input then we have no access to the original object and
> >>> > > partitioning MUST be done on the byte[]. This is fine for hash
> >>> > > partitioning. However, for various types of semantic partitioning
> >>> > > (range partitioning, or whatever) you would want access to the
> >>> > > original object. In the current approach a producer who wishes to
> >>> > > send byte[] they have serialized in their own code can configure
> >>> > > the BytesSerialization we supply, which is just a "no-op"
> >>> > > serialization.
> >>> > > 2. We should obsess over naming and make sure all of the class
> >>> > > names are good.
> >>> > > 3. Jun has already pointed out that we need to include the topic
> >>> > > and partition in the response, which is absolutely right. I
> >>> > > haven't done that yet, but it definitely needs to be there.
> >>> > > 4. Currently RecordSend.await will throw an exception if the
> >>> > > request failed. The intention here is that
> >>> > > producer.send(message).await() exactly simulates a synchronous
> >>> > > call (see the sketch after this list). Guozhang has noted that
> >>> > > this is a little annoying since the user must then catch
> >>> > > exceptions. However, if we remove this, then a user who doesn't
> >>> > > check for errors won't know one has occurred, which I predict will
> >>> > > be a common mistake.
> >>> > > 5. Perhaps there is more we could do to make the async callbacks
> >>> > > and future we give back intuitive and easy to program against?
> >>> > >
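> >>> > > (Re point 4, the synchronous pattern would look like this
> >>> > > sketch, assuming RecordSend.await() throws on failure as
> >>> > > described:)
> >>> > >
> >>> > > RecordSend send = producer.send(message);
> >>> > > try {
> >>> > >     send.await(); // blocks until the request completes
> >>> > > } catch (Exception e) {
> >>> > >     // a failed request surfaces here; skipping this catch is
> >>> > >     // the "silent error" mistake described above
> >>> > > }
> >>> > >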
> >>> > > Some background info on implementation:
> >>> > >
> >>> > > At a high level the primary difference in this producer is that
> >>> > > it removes the distinction between the "sync" and "async"
> >>> > > producer. Effectively all requests are sent asynchronously but
> >>> > > always return a future response object that gives the offset as
> >>> > > well as any error that may have occurred when the request is
> >>> > > complete. The batching that today is done only in the async
> >>> > > producer is now done whenever possible. This means that the sync
> >>> > > producer, under load, can get performance as good as the async
> >>> > > producer (preliminary results show the producer getting 1m
> >>> > > messages/sec). This works similarly to group commit in databases,
> >>> > > but with respect to the actual network transmission--any messages
> >>> > > that arrive while a send is in progress are batched together. It
> >>> > > is also possible to encourage batching even under low load to save
> >>> > > server resources by introducing a delay on the send to allow more
> >>> > > messages to accumulate; this is done using the linger.ms config
> >>> > > (this is similar to Nagle's algorithm in TCP).
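> >>> > >
> >>> > > (For example -- linger.ms is the config named above; the broker
> >>> > > list key and constructor shape are illustrative:)
> >>> > >
> >>> > > Properties config = new Properties();
> >>> > > config.put("broker.list", "host1:9092,host2:9092");
> >>> > > // Wait up to 5 ms for more records to accumulate before
> >>> > > // sending, trading a little latency for larger batches
> >>> > > // (Nagle-style).
> >>> > > config.put("linger.ms", "5");
> >>> > > KafkaProducer producer = new KafkaProducer(config);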
> >>> > >
> >>> > > This producer does all network communication asynchronously and
> >>> > > in parallel to all servers so the performance penalty for acks=-1
> >>> > > and waiting on replication should be much reduced. I haven't done
> >>> > > much benchmarking on this yet, though.
> >>> > >
> >>> > > The high level design is described a little here, though this
> >>> > > is now a little out of date:
> >>> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >>> > >
> >>> > > -Jay
> >>> > >
> >>> >
> >>>
> >>
> >>
>
