+1 all of Clark's points above.
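For what it's worth, the byte[]-only shape Clark describes below could look
roughly like this; every name here is hypothetical, a sketch rather than the
actual prototype API:

    import java.util.concurrent.Future;

    // Sketch only: serialization happens entirely in user code, and
    // non-default partitioning requires the caller to be explicit
    // (one reading of Clark's "explicit partition key" point: the
    // caller resolves the partition itself from the typed object).
    public interface ByteProducer {
        // Default case: hash-partition on the serialized key bytes.
        Future<Long> send(String topic, byte[] key, byte[] value);

        // Non-default case: the caller chooses the partition, e.g. from
        // range logic applied to the original object it still holds.
        Future<Long> send(String topic, int partition, byte[] key, byte[] value);
    }

A generic codec such as the Envelope<PREFIX, DATA, SUFFIX> Clark mentions
would then serialize to byte[] in user code before calling send(), with no
reflection-loaded plug-in in the path.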
On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
> Jay - Thanks for the call for comments. Here's some initial input:
>
> - Make message serialization a client responsibility (making all messages
> byte[]). Reflection-based loading makes it harder to use generic codecs
> (e.g. Envelope<PREFIX, DATA, SUFFIX>) or to build up a codec
> programmatically. Non-default partitioning should require an explicit
> partition key.
>
> - I really like the fact that it will be native Java. Please consider
> using native Maven and not sbt, gradle, ivy, etc., as they don't reliably
> play nice in the Maven ecosystem. A jar without a well-formed pom doesn't
> feel like a real artifact. The poms generated by sbt et al. are not
> well-formed. Using Maven will make builds and IDE integration much
> smoother.
>
> - Look at Nick Telford's dropwizard-extras package, in which he defines
> some Jackson-compatible POJOs for loading configuration. Seems like your
> client migration is similar. The config objects should have constructors
> or factories that accept Map<String, String> and Properties for ease of
> migration.
>
> - Would you consider using the org.apache.kafka package for the new API?
> (quibble)
>
> - Why create your own futures rather than use
> java.util.concurrent.Future<Long> or similar? Standard futures will play
> nice with other reactive libs and things like Java 8's CompletableFuture.
>
> Thanks again,
> C
>
> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > A couple of comments:
> >
> > 1) Why does the config use a broker list instead of discovering the
> > brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
> >
> > 2) It looks like broker connections are created on demand. I'm wondering
> > if sometimes you might want to flush out config or network connectivity
> > issues before pushing the first message through. Should there also be a
> > KafkaProducer.connect() or .open() method, or a connectAll()? I guess it
> > would try to connect to all brokers in the BROKER_LIST_CONFIG.
> >
> > HTH,
> >
> > Roger
> >
> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > As mentioned in a previous email, we are working on a re-implementation
> > > of the producer. I would like to use this email thread to discuss the
> > > details of the public API and the configuration. I would love for us to
> > > be incredibly picky about this public API now, so that it is as good as
> > > possible and we don't need to break it in the future.
> > >
> > > The best way to get a feel for the API is to take a look at the
> > > javadoc; my hope is to get the API docs good enough that they are
> > > self-explanatory:
> > >
> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > >
> > > Please take a look at this API and give me any thoughts you may have!
> > >
> > > It may also be reasonable to take a look at the configs:
> > >
> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > >
> > > The actual code is posted here:
> > > https://issues.apache.org/jira/browse/KAFKA-1227
> > >
> > > A few questions or comments to kick things off:
> > >
> > > 1. We need to make a decision on whether serialization of the user's
> > > key and value should be done by the user (with our API just taking
> > > byte[]) or whether we should take an object and allow the user to
> > > configure a Serializer class which we instantiate via reflection. We
> > > take the latter approach in the current producer, and I have carried
> > > this through to this prototype. The tradeoff I see is this: taking
> > > byte[] is actually simpler; the user can directly do whatever
> > > serialization they like. The complication is actually partitioning.
> > > Currently partitioning is done by a similar plug-in API (Partitioner)
> > > which the user can implement and configure to override how partitions
> > > are assigned. If we take byte[] as input then we have no access to the
> > > original object, and partitioning MUST be done on the byte[]. This is
> > > fine for hash partitioning, but for various types of semantic
> > > partitioning (range partitioning, or whatever) you would want access
> > > to the original object. In the current approach, a producer who wishes
> > > to send byte[] they have serialized in their own code can configure
> > > the BytesSerialization we supply, which is just a "no op"
> > > serialization.
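To make the tradeoff in point 1 concrete: the plug-in approach might be
shaped roughly like the sketch below. These interfaces are illustrative
only, not the prototype's actual signatures; the key property is that the
configured Partitioner sees the original typed object, which is what makes
semantic partitioning possible.

    // Illustrative plug-in interfaces (hypothetical, not the real API):
    public interface Serializer<T> {
        byte[] toBytes(T data);
    }

    public interface Partitioner<K> {
        // Receives the original object rather than its serialized bytes.
        int partition(K key, int numPartitions);
    }

    // Example: range-partition non-negative integer keys in blocks of
    // 1000 -- something a byte[]-only partitioner could not express.
    public class RangePartitioner implements Partitioner<Integer> {
        public int partition(Integer key, int numPartitions) {
            return Math.min(key / 1000, numPartitions - 1);
        }
    }

With byte[] input, the partitioner is essentially limited to hashing the
raw bytes, since the typed key is gone by the time it runs.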
> > > 2. We should obsess over naming and make sure each of the class names
> > > is good.
> > >
> > > 3. Jun has already pointed out that we need to include the topic and
> > > partition in the response, which is absolutely right. I haven't done
> > > that yet, but it definitely needs to be there.
> > >
> > > 4. Currently RecordSend.await will throw an exception if the request
> > > failed. The intention here is that producer.send(message).await()
> > > exactly simulates a synchronous call. Guozhang has noted that this is
> > > a little annoying since the user must then catch exceptions. However,
> > > if we remove this, then a user who doesn't check for errors won't know
> > > one has occurred, which I predict will be a common mistake.
> > >
> > > 5. Perhaps there is more we could do to make the async callbacks and
> > > the future we give back intuitive and easy to program against?
> > >
> > > Some background info on implementation:
> > >
> > > At a high level, the primary difference in this producer is that it
> > > removes the distinction between the "sync" and "async" producer.
> > > Effectively, all requests are sent asynchronously but always return a
> > > future response object that gives the offset as well as any error that
> > > may have occurred when the request is complete. The batching that
> > > today is done only in the async producer is now done whenever
> > > possible. This means that the sync producer, under load, can get
> > > performance as good as the async producer (preliminary results show
> > > the producer getting 1m messages/sec). This works similarly to group
> > > commit in databases, but with respect to the actual network
> > > transmission: any messages that arrive while a send is in progress are
> > > batched together. It is also possible to encourage batching even under
> > > low load, to save server resources, by introducing a delay on the send
> > > to allow more messages to accumulate; this is done using the linger.ms
> > > config (similar to Nagle's algorithm in TCP).
> > >
> > > This producer does all network communication asynchronously and in
> > > parallel to all servers, so the performance penalty for acks=-1 and
> > > waiting on replication should be much reduced. I haven't done much
> > > benchmarking on this yet, though.
> > >
> > > The high-level design is described a little here, though it is now a
> > > little out of date:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > >
> > > -Jay
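One more thought on point 4: the catch-versus-miss tradeoff is easiest to
see in usage. A fragment sketched against the future-style API described
above (producer, send, and await are the names from the thread; the message
variable and the error handling are illustrative):

    // Simulating a synchronous call: await() blocks until the request
    // completes and throws if it failed, so the caller cannot silently
    // ignore an error.
    try {
        producer.send(message).await();
    } catch (Exception e) {
        // If await() merely returned an error code instead of throwing,
        // forgetting a check like this would silently drop the failure.
        System.err.println("send failed: " + e);
    }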