That's a good point about 1A - does seem that we would need to have some kind of TTL for each topic's metadata.
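As a minimal sketch of the TTL idea (the class, method, and metadata shape here are hypothetical stand-ins, not the actual client API):

import java.util.Collections;
import java.util.Map;

// Minimal sketch of TTL-based topic metadata caching, using made-up names
// (CachedMetadata, fetchMetadataFromBroker); the real client's Cluster /
// getCluster() shape may look quite different.
public class CachedMetadata {
    private final long ttlMs;
    private volatile Map<String, Integer> partitionsPerTopic = Collections.emptyMap();
    private volatile long lastRefreshMs = 0;

    public CachedMetadata(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Returns cached topic metadata, re-fetching it once the TTL has expired. */
    public Map<String, Integer> partitions() {
        long now = System.currentTimeMillis();
        if (now - lastRefreshMs > ttlMs) {
            partitionsPerTopic = fetchMetadataFromBroker(); // placeholder for a real metadata request
            lastRefreshMs = now;
        }
        return partitionsPerTopic;
    }

    private Map<String, Integer> fetchMetadataFromBroker() {
        // placeholder: in a real client this would query one of the configured brokers
        return Collections.singletonMap("my-topic", 8);
    }
}

With something like this consulted on each send, a newly added partition would become visible to the producer within one TTL interval.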
Also, with respect to the ZK dependency, I don't think that decision (for the Java client) affects other clients; i.e., other client implementations can use whatever discovery mechanism they choose. That said, I prefer not having a ZK dependency, for the same reasons covered earlier in this thread.

On Thu, Jan 30, 2014 at 4:34 PM, Jun Rao <jun...@gmail.com> wrote:

> With option 1A, if we increase the # of partitions on a topic, how will the producer find out about newly created partitions? Do we expect the producer to periodically call getCluster()?
>
> As for the ZK dependency, one of the goals of the client rewrite is to reduce dependencies so that one can implement the client in languages other than Java. A ZK client is only available in a small number of languages.
>
> Thanks,
>
> Jun
>
> On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> Clark and all,
>>
>> I thought a little bit about the serialization question. Here are the options I see and the pros and cons I can think of. I'd love to hear people's preferences if you have a strong one.
>>
>> One important consideration is that however the producer works will also need to be how the new consumer works (which we hope to write next). That is, if you put objects in, you should get objects out. So we need to think through both sides.
>>
>> Options:
>>
>> Option 0: What is in the existing Scala code and the Java code I posted--Serializer and Partitioner plugins provided by the user via config. The Partitioner has a sane default, but the Serializer needs to be specified in config.
>>
>> Pros: How it works today in the Scala code.
>> Cons: You have to adapt your serialization library of choice to our interfaces. The reflective class loading means a typo in the serializer name gives odd errors. Likewise there is little type safety--the ProducerRecord takes Object, and any type mismatch between the object provided and the serializer only shows up at runtime.
>>
>> Option 1: No plugins
>>
>> This would mean byte[] key, byte[] value, and partitioning done by the client by passing in a partition *number* directly.
>>
>> The problem with this is that it is tricky to compute the partition correctly and probably most people won't. We could add a getCluster() method to return the Cluster instance you should use for partitioning. But I suspect people would be lazy and not use that, and instead hard-code partitions, which would break if partitions were added or if they hard-coded it wrong. In my experience 3 partitioning strategies cover something like 99% of cases, so not having a default implementation for this makes the common case harder. Left to their own devices people will use bad hash functions and get weird results.
>>
>> Option 1A: Alternatively we could partition by the key using the existing default partitioning strategy, which only uses the byte[] anyway, but instead of having a partitionKey we could have a numerical partition override and add the getCluster() method to get the cluster metadata. That would make custom partitioning possible but handle the common case simply.
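To make 1A concrete, here is a rough caller-side sketch under that option; the helper names, the hash, and the fixed partition count are illustrative assumptions rather than the proposed API:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical caller-side view of Option 1A: byte[] key/value everywhere,
// default partitioning by hashing the key bytes, plus an optional numeric
// partition override for custom strategies. Names are illustrative only.
public class Option1AExample {

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] value = "clicked".getBytes(StandardCharsets.UTF_8);

        // Common case: no partition given; the client hashes the key bytes.
        int numPartitions = 8; // would come from getCluster() metadata in a real client
        int defaultChoice = defaultPartition(key, numPartitions);

        // Custom case: the caller computes a partition number itself (e.g. range
        // partitioning) using cluster metadata and passes that number explicitly.
        int explicitChoice = 3;

        System.out.println("default=" + defaultChoice + " explicit=" + explicitChoice);
    }

    /** The kind of "hash the key bytes" default that Option 1A would keep built in. */
    static int defaultPartition(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}

The point is that the common case needs no partition argument at all, while a semantic partitioner can still compute a partition number from cluster metadata and pass it explicitly.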
>>
>> Option 2: Partitioner plugin remains, serializers go.
>>
>> The problem here is that the partitioner might lose access to the deserialized key, which would occasionally be useful for semantic partitioning schemes. The Partitioner could deserialize the key, but that would be inefficient and weird.
>>
>> This problem could be fixed by having key and value be byte[] but retaining partitionKey as an Object and passing it to the partitioner as is. Then if you have a partitioner which requires the deserialized key, you would need to use this partition key. One weird side effect is that if you want to have a custom partition key BUT want to partition by the bytes of that key rather than the object value, you must write a custom partitioner and serialize it yourself.
>>
>> Of these I think I prefer 1A but could be convinced of 0 since that is how it works now.
>>
>> Thoughts?
>>
>> -Jay
>>
>> On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
>>
>> > Jay - Thanks for the call for comments. Here's some initial input:
>> >
>> > - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or to build up a codec programmatically. Non-default partitioning should require an explicit partition key.
>> >
>> > - I really like the fact that it will be native Java. Please consider using native Maven and not sbt, gradle, ivy, etc., as they don't reliably play nice in the Maven ecosystem. A jar without a well-formed pom doesn't feel like a real artifact. The poms generated by sbt et al. are not well formed. Using Maven will make builds and IDE integration much smoother.
>> >
>> > - Look at Nick Telford's dropwizard-extras package, in which he defines some Jackson-compatible POJOs for loading configuration. Seems like your client migration is similar. The config objects should have constructors or factories that accept Map<String, String> and Properties for ease of migration.
>> >
>> > - Would you consider using the org.apache.kafka package for the new API? (quibble)
>> >
>> > - Why create your own futures rather than use java.util.concurrent.Future<Long> or similar? Standard futures will play nice with other reactive libs and things like Java 8's CompletableFuture.
>> >
>> > Thanks again,
>> > C
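On the standard-futures point, here is a brief sketch of the kind of composition being suggested; the send() below is a stand-in that returns a CompletableFuture, not the API under discussion:

import java.util.concurrent.CompletableFuture;

// Illustration of the standard-futures suggestion: if send() completed a
// CompletableFuture<Long> with the record's offset, callers could chain
// completion and error handling with ordinary JDK types and other reactive libs.
public class StandardFutureExample {

    // Stand-in for an asynchronous send that completes with the record's offset.
    static CompletableFuture<Long> send(String topic, byte[] key, byte[] value) {
        return CompletableFuture.supplyAsync(() -> 42L); // pretend offset from the broker
    }

    public static void main(String[] args) {
        send("my-topic", new byte[0], "hello".getBytes())
            .thenAccept(offset -> System.out.println("acked at offset " + offset))
            .exceptionally(error -> { System.err.println("send failed: " + error); return null; })
            .join(); // block here only for the demo
    }
}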
>> >
>> >
>> > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>> >
>> > > A couple of comments:
>> > >
>> > > 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
>> > >
>> > > 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivity issues before pushing the first message through.
>> > >
>> > > Should there also be a KafkaProducer.connect() or .open() method or connectAll()? I guess it would try to connect to all brokers in the BROKER_LIST_CONFIG.
>> > >
>> > > HTH,
>> > >
>> > > Roger
>> > >
>> > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > >
>> > > > As mentioned in a previous email, we are working on a re-implementation of the producer. I would like to use this email thread to discuss the details of the public API and the configuration. I would love for us to be incredibly picky about this public API now so it is as good as possible and we don't need to break it in the future.
>> > > >
>> > > > The best way to get a feel for the API is actually to take a look at the javadoc; my hope is to get the API docs good enough that they are self-explanatory:
>> > > >
>> > > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>> > > >
>> > > > Please take a look at this API and give me any thoughts you may have!
>> > > >
>> > > > It may also be reasonable to take a look at the configs:
>> > > >
>> > > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>> > > >
>> > > > The actual code is posted here:
>> > > > https://issues.apache.org/jira/browse/KAFKA-1227
>> > > >
>> > > > A few questions or comments to kick things off:
>> > > > 1. We need to make a decision on whether serialization of the user's key and value should be done by the user (with our API just taking byte[]) or whether we should take an object and allow the user to configure a Serializer class which we instantiate via reflection. We take the latter approach in the current producer, and I have carried this through to this prototype. The tradeoff I see is this: taking byte[] is actually simpler; the user can directly do whatever serialization they like. The complication is actually partitioning. Currently partitioning is done by a similar plug-in API (Partitioner) which the user can implement and configure to override how partitions are assigned. If we take byte[] as input then we have no access to the original object, and partitioning MUST be done on the byte[]. This is fine for hash partitioning. However, for various types of semantic partitioning (range partitioning, or whatever) you would want access to the original object. In the current approach a producer who wishes to send byte[] they have serialized in their own code can configure the BytesSerialization we supply, which is just a "no op" serialization.
>> > > > 2. We should obsess over naming and make sure each of the class names is good.
>> > > > 3. Jun has already pointed out that we need to include the topic and partition in the response, which is absolutely right. I haven't done that yet, but it definitely needs to be there.
>> > > > 4. Currently RecordSend.await will throw an exception if the request failed. The intention here is that producer.send(message).await() exactly simulates a synchronous call. Guozhang has noted that this is a little annoying since the user must then catch exceptions. However, if we remove this then a user who doesn't check for errors won't know one has occurred, which I predict will be a common mistake.
>> > > > 5. Perhaps there is more we could do to make the async callbacks and the future we give back intuitive and easy to program against?
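For point 4, here is a sketch of the two calling styles being weighed; only send() and await() come from the discussion above, and the rest of the types are stand-ins:

// Sketch of the two usage patterns in point 4, with stand-in types: a blocking
// "send then await" call that rethrows any failure, versus firing the send and
// checking the result later. Everything here beyond send()/await() is illustrative.
public class SyncVsAsyncSketch {

    interface RecordSend {
        long await();          // blocks until the response arrives; throws if the request failed
        boolean completed();   // hypothetical non-blocking completion check
    }

    interface Producer {
        RecordSend send(String topic, byte[] key, byte[] value);
    }

    static void examples(Producer producer, byte[] key, byte[] value) {
        // "Synchronous" style: block until the broker responds; failures propagate as exceptions.
        long offset = producer.send("my-topic", key, value).await();
        System.out.println("acked at offset " + offset);

        // Asynchronous style: keep the handle and deal with the result later.
        RecordSend pending = producer.send("my-topic", key, value);
        // ... do other work ...
        if (pending.completed()) {
            System.out.println("already acked at offset " + pending.await());
        }
    }
}

Having await() rethrow the failure is what lets the first form behave like a plain synchronous call.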
>> > > >
>> > > > Some background info on implementation:
>> > > >
>> > > > At a high level the primary difference in this producer is that it removes the distinction between the "sync" and "async" producer. Effectively all requests are sent asynchronously but always return a future response object that gives the offset, as well as any error that may have occurred, when the request is complete. The batching that today is done only in the async producer is now done whenever possible. This means that the sync producer, under load, can get performance as good as the async producer (preliminary results show the producer getting 1m messages/sec). This works similarly to group commit in databases, but with respect to the actual network transmission--any messages that arrive while a send is in progress are batched together. It is also possible to encourage batching even under low load, to save server resources, by introducing a delay on the send to allow more messages to accumulate; this is done using the linger.ms config (this is similar to Nagle's algorithm in TCP).
>> > > >
>> > > > This producer does all network communication asynchronously and in parallel to all servers, so the performance penalty for acks=-1 and waiting on replication should be much reduced. I haven't done much benchmarking on this yet, though.
>> > > >
>> > > > The high level design is described a little here, though this is now a little out of date:
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>> > > >
>> > > > -Jay
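For reference, a rough sketch of the configuration knobs mentioned above (broker list, acks, linger.ms); the exact key names here are assumptions, and the authoritative list is the ProducerConfig javadoc linked earlier:

import java.util.Properties;

// Rough sketch of the kind of configuration discussed in the thread: a broker
// list to bootstrap from, acks=-1 to wait on full replication, and a small
// linger.ms to encourage batching under low load. Key names are illustrative.
public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("broker.list", "broker1:9092,broker2:9092"); // assumed key name for the broker list
        props.put("acks", "-1");     // wait for all in-sync replicas to acknowledge
        props.put("linger.ms", "5"); // wait up to 5 ms so more records can batch together
        // These properties would then be handed to the new producer's constructor,
        // per the KafkaProducer/ProducerConfig javadoc linked above.
        System.out.println(props);
    }
}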