For RangePartitioner, it seems that we will need the key object. Range-partitioning on the serialized key bytes is probably confusing.
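Jun's point can be made concrete with a quick sketch. This is purely illustrative Java (the class name, break-point representation, and method signature are invented here, not part of the proposed API): a range partitioner has to compare the deserialized key against user-supplied break points, which is not meaningful on serialized bytes since byte order generally does not match the key's natural order.

```java
import java.util.Arrays;

// Hypothetical sketch, not the actual Kafka API: a range partitioner
// compares the deserialized key against sorted break points, so it
// needs the key object, not its serialized bytes.
public class RangePartitioner {
    private final long[] breakPoints; // sorted inclusive upper bounds

    public RangePartitioner(long[] breakPoints) {
        this.breakPoints = breakPoints.clone();
        Arrays.sort(this.breakPoints);
    }

    // Returns the partition whose range contains the key.
    public int partition(long key, int numPartitions) {
        int idx = Arrays.binarySearch(breakPoints, key);
        int partition = idx >= 0 ? idx : -idx - 1;
        return Math.min(partition, numPartitions - 1);
    }
}
```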
Thanks,

Jun

On Thu, Jan 30, 2014 at 4:14 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> One downside to the 1A proposal is that without a Partitioner interface we
> can't really package up and provide common partitioner implementations.
> Examples of these would be:
> 1. HashPartitioner - The default hash partitioning
> 2. RoundRobinPartitioner - Just round-robins over partitions
> 3. ConnectionMinimizingPartitioner - Choose partitions to minimize the
> number of nodes you need to maintain TCP connections to.
> 4. RangePartitioner - User provides break points that align partitions to
> key ranges
> 5. LocalityPartitioner - Prefer nodes on the same rack. This would be nice
> for stream-processing use cases that read from one topic and write to
> another. We would have to include rack information in our metadata.
>
> Having this kind of functionality included is actually kind of nice.
>
> -Jay
>
>
> On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Clark and all,
> >
> > I thought a little bit about the serialization question. Here are the
> > options I see and the pros and cons I can think of. I'd love to hear
> > people's preferences if you have a strong one.
> >
> > One important consideration is that however the producer works will
> > also need to be how the new consumer works (which we hope to write
> > next). That is, if you put objects in, you should get objects out. So
> > we need to think through both sides.
> >
> > Options:
> >
> > Option 0: What is in the existing Scala code and the Java code I
> > posted--Serializer and Partitioner plugins provided by the user via
> > config. Partitioner has a sane default, but Serializer needs to be
> > specified in config.
> >
> > Pros: How it works today in the Scala code.
> > Cons: You have to adapt your serialization library of choice to our
> > interfaces. The reflective class loading means a typo in the
> > serializer name gives odd errors.
> > Likewise there is little type safety--the ProducerRecord takes
> > Object, and any type mismatch between the object provided and the
> > serializer only surfaces at runtime.
> >
> > Option 1: No plugins
> >
> > This would mean byte[] key, byte[] value, and partitioning done by the
> > client by passing in a partition *number* directly.
> >
> > The problem with this is that it is tricky to compute the partition
> > correctly and probably most people won't. We could add a getCluster()
> > method to return the Cluster instance you should use for partitioning.
> > But I suspect people would be lazy and not use that and instead
> > hard-code partitions, which would break if partitions were added or
> > they hard-coded it wrong. In my experience 3 partitioning strategies
> > cover like 99% of cases, so not having a default implementation for
> > this makes the common case harder. Left to their own devices people
> > will use bad hash functions and get weird results.
> >
> > Option 1A: Alternatively we could partition by the key using the
> > existing default partitioning strategy, which only uses the byte[]
> > anyway, but instead of having a partitionKey we could have a numerical
> > partition override and add the getCluster() method to get the cluster
> > metadata. That would make custom partitioning possible but handle the
> > common case simply.
> >
> > Option 2: Partitioner plugin remains, serializers go.
> >
> > The problem here is that the partitioner might lose access to the
> > deserialized key, which would occasionally be useful for semantic
> > partitioning schemes. The Partitioner could deserialize the key, but
> > that would be inefficient and weird.
> >
> > This problem could be fixed by having key and value be byte[] but
> > retaining partitionKey as an Object and passing it to the partitioner
> > as is. Then if you have a partitioner which requires the deserialized
> > key you would need to use this partition key.
> > One weird side effect is that if you want to have a custom partition
> > key BUT want to partition by the bytes of that key rather than the
> > object value, you must write a custom partitioner and serialize it
> > yourself.
> >
> > Of these I think I prefer 1A but could be convinced of 0 since that is
> > how it works now.
> >
> > Thoughts?
> >
> > -Jay
> >
> >
> > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com>
> > wrote:
> >
> >> Jay - Thanks for the call for comments. Here's some initial input:
> >>
> >> - Make message serialization a client responsibility (making all
> >> messages byte[]). Reflection-based loading makes it harder to use
> >> generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up
> >> codecs programmatically. Non-default partitioning should require an
> >> explicit partition key.
> >>
> >> - I really like the fact that it will be native Java. Please consider
> >> using native Maven and not sbt, gradle, ivy, etc., as they don't
> >> reliably play nice in the Maven ecosystem. A jar without a
> >> well-formed pom doesn't feel like a real artifact. The poms generated
> >> by sbt et al. are not well formed. Using Maven will make builds and
> >> IDE integration much smoother.
> >>
> >> - Look at Nick Telford's dropwizard-extras package, in which he
> >> defines some Jackson-compatible POJOs for loading configuration.
> >> Seems like your client migration is similar. The config objects
> >> should have constructors or factories that accept Map<String, String>
> >> and Properties for ease of migration.
> >>
> >> - Would you consider using the org.apache.kafka package for the new
> >> API? (quibble)
> >>
> >> - Why create your own futures rather than use
> >> java.util.concurrent.Future<Long> or similar? Standard futures will
> >> play nice with other reactive libs and things like J8's
> >> CompletableFuture.
> >>
> >> Thanks again,
> >> C
> >>
> >>
> >>
> >> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover
> >> <roger.hoo...@gmail.com> wrote:
> >>
> >> > A couple comments:
> >> >
> >> > 1) Why does the config use a broker list instead of discovering the
> >> > brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
> >> >
> >> > 2) It looks like broker connections are created on demand. I'm
> >> > wondering if sometimes you might want to flush out config or
> >> > network connectivity issues before pushing the first message
> >> > through.
> >> >
> >> > Should there also be a KafkaProducer.connect() or .open() method or
> >> > connectAll()? I guess it would try to connect to all brokers in the
> >> > BROKER_LIST_CONFIG.
> >> >
> >> > HTH,
> >> >
> >> > Roger
> >> >
> >> >
> >> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com>
> >> > wrote:
> >> >
> >> > > As mentioned in a previous email, we are working on a
> >> > > re-implementation of the producer. I would like to use this email
> >> > > thread to discuss the details of the public API and the
> >> > > configuration. I would love for us to be incredibly picky about
> >> > > this public API now so it is as good as possible and we don't
> >> > > need to break it in the future.
> >> > >
> >> > > The best way to get a feel for the API is actually to take a look
> >> > > at the javadoc; my hope is to get the API docs good enough that
> >> > > they are self-explanatory:
> >> > >
> >> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> >> > >
> >> > > Please take a look at this API and give me any thoughts you may
> >> > > have!
> >> > >
> >> > > It may also be reasonable to take a look at the configs:
> >> > >
> >> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> >> > >
> >> > > The actual code is posted here:
> >> > > https://issues.apache.org/jira/browse/KAFKA-1227
> >> > >
> >> > > A few questions or comments to kick things off:
> >> > > 1. We need to make a decision on whether serialization of the
> >> > > user's key and value should be done by the user (with our API
> >> > > just taking byte[]) or if we should take an object and allow the
> >> > > user to configure a Serializer class which we instantiate via
> >> > > reflection. We take the latter approach in the current producer,
> >> > > and I have carried this through to this prototype. The tradeoff I
> >> > > see is this: taking byte[] is actually simpler; the user can
> >> > > directly do whatever serialization they like. The complication is
> >> > > actually partitioning. Currently partitioning is done by a
> >> > > similar plug-in API (Partitioner) which the user can implement
> >> > > and configure to override how partitions are assigned. If we take
> >> > > byte[] as input then we have no access to the original object and
> >> > > partitioning MUST be done on the byte[]. This is fine for hash
> >> > > partitioning. However, for various types of semantic partitioning
> >> > > (range partitioning, or whatever) you would want access to the
> >> > > original object. In the current approach a producer who wishes to
> >> > > send byte[] they have serialized in their own code can configure
> >> > > the BytesSerialization we supply, which is just a "no op"
> >> > > serialization.
> >> > > 2. We should obsess over naming and make sure each of the class
> >> > > names is good.
> >> > > 3. Jun has already pointed out that we need to include the topic
> >> > > and partition in the response, which is absolutely right.
> >> > > I haven't done that yet, but that definitely needs to be there.
> >> > > 4. Currently RecordSend.await will throw an exception if the
> >> > > request failed. The intention here is that
> >> > > producer.send(message).await() exactly simulates a synchronous
> >> > > call. Guozhang has noted that this is a little annoying since the
> >> > > user must then catch exceptions. However, if we remove this then
> >> > > if the user doesn't check for errors they won't know one has
> >> > > occurred, which I predict will be a common mistake.
> >> > > 5. Perhaps there is more we could do to make the async callbacks
> >> > > and future we give back intuitive and easy to program against?
> >> > >
> >> > > Some background info on implementation:
> >> > >
> >> > > At a high level the primary difference in this producer is that
> >> > > it removes the distinction between the "sync" and "async"
> >> > > producer. Effectively all requests are sent asynchronously but
> >> > > always return a future response object that gives the offset as
> >> > > well as any error that may have occurred when the request is
> >> > > complete. The batching that today is done only in the async
> >> > > producer is now done whenever possible. This means that the sync
> >> > > producer, under load, can get performance as good as the async
> >> > > producer (preliminary results show the producer getting 1m
> >> > > messages/sec). This works similarly to group commit in databases
> >> > > but with respect to the actual network transmission--any messages
> >> > > that arrive while a send is in progress are batched together. It
> >> > > is also possible to encourage batching even under low load to
> >> > > save server resources by introducing a delay on the send to allow
> >> > > more messages to accumulate; this is done using the linger.ms
> >> > > config (this is similar to Nagle's algorithm in TCP).
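The group-commit-style batching and linger delay described in the quoted paragraph can be sketched in a few lines. This is an editorial illustration only, not the actual producer internals; the class and method names here are invented. Records accumulate until either the batch is full or the linger time has elapsed since the first record arrived, trading a small delay for larger, cheaper requests (akin to Nagle's algorithm).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of linger-style batching (invented names, not
// the real producer code). A batch is sent when it is full or when
// lingerMs has passed since its first record arrived.
public class LingerBatcher {
    private final int maxBatchSize;
    private final long lingerMs;
    private final List<byte[]> batch = new ArrayList<>();
    private long firstRecordAt = -1;

    public LingerBatcher(int maxBatchSize, long lingerMs) {
        this.maxBatchSize = maxBatchSize;
        this.lingerMs = lingerMs;
    }

    // Add a record; returns a full batch to send, or null to keep lingering.
    public List<byte[]> append(byte[] record, long nowMs) {
        if (batch.isEmpty()) firstRecordAt = nowMs;
        batch.add(record);
        return ready(nowMs) ? drain() : null;
    }

    // True when the pending batch should be transmitted.
    public boolean ready(long nowMs) {
        return batch.size() >= maxBatchSize
                || (!batch.isEmpty() && nowMs - firstRecordAt >= lingerMs);
    }

    private List<byte[]> drain() {
        List<byte[]> out = new ArrayList<>(batch);
        batch.clear();
        firstRecordAt = -1;
        return out;
    }
}
```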
> >> > >
> >> > > This producer does all network communication asynchronously and
> >> > > in parallel to all servers, so the performance penalty for
> >> > > acks=-1 and waiting on replication should be much reduced. I
> >> > > haven't done much benchmarking on this yet, though.
> >> > >
> >> > > The high level design is described a little here, though this is
> >> > > now a little out of date:
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >> > >
> >> > > -Jay
> >> > >
> >> >
> >>
> >
>
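The future-based send pattern debated above (Clark's suggestion to use standard futures, and question 4 about await throwing on failure) can be sketched with java.util.concurrent.CompletableFuture. All names below are illustrative; the prototype uses its own RecordSend type rather than a standard future. The point is that a single asynchronous send primitive lets the caller choose sync or async semantics.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: a send that returns a standard future,
// completing with the record's offset. Blocking on it simulates a
// synchronous call; attaching a callback keeps it asynchronous.
public class FutureSendSketch {
    // Stand-in for producer.send(record); pretends the broker acks at offset 42.
    static CompletableFuture<Long> send(byte[] record) {
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    public static void main(String[] args) {
        // Synchronous style: block until acked. join() throws (unchecked)
        // if the send failed, which forces the caller to notice errors.
        long offset = send(new byte[]{1}).join();
        System.out.println("acked at offset " + offset);

        // Asynchronous style: register a callback instead of blocking.
        // (join() here only so this demo waits before the JVM exits.)
        send(new byte[]{2})
                .thenAccept(o -> System.out.println("callback saw offset " + o))
                .join();
    }
}
```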