Joel-- Yeah, we could theoretically retain a neutered Partitioner interface that only had access to the byte[] key, not the original object (which we no longer have). Ideologically most partitioning should really happen based on the byte[] rather than the original object, to retain multi-language compatibility, but sometimes the object is useful.
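A minimal sketch of what such a "neutered", bytes-only Partitioner could look like (the interface and method names here are illustrative assumptions, not the proposed API):

import java.util.Arrays;

// Hypothetical bytes-only partitioner: it never sees the original key object,
// only the serialized form, which keeps partitioning language-agnostic.
public interface BytesPartitioner {

    /** Choose a partition given only the serialized key and value. */
    int partition(String topic, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}

// A default hash-style implementation over the key bytes.
class DefaultBytesPartitioner implements BytesPartitioner {
    @Override
    public int partition(String topic, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        if (serializedKey == null)
            return 0; // no key: a real implementation might round-robin instead
        // Mask the sign bit so the modulo result is never negative.
        return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
    }
}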
I kind of think this is one of those things where doing either A or B is better than doing A and B both, just for clarity.

-Jay

On Thu, Jan 30, 2014 at 5:15 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> Does it preclude those various implementations? i.e., it could become a producer config: default.partitioner.strategy="minimize-connections"/"roundrobin" - and so on; and implement those partitioners internally in the producer. Not as clear as a .class config, but it accomplishes the same effect, no?
>
> On Thu, Jan 30, 2014 at 4:14 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > One downside to the 1A proposal is that without a Partitioner interface we can't really package up and provide common partitioner implementations. Examples of these would be:
> > 1. HashPartitioner - The default hash partitioning
> > 2. RoundRobinPartitioner - Just round-robins over partitions
> > 3. ConnectionMinimizingPartitioner - Choose partitions to minimize the number of nodes you need to maintain TCP connections to.
> > 4. RangePartitioner - User provides break points that align partitions to key ranges
> > 5. LocalityPartitioner - Prefer nodes on the same rack. This would be nice for stream-processing use cases that read from one topic and write to another. We would have to include rack information in our metadata.
> >
> > Having this kind of functionality included is actually kind of nice.
> >
> > -Jay
> >
> >
> > On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> Clark and all,
> >>
> >> I thought a little bit about the serialization question. Here are the options I see and the pros and cons I can think of. I'd love to hear people's preferences if you have a strong one.
> >>
> >> One important consideration is that however the producer works will also need to be how the new consumer works (which we hope to write next). That is, if you put objects in, you should get objects out. So we need to think through both sides.
> >>
> >> Options:
> >>
> >> Option 0: What is in the existing scala code and the java code I posted--Serializer and Partitioner plugins provided by the user via config. Partitioner has a sane default, but Serializer needs to be specified in config.
> >>
> >> Pros: How it works today in the scala code.
> >> Cons: You have to adapt your serialization library of choice to our interfaces. The reflective class loading means a typo in the serializer name gives odd errors. Likewise there is little type safety--the ProducerRecord takes Object, and any type errors between the object provided and the serializer occur at runtime.
> >>
> >> Option 1: No plugins
> >>
> >> This would mean byte[] key, byte[] value, and partitioning done by the client by passing in a partition *number* directly.
> >>
> >> The problem with this is that it is tricky to compute the partition correctly and probably most people won't. We could add a getCluster() method to return the Cluster instance you should use for partitioning. But I suspect people would be lazy and not use that and instead hard-code partitions, which would break if partitions were added or they hard-coded it wrong. In my experience 3 partitioning strategies cover like 99% of cases, so not having a default implementation for this makes the common case harder. Left to their own devices people will use bad hash functions and get weird results.
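For a sense of what Option 1 would ask of every caller, here is a minimal sketch of the partition computation a careful user would have to do themselves; the Cluster/partitionsForTopic shapes are stand-ins for the proposed getCluster() metadata, not the actual API:

import java.util.Arrays;
import java.util.List;

// Sketch of the work Option 1 (no plugins) pushes onto the caller.
class Option1PartitioningSketch {

    // Stand-in for the metadata object that a getCluster() method might return.
    interface Cluster {
        List<Integer> partitionsForTopic(String topic);
    }

    /** Pick a partition for an already-serialized key, the way a careful caller would. */
    static int choosePartition(Cluster cluster, String topic, byte[] keyBytes) {
        // Look up the current partition count rather than hard-coding it.
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Use a sane hash and keep the modulo result non-negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}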
> >> Option 1A: Alternatively we could partition by the key using the existing default partitioning strategy, which only uses the byte[] anyway, but instead of having a partitionKey we could have a numerical partition override and add the getCluster() method to get the cluster metadata. That would make custom partitioning possible but handle the common case simply.
> >>
> >> Option 2: Partitioner plugin remains, serializers go.
> >>
> >> The problem here is that the partitioner might lose access to the deserialized key, which would occasionally be useful for semantic partitioning schemes. The Partitioner could deserialize the key, but that would be inefficient and weird.
> >>
> >> This problem could be fixed by having key and value be byte[] but retaining partitionKey as an Object and passing it to the partitioner as is. Then if you have a partitioner which requires the deserialized key, you would need to use this partition key. One weird side effect is that if you want to have a custom partition key BUT want to partition by the bytes of that key rather than the object value, you must write a custom partitioner and serialize it yourself.
> >>
> >> Of these I think I prefer 1A but could be convinced of 0 since that is how it works now.
> >>
> >> Thoughts?
> >>
> >> -Jay
> >>
> >>
> >> On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
> >>
> >>> Jay - Thanks for the call for comments. Here's some initial input:
> >>>
> >>> - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up codecs programmatically. Non-default partitioning should require an explicit partition key.
> >>>
> >>> - I really like the fact that it will be native Java. Please consider using native maven and not sbt, gradle, ivy, etc. as they don't reliably play nice in the maven ecosystem. A jar without a well-formed pom doesn't feel like a real artifact. The poms generated by sbt et al. are not well formed. Using maven will make builds and IDE integration much smoother.
> >>>
> >>> - Look at Nick Telford's dropwizard-extras package in which he defines some Jackson-compatible POJOs for loading configuration. Seems like your client migration is similar. The config objects should have constructors or factories that accept Map<String, String> and Properties for ease of migration.
> >>>
> >>> - Would you consider using the org.apache.kafka package for the new API? (quibble)
> >>>
> >>> - Why create your own futures rather than use java.util.concurrent.Future<Long> or similar? Standard futures will play nice with other reactive libs and things like J8's CompletableFuture.
> >>>
> >>> Thanks again,
> >>> C
> >>>
> >>>
> >>>
> >>> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> >>>
> >>> > A couple comments:
> >>> >
> >>> > 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
> >>> >
> >>> > 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivity issues before pushing the first message through.
> >>> >
> >>> > Should there also be a KafkaProducer.connect() or .open() method, or connectAll()? I guess it would try to connect to all brokers in the BROKER_LIST_CONFIG.
> >>> >
> >>> > HTH,
> >>> >
> >>> > Roger
> >>> >
> >>> >
> >>> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>> >
> >>> > > As mentioned in a previous email, we are working on a re-implementation of the producer. I would like to use this email thread to discuss the details of the public API and the configuration. I would love for us to be incredibly picky about this public api now so it is as good as possible and we don't need to break it in the future.
> >>> > >
> >>> > > The best way to get a feel for the API is actually to take a look at the javadoc; my hope is to get the api docs good enough so that they are self-explanatory:
> >>> > >
> >>> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> >>> > >
> >>> > > Please take a look at this API and give me any thoughts you may have!
> >>> > >
> >>> > > It may also be reasonable to take a look at the configs:
> >>> > >
> >>> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> >>> > >
> >>> > > The actual code is posted here:
> >>> > > https://issues.apache.org/jira/browse/KAFKA-1227
> >>> > >
> >>> > > A few questions or comments to kick things off:
> >>> > > 1. We need to make a decision on whether serialization of the user's key and value should be done by the user (with our api just taking byte[]) or if we should take an object and allow the user to configure a Serializer class which we instantiate via reflection. We take the latter approach in the current producer, and I have carried this through to this prototype. The tradeoff I see is this: taking byte[] is actually simpler; the user can directly do whatever serialization they like. The complication is actually partitioning. Currently partitioning is done by a similar plug-in api (Partitioner) which the user can implement and configure to override how partitions are assigned. If we take byte[] as input then we have no access to the original object and partitioning MUST be done on the byte[]. This is fine for hash partitioning. However for various types of semantic partitioning (range partitioning, or whatever) you would want access to the original object. In the current approach a producer who wishes to send byte[] they have serialized in their own code can configure the BytesSerialization we supply, which is just a "no op" serialization.
> >>> > > 2. We should obsess over naming and make sure each of the class names are good.
> >>> > > 3. Jun has already pointed out that we need to include the topic and partition in the response, which is absolutely right. I haven't done that yet but that definitely needs to be there.
> >>> > > 4. Currently RecordSend.await will throw an exception if the request failed. The intention here is that producer.send(message).await() exactly simulates a synchronous call.
> >>> > > Guozhang has noted that this is a little annoying since the user must then catch exceptions. However if we remove this, then if the user doesn't check for errors they won't know one has occurred, which I predict will be a common mistake.
> >>> > > 5. Perhaps there is more we could do to make the async callbacks and future we give back intuitive and easy to program against?
> >>> > >
> >>> > > Some background info on implementation:
> >>> > >
> >>> > > At a high level the primary difference in this producer is that it removes the distinction between the "sync" and "async" producer. Effectively all requests are sent asynchronously but always return a future response object that gives the offset as well as any error that may have occurred when the request is complete. The batching that is done only in the async producer today is now done whenever possible. This means that the sync producer, under load, can get performance as good as the async producer (preliminary results show the producer getting 1m messages/sec). This works similarly to group commit in databases but with respect to the actual network transmission--any messages that arrive while a send is in progress are batched together. It is also possible to encourage batching even under low load to save server resources by introducing a delay on the send to allow more messages to accumulate; this is done using the linger.ms config (this is similar to Nagle's algorithm in TCP).
> >>> > >
> >>> > > This producer does all network communication asynchronously and in parallel to all servers, so the performance penalty for acks=-1 and waiting on replication should be much reduced. I haven't done much benchmarking on this yet, though.
> >>> > >
> >>> > > The high level design is described a little here, though this is now a little out of date:
> >>> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >>> > >
> >>> > > -Jay
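As a rough illustration of the usage pattern described in point 4 and the batching notes above, here is a hedged sketch of how a send might look against the prototype posted on KAFKA-1227. KafkaProducer, ProducerRecord, RecordSend.await(), BROKER_LIST_CONFIG, and linger.ms are all named in the thread, but the exact constructors, config keys, and other method names used here are assumptions, not confirmed signatures:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Usage sketch against the prototype producer API (assumed to be on the classpath).
public class ProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Bootstrap broker list (referred to in the thread as BROKER_LIST_CONFIG).
        props.put("broker.list", "broker1:9092,broker2:9092");
        // Encourage batching even under low load, similar to Nagle's algorithm.
        props.put("linger.ms", "5");

        KafkaProducer producer = new KafkaProducer(props);
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] value = "clicked".getBytes(StandardCharsets.UTF_8);

        // All sends are asynchronous and return a future-like RecordSend...
        RecordSend send = producer.send(new ProducerRecord("events", key, value));

        // ...and awaiting the result simulates a synchronous call; await() throws
        // if the request failed, so errors are not silently dropped.
        send.await();

        producer.close();
    }
}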