Hi Jay,

- Just to add some more info/confusion about possibly using Future... If Kafka uses a JDK future, it plays nicely with other frameworks as well. Google Guava has a ListenableFuture that allows callbacks to be attached to the returned future and dispatched to a specified executor.
  http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ListenableFuture.html
  The JDK future can easily be converted to a listenable future.

- On the question of byte[] vs Object, could this be solved by layering the API? E.g. a raw producer (takes byte[] and an explicit partition number) and a normal producer (takes a generic object and a configured Partitioner)?

- I am confused by the keys in ProducerRecord and Partitioner. What is the use of having both a key and a partition key? (I am not yet using 0.8.)

Thanks,
Ross

On 28 January 2014 05:00, Xavier Stevens <xav...@gaikai.com> wrote:
> AutoCloseable would be nice for us, as most of our code is on Java 7 at this point.
>
> I like Dropwizard's configuration mapping to POJOs via Jackson, but if you want to stick with property maps I don't care enough to object.
>
> If the producer only dealt with bytes, is there a way we could still do partition plugins without specifying the partition number explicitly? I would prefer to be able to pass in the field(s) the partitioner should use. Obviously, if that isn't possible you could always deserialize the object in the partitioner and grab the fields you want, but that seems really expensive to do on every message.
>
> It would also be nice to have a Java API Encoder constructor taking VerifiableProperties. Scala understands how to handle "props: VerifiableProperties = null", but Java doesn't, so you don't run into this problem until runtime.
>
> -Xavier
>
> On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <cl...@breyman.com> wrote:
> > Jay -
> >
> > Config - your explanation makes sense. I'm just so accustomed to having Jackson automatically map my configuration objects to POJOs that I've stopped using property files. They are lingua franca.
> > The only thought might be to separate the config interface from the implementation to allow for alternatives, but that might undermine your point of "do it this way so that everyone can find it where they expect it".
> >
> > Serialization: Of the options, I like 1A the best, though possibly with either an option to specify a partition key rather than an ID, or a helper to translate an arbitrary byte[] or long into a partition number.
> >
> > Thanks
> > Clark
> >
> > On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > Thanks for the detailed thoughts. Let me elaborate on the config thing.
> > >
> > > I agree that at first glance key-value strings don't seem like a very good configuration API for a client. Surely a well-typed config class would be better! I actually disagree, and let me see if I can convince you.
> > >
> > > My reasoning has nothing to do with the API and everything to do with operations.
> > >
> > > Clients are embedded in applications which are themselves configured. In any place that takes operations seriously, the configuration for these applications will be version controlled and maintained through some kind of config management system. If we give a config class with getters and setters, the application has to expose those properties in its own configuration. What invariably happens is that the application exposes only a choice few properties that its authors thought they would change. Furthermore, the application will make up a name for each of these configs that seems intuitive in the 2 seconds the engineer spends thinking about it.
> > >
> > > Now consider the result of this in the large. You end up with dozens or hundreds of applications that have the client embedded. Each exposes a different, inadequate subset of the possible configs, each with different names. It is a nightmare.
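To make the contrast concrete, here is a minimal Java sketch of the string-to-string alternative Jay goes on to describe: the application's config system hands the client its whole Kafka section, prefix stripped, instead of exposing a few hand-picked properties. ClientConfigSketch, the "kafka." prefix, and the default values are hypothetical placeholders; only linger.ms and acks are config names that appear in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical client config that accepts an untyped key-value bundle.
// Keys pass through unchanged, so any documented client setting is reachable
// from the application's config system without new getters and setters.
final class ClientConfigSketch {
    private final Map<String, String> values;

    ClientConfigSketch(Map<String, String> supplied) {
        Map<String, String> merged = new HashMap<>();
        merged.put("linger.ms", "0"); // placeholder default, not Kafka's actual default
        merged.put("acks", "1");      // placeholder default, not Kafka's actual default
        merged.putAll(supplied);      // supplied values override the defaults
        this.values = Collections.unmodifiableMap(merged);
    }

    String get(String key) {
        return values.get(key);
    }

    int getInt(String key) {
        return Integer.parseInt(values.get(key));
    }

    // Copies every property whose name starts with prefix (e.g. "kafka.") into
    // the client config with the prefix stripped, rather than picking a few by hand.
    static ClientConfigSketch fromSection(Properties appProps, String prefix) {
        Map<String, String> section = new HashMap<>();
        for (String name : appProps.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                section.put(name.substring(prefix.length()), appProps.getProperty(name));
            }
        }
        return new ClientConfigSketch(section);
    }
}
```

Because unknown keys pass straight through, a setting added in a later client version becomes reachable without any change to the application, which is the operational property Jay is arguing for.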
> > >
> > > If you use a string-string map, the config system can directly get a bundle of config key-value pairs and pass them to the client. This means that all configuration is automatically available, under the name documented on the website, in every application that does this. If you upgrade to a new Kafka version with more configs, those will be exposed too. If you realize that you need to change a default, you can just go through your configs and change it everywhere, since it will have the same name everywhere.
> > >
> > > -Jay
> > >
> > > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <cl...@breyman.com> wrote:
> > > > Thanks Jay. I'll see if I can put together a more complete response, perhaps as separate threads so that topics don't get entangled. In the meantime, here are a couple of responses:
> > > >
> > > > Serialization: you've broken out a sub-thread so I'll reply there. My bias is that I like generics (except for type erasure), and in particular they make it easy to compose serializers for compound payloads (e.g. when a common header wraps a payload of parameterized type). I'll respond to your 4-options message with an example.
> > > >
> > > > Build: I've seen a lot of "maven-compatible" build systems produce "artifacts" that aren't really artifacts: no embedded POM or, worse, a malformed POM. I know the sbt-generated artifacts were this way; the onus is on me to see what gradle is spitting out and what a maven build might look like. Maven may be old and boring, but it gets out of the way and integrates really seamlessly with a lot of IDEs. When some Scala projects I was working on in the fall of 2011 switched from sbt to maven, the build became a non-issue.
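Clark's point about composing serializers with generics can be sketched as follows. Codec, Envelope, and the header layout (an 8-byte timestamp and a 4-byte length ahead of the body) are hypothetical, not part of the proposed producer API. The body codec is handed to the envelope codec in code, which is the kind of programmatic composition that a single reflection-loaded class name in a config file cannot express.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical generic codec; not the Kafka Encoder or Serializer API.
interface Codec<T> {
    byte[] encode(T value);
}

// Hypothetical compound payload: a common header wrapping a typed body.
final class Envelope<T> {
    final long timestampMs;
    final T body;

    Envelope(long timestampMs, T body) {
        this.timestampMs = timestampMs;
        this.body = body;
    }
}

// Encodes the shared header, then delegates the body to whatever codec it was given.
final class EnvelopeCodec<T> implements Codec<Envelope<T>> {
    private final Codec<T> bodyCodec;

    EnvelopeCodec(Codec<T> bodyCodec) {
        this.bodyCodec = bodyCodec;
    }

    @Override
    public byte[] encode(Envelope<T> envelope) {
        byte[] body = bodyCodec.encode(envelope.body);
        // Assumed layout: 8-byte timestamp, 4-byte body length, body bytes.
        return ByteBuffer.allocate(8 + 4 + body.length)
                .putLong(envelope.timestampMs)
                .putInt(body.length)
                .put(body)
                .array();
    }
}

// Leaf codec for String bodies.
final class Utf8Codec implements Codec<String> {
    @Override
    public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```

Nesting works the same way: new EnvelopeCodec<>(new EnvelopeCodec<>(new Utf8Codec())) encodes an Envelope<Envelope<String>>, and the compiler checks that the layers line up.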
> > > >
> > > > Config: Not a big deal, and no, I don't think a dropwizard dependency is appropriate. I do like using simple entity beans (POJOs, not J2EE) for configuration, especially if Jackson can marshal them without annotations. I only mentioned dropwizard-extras because it has some entity-bean versions of the ZK and Kafka configs.
> > > >
> > > > Domain-packaging: Also not a big deal; it's what's expected and it's pretty much free in most IDEs. The advantage I see is that it is clear whether something is from the Apache Kafka project, or from another org and related to Kafka. That said, nothing really enforces it.
> > > >
> > > > Futures: I'll see if I can create some examples to demonstrate Future making interop easier.
> > > >
> > > > Regards,
> > > > C
> > > >
> > > > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > Hey Clark,
> > > > >
> > > > > - Serialization: Yes, I agree with these, though I don't consider the loss of generics a big issue. I'll try to summarize what I would consider the best alternative API with raw byte[].
> > > > >
> > > > > - Maven: We had this debate a few months back and the consensus was gradle. Is there a specific issue with the POMs gradle makes? I am extremely loath to revisit the issue, as build issues are a recurring thing, no one ever agrees, and ultimately our build needs are very simple.
> > > > >
> > > > > - Config: I'm not sure I follow the point. Are you saying we should use something in dropwizard for config?
> > > > > One principle here is to remove as many client dependencies as possible, as we inevitably run into terrible compatibility issues with users who use the same library or its dependencies at different versions. Or are you talking about maintaining compatibility with existing config parameters? I think that, as far as a config in the existing client makes sense, it should keep the same name (I was a bit sloppy about that, so I'll fix any errors there). There are a few new things, and we should give those reasonable defaults. I think config is important, so I'll start a thread on the config package in there.
> > > > >
> > > > > - org.apache.kafka: We could do this. I always considered it kind of an odd thing Java programmers do that has no real motivation (but I could be re-educated!). I don't think it ends up reducing naming conflicts in practice, and it adds a lot of noise and nested directories. Is there a reason you prefer this, or is it just to be more standard?
> > > > >
> > > > > - Future: Basically I didn't see any particular advantage. The cancel() method doesn't really make sense, so it probably wouldn't work. Likewise, I dislike the checked exceptions it requires. I just wrote out some code examples, and it seemed cleaner with a special-purpose object. I wasn't actually aware of plans for improved futures in Java 8 or the other integrations. Maybe you could elaborate on this a bit and show how it would be used? Sounds promising; I just don't know a lot about it.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
> > > > > > Jay - Thanks for the call for comments.
> > > > > > Here's some initial input:
> > > > > >
> > > > > > - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or to build up a codec programmatically. Non-default partitioning should require an explicit partition key.
> > > > > >
> > > > > > - I really like the fact that it will be native Java. Please consider using native maven and not sbt, gradle, ivy, etc., as they don't reliably play nice in the maven ecosystem. A jar without a well-formed POM doesn't feel like a real artifact. The POMs generated by sbt et al. are not well formed. Using maven will make builds and IDE integration much smoother.
> > > > > >
> > > > > > - Look at Nick Telford's dropwizard-extras package, in which he defines some Jackson-compatible POJOs for loading configuration. Your client migration seems similar. The config objects should have constructors or factories that accept Map<String, String> and Properties for ease of migration.
> > > > > >
> > > > > > - Would you consider using the org.apache.kafka package for the new API? (quibble)
> > > > > >
> > > > > > - Why create your own futures rather than use java.util.concurrent.Future<Long> or similar? Standard futures will play nice with other reactive libs and things like J8's CompletableFuture.
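As a rough illustration of the interop Clark and Ross describe, and of Jay's concern about checked exceptions, here is a sketch using Java 8's CompletableFuture (Guava's ListenableFuture offers a similar callback model). fakeSend is a hypothetical stand-in for a producer's network thread completing the future with an offset; nothing here is the proposed Kafka API. Callbacks run on an executor the caller chooses, and join() reports failure through the unchecked CompletionException rather than the checked ExecutionException and InterruptedException declared by Future.get().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch: a send result modelled as CompletableFuture<Long> (the offset).
final class FutureInteropSketch {

    // Stand-in for the producer's network thread: completes the future later,
    // either with an offset or with the error the broker reported.
    static CompletableFuture<Long> fakeSend(Executor ioThread, long offset, boolean fail) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        ioThread.execute(() -> {
            if (fail) {
                result.completeExceptionally(new IllegalStateException("send failed"));
            } else {
                result.complete(offset);
            }
        });
        return result;
    }

    // Attaches a callback that runs on the caller-supplied executor, not on the I/O thread.
    static CompletableFuture<String> describeOn(CompletableFuture<Long> send, Executor callbacks) {
        return send.thenApplyAsync(offset -> "offset=" + offset, callbacks);
    }
}
```

A blocking send then reads as send(...).join(), which is roughly what the proposed RecordSend.await is meant to provide, without forcing callers to catch checked exceptions.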
> > > > > >
> > > > > > Thanks again,
> > > > > > C
> > > > > >
> > > > > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > > > > A couple comments:
> > > > > > >
> > > > > > > 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
> > > > > > >
> > > > > > > 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivity issues before pushing the first message through.
> > > > > > >
> > > > > > > Should there also be a KafkaProducer.connect() or .open() method, or connectAll()? I guess it would try to connect to all brokers in the BROKER_LIST_CONFIG.
> > > > > > >
> > > > > > > HTH,
> > > > > > >
> > > > > > > Roger
> > > > > > >
> > > > > > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > As mentioned in a previous email, we are working on a re-implementation of the producer. I would like to use this email thread to discuss the details of the public API and the configuration. I would love for us to be incredibly picky about this public API now, so it is as good as possible and we don't need to break it in the future.
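Roger's pre-flight idea could look roughly like this sketch: attempt a plain TCP connection to every host:port entry in a comma-separated broker list and report the entries that fail. BrokerPreflight.unreachable is a hypothetical helper, not a KafkaProducer method, and the comma-separated host:port format is an assumption. A successful TCP connect only shows the port is reachable, not that a Kafka broker is answering on it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for a "host1:port1,host2:port2" broker list.
final class BrokerPreflight {

    // Returns the entries that are malformed or could not be connected to within timeoutMs.
    static List<String> unreachable(String brokerList, int timeoutMs) {
        List<String> failures = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0) { // no host, or no port separator at all
                failures.add(hostPort);
                continue;
            }
            String host = hostPort.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(hostPort.substring(colon + 1));
            } catch (NumberFormatException e) {
                failures.add(hostPort);
                continue;
            }
            // Open and immediately close a socket; any failure marks the entry unreachable.
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), timeoutMs);
            } catch (IOException | IllegalArgumentException e) {
                failures.add(hostPort); // refused, timed out, unknown host, or port out of range
            }
        }
        return failures;
    }
}
```

A connectAll() built on this would fail fast at startup when the list is wrong, at the cost of opening connections that the producer might otherwise create only on demand.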
> > > > > > > >
> > > > > > > > The best way to get a feel for the API is actually to take a look at the javadoc; my hope is to get the API docs good enough that they are self-explanatory:
> > > > > > > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > > > > > >
> > > > > > > > Please take a look at this API and give me any thoughts you may have!
> > > > > > > >
> > > > > > > > It may also be reasonable to take a look at the configs:
> > > > > > > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > > > > > > >
> > > > > > > > The actual code is posted here:
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1227
> > > > > > > >
> > > > > > > > A few questions or comments to kick things off:
> > > > > > > > 1. We need to make a decision on whether serialization of the user's key and value should be done by the user (with our API just taking byte[]) or whether we should take an object and allow the user to configure a Serializer class, which we instantiate via reflection. We take the latter approach in the current producer, and I have carried this through to this prototype. The tradeoff I see is this: taking byte[] is actually simpler, since the user can directly do whatever serialization they like. The complication is actually partitioning. Currently partitioning is done by a similar plug-in API (Partitioner), which the user can implement and configure to override how partitions are assigned. If we take byte[] as input, then we have no access to the original object and partitioning MUST be done on the byte[]. This is fine for hash partitioning. However, for various types of semantic partitioning (range partitioning, or whatever) you would want access to the original object. In the current approach, a producer who wishes to send byte[] they have serialized in their own code can configure the BytesSerialization we supply, which is just a "no op" serialization.
> > > > > > > > 2. We should obsess over naming and make sure each of the class names is good.
> > > > > > > > 3. Jun has already pointed out that we need to include the topic and partition in the response, which is absolutely right. I haven't done that yet, but it definitely needs to be there.
> > > > > > > > 4. Currently RecordSend.await will throw an exception if the request failed. The intention here is that producer.send(message).await() exactly simulates a synchronous call. Guozhang has noted that this is a little annoying, since the user must then catch exceptions. However, if we remove this, then a user who doesn't check for errors won't know one has occurred, which I predict will be a common mistake.
> > > > > > > > 5. Perhaps there is more we could do to make the async callbacks and the future we give back intuitive and easy to program against?
> > > > > > > >
> > > > > > > > Some background info on the implementation:
> > > > > > > >
> > > > > > > > At a high level, the primary difference in this producer is that it removes the distinction between the "sync" and "async" producer. Effectively, all requests are sent asynchronously but always return a future response object that gives the offset, as well as any error that may have occurred, when the request is complete. The batching that today is done only in the async producer is now done whenever possible. This means that the sync producer, under load, can get performance as good as the async producer (preliminary results show the producer getting 1m messages/sec). This works similarly to group commit in databases, but with respect to the actual network transmission: any messages that arrive while a send is in progress are batched together. It is also possible to encourage batching even under low load, to save server resources, by introducing a delay on the send to allow more messages to accumulate; this is done using the linger.ms config (this is similar to Nagle's algorithm in TCP).
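The linger behaviour Jay describes can be sketched as a small accumulator: a batch goes to the sender either when it fills up or once its oldest record has waited linger.ms. LingerAccumulator is hypothetical; it counts records rather than bytes, tracks a single partition, and takes the current time as a parameter so its behaviour is deterministic. The producer's real batching logic is likely more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition accumulator illustrating linger-based batching.
final class LingerAccumulator {
    private final int maxBatchRecords;
    private final long lingerMs;
    private final List<String> pending = new ArrayList<>();
    private long oldestAppendMs; // only meaningful while pending is non-empty

    LingerAccumulator(int maxBatchRecords, long lingerMs) {
        if (maxBatchRecords <= 0 || lingerMs < 0) throw new IllegalArgumentException("bad limits");
        this.maxBatchRecords = maxBatchRecords;
        this.lingerMs = lingerMs;
    }

    // Adds a record. Returns a batch if this append filled one, otherwise null.
    List<String> append(String record, long nowMs) {
        if (pending.isEmpty()) oldestAppendMs = nowMs;
        pending.add(record);
        return pending.size() >= maxBatchRecords ? drain() : null;
    }

    // Polled by the sender loop. Returns the pending batch once its oldest record
    // has waited at least lingerMs; with lingerMs = 0 it returns whatever has
    // accumulated, so records that arrived during the previous send go out together.
    List<String> poll(long nowMs) {
        if (!pending.isEmpty() && nowMs - oldestAppendMs >= lingerMs) return drain();
        return null;
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```

With lingerMs = 0 nothing is delayed, yet records appended between two polls still leave as one batch, which is the group-commit effect; a positive lingerMs trades a little latency for larger batches under low load.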
> > > > > > > >
> > > > > > > > This producer does all network communication asynchronously and in parallel to all servers, so the performance penalty for acks=-1 and waiting on replication should be much reduced. I haven't done much benchmarking on this yet, though.
> > > > > > > >
> > > > > > > > The high-level design is described a little here, though this is now a little out of date:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > > > > >
> > > > > > > > -Jay
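Ross's layering suggestion at the top of the thread, which also touches Xavier's wish to partition on fields without deserializing and Jay's question 1, might be sketched like this. Every name here (RawSender, TypedSender, RecordSerializer, RecordPartitioner, BytesHashPartition) is hypothetical. The lower layer takes bytes plus an explicit partition; the typed layer runs the partitioner on the original key object before serializing, so range or other semantic partitioning never has to decode bytes.

```java
import java.util.Arrays;

// Hypothetical lower layer: already-serialized bytes plus an explicit partition number.
interface RawSender {
    void send(String topic, int partition, byte[] key, byte[] value);
}

// Hypothetical plug-ins used only by the typed layer.
interface RecordSerializer<T> {
    byte[] serialize(T value);
}

interface RecordPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical typed layer built on top of RawSender.
final class TypedSender<K, V> {
    private final RawSender raw;
    private final RecordSerializer<K> keySerializer;
    private final RecordSerializer<V> valueSerializer;
    private final RecordPartitioner<K> partitioner;
    private final int numPartitions;

    TypedSender(RawSender raw, RecordSerializer<K> keySerializer, RecordSerializer<V> valueSerializer,
                RecordPartitioner<K> partitioner, int numPartitions) {
        this.raw = raw;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.partitioner = partitioner;
        this.numPartitions = numPartitions;
    }

    void send(String topic, K key, V value) {
        // Partition on the typed key first, then serialize key and value.
        int partition = partitioner.partition(key, numPartitions);
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException("partitioner returned " + partition);
        }
        raw.send(topic, partition, keySerializer.serialize(key), valueSerializer.serialize(value));
    }
}

// Hash partitioning needs only the bytes, so raw-layer users can call it directly.
final class BytesHashPartition {
    static int of(byte[] key, int numPartitions) {
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }
}
```

BytesHashPartition.of is also one possible reading of the helper Clark asks for, translating an arbitrary byte[] into a partition number for callers who serialize on their own.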