The challenge with directly exposing ProduceRequestResult is that the offset it provides is only the base offset of the batch. Because batching is transparent and non-deterministic, there is no way to know where a particular message sits relative to that base offset. So I think we do need some kind of per-message result.
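To make that concrete, here is a rough sketch of what a per-message result could look like. The names BatchResult, MessageResult and relativeOffset are made up for illustration and are not taken from the actual patch. It also assumes that the messages in a batch get consecutive offsets starting at the base offset.

```java
import java.util.ArrayList;
import java.util.List;

public class PerMessageOffsetSketch {

    /** Hypothetical stand-in for the shared per-batch result: it only knows the base offset. */
    static final class BatchResult {
        private final long baseOffset;

        BatchResult(long baseOffset) {
            this.baseOffset = baseOffset;
        }

        long baseOffset() {
            return baseOffset;
        }
    }

    /** Hypothetical per-message result: remembers where the message sat inside its batch. */
    static final class MessageResult {
        private final BatchResult batch;
        private final int relativeOffset;

        MessageResult(BatchResult batch, int relativeOffset) {
            this.batch = batch;
            this.relativeOffset = relativeOffset;
        }

        /** Assumes offsets within a batch are consecutive, starting at the base offset. */
        long offset() {
            return batch.baseOffset() + relativeOffset;
        }
    }

    public static void main(String[] args) {
        // One batch acknowledged with base offset 100, holding three messages.
        BatchResult batch = new BatchResult(100L);
        List<MessageResult> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            results.add(new MessageResult(batch, i));
        }
        for (MessageResult result : results) {
            System.out.println(result.offset()); // prints 100, 101, 102
        }
    }
}
```

The caller never sees how messages were grouped. Each message keeps its own small handle, and the batch-level result stays an internal detail.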
I started with Future<RequestResult>, I think for the same reason you prefer it, but moved away from it for two reasons:
1. When I actually wrote out some code samples of usage they were a little ugly: checked exceptions, methods that we can't easily implement, no helper methods, etc.
2. I originally intended to make the result of send work like a ListenableFuture, so that you would register the callback on the result rather than as part of the call. I moved away from this primarily because the implementation complexity was a little higher.

Whether or not the code prettiness on its own outweighs the familiarity of a normal Future I don't know, but that was the evolution of my thinking.

-Jay

On Wed, Jan 29, 2014 at 10:06 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Neha,
>
> Error handling in RecordSend works as in Future: you will get the exception if there is one from any of the accessor methods or await().
>
> The purpose of hasError was that you can write things slightly more simply (which some people expressed preference for):
>   if(send.hasError())
>     // do something
>   long offset = send.offset();
>
> Instead of the slightly longer:
>   try {
>     long offset = send.offset();
>   } catch (KafkaException e) {
>     // do something
>   }
>
>
> On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
>> Regarding the use of Futures -
>>
>> Agree that there are some downsides to using Futures but both approaches have some tradeoffs.
>>
>> - Standardization and usability
>> Future is a widely used and understood Java API and given that the functionality that RecordSend hopes to provide is essentially that of Future, I think it makes sense to expose a widely understood public API for our clients.
>> RecordSend, on the other hand, seems to provide some APIs that are very similar to that of Future, in addition to exposing a bunch of APIs that belong to ProduceRequestResult. As a user, I would've really preferred to deal with ProduceRequestResult directly -
>> Future<ProduceRequestResult> send(...)
>>
>> - Error handling
>> RecordSend's error handling is quite unintuitive where the user has to remember to invoke hasError and error, instead of just throwing the exception. Now there are some downsides regarding error handling with the Future as well, where the user has to catch InterruptedException when we would never run into it. However, it seems like a price worth paying for supporting a standard API and error handling.
>>
>> - Unused APIs
>> This is a downside of using Future, where the cancel() operation would always return false and mean nothing. But we can mention that caveat in our Java docs.
>>
>> To summarize, I would prefer to expose a well understood and widely adopted Java API and put up with the overhead of catching one unnecessary checked exception, rather than wrap the useful ProduceRequestResult in a custom async object (RecordSend) and explain that to our many users.
>>
>> Thanks,
>> Neha
>>
>>
>> On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> > Hey Neha,
>> >
>> > Can you elaborate on why you prefer using Java's Future? The downside in my mind is the use of the checked InterruptedException and ExecutionException. ExecutionException is arguable, but forcing you to catch InterruptedException, often in code that can't be interrupted, seems perverse. It also leaves us with the cancel() method which I don't think we really can implement.
>> >
>> > Option 1A, to recap/elaborate, was the following. There is no Serializer or Partitioner api. We take a byte[] key and value and an optional integer partition.
>> > If you specify the integer partition it will be used. If you do not specify a key or a partition the partition will be chosen in a round robin fashion. If you specify a key but no partition we will choose a partition based on a hash of the key. In order to let the user find the partition we will need to give them access to the Cluster instance directly from the producer.
>> >
>> > -Jay
>> >
>> >
>> > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>> >
>> > > Here are more thoughts on the public APIs -
>> > >
>> > > - I suggest we use Java's Future instead of custom Future especially since it is part of the public API
>> > >
>> > > - Serialization: I like the simplicity of the producer APIs with the absence of serialization where we just deal with byte arrays for keys and values. What I don't like about this is the performance overhead on the Partitioner for any kind of custom partitioning based on the partitionKey. Since the only purpose of partitionKey is to do custom partitioning, why can't we take it in directly as an integer and let the user figure out the mapping from partition_key -> partition_id using the getCluster() API? If I understand correctly, this is similar to what you suggested as part of option 1A. I like this approach since it maintains the simplicity of APIs by allowing us to deal with bytes and does not compromise performance in the custom partitioning case.
>> > >
>> > > Thanks,
>> > > Neha
>> > >
>> > >
>> > > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > >
>> > > > Hey Tom,
>> > > >
>> > > > That sounds cool. How did you end up handling parallel I/O if you wrap the individual connections? Don't you need some selector that selects over all the connections?
>> > > >
>> > > > -Jay
>> > > >
>> > > >
>> > > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown <tombrow...@gmail.com> wrote:
>> > > >
>> > > > > I implemented a 0.7 client in pure Java, and its API very closely resembled this. (When multiple people independently engineer the same solution, it's probably good... right?). However, there were a few architectural differences with my client:
>> > > > >
>> > > > > 1. The basic client itself was just an asynchronous layer around the different server functions. In and of itself it had no knowledge of partitions, only servers (and maintained TCP connections to them).
>> > > > >
>> > > > > 2. The main producer was an additional layer that provided a high-level interface that could batch individual messages based on partition.
>> > > > >
>> > > > > 3. Knowledge of partitioning was done via an interface so that different strategies could be used.
>> > > > >
>> > > > > 4. Partitioning was done by the user, with knowledge of the available partitions provided by #3.
>> > > > >
>> > > > > 5. Serialization was done by the user to simplify the API.
>> > > > >
>> > > > > 6. Futures were used to make asynchronous calls emulate synchronous ones.
>> > > > >
>> > > > >
>> > > > > The main benefit of this approach is flexibility. For example, since the base client was just a managed connection (and not inherently a producer), it was easy to compose a produce request and an offsets request together into a confirmed produce request (officially not available in 0.7).
>> > > > >
>> > > > > Decoupling the basic client from partition management allowed me to implement zk discovery as a separate project so that the main project had no complex dependencies. The same was true of decoupling serialization.
>> > > > > It's trivial to build an optional layer that adds those features in, while allowing access to the base APIs for those that need it.
>> > > > >
>> > > > > Using standard Future objects was also beneficial, since I could combine them with existing tools (such as Guava).
>> > > > >
>> > > > > It may be too late to be of use, but I have been working with my company's legal department to release the implementation I described above. If you're interested in it, let me know.
>> > > > >
>> > > > >
>> > > > > To sum up my thoughts regarding the new API, I think it's a great start. I would like to see a more layered approach so I can use the parts I want, and adapt the other parts as needed. I would also like to see standard interfaces (especially Future) used where they make sense.
>> > > > >
>> > > > > --Tom
>> > > > >
>> > > > >
>> > > > > On Tue, Jan 28, 2014 at 1:33 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > > > >
>> > > > > > +1 ListenableFuture: If this works similar to Deferreds in Twisted Python or Promised IO in JavaScript, I think this is a great pattern for decoupling your callback logic from the place where the Future is generated. You can register as many callbacks as you like, each in the appropriate layer of the code and have each observer get notified when the promised i/o is complete without any of them knowing about each other.
>> > > > > >
>> > > > > >
>> > > > > > On Tue, Jan 28, 2014 at 11:32 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hey Ross,
>> > > > > > >
>> > > > > > > - ListenableFuture: Interesting.
>> > > > > > > That would be an alternative to the direct callback support we provide. There could be pros to this, let me think about it.
>> > > > > > > - We could provide layering, but I feel that the serialization is such a small thing we should just make a decision and choose one, it doesn't seem to me to justify a whole public facing layer.
>> > > > > > > - Yes, this is fairly esoteric, essentially I think it is fairly similar to databases like DynamoDB that allow you to specify two partition keys (I think DynamoDB does this...). The reasoning is that in fact there are several things you can use the key field for: (1) to compute the partition to store the data in, (2) as a unique identifier to deduplicate that partition's records within a log. These two things are almost always the same, but occasionally may differ when you want to group data in a more sophisticated way than just a hash of the primary key but still retain the proper primary key for delivery to the consumer and log compaction.
>> > > > > > >
>> > > > > > > -Jay
>> > > > > > >
>> > > > > > >
>> > > > > > > On Tue, Jan 28, 2014 at 3:24 AM, Ross Black <ross.w.bl...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi Jay,
>> > > > > > > >
>> > > > > > > > - Just to add some more info/confusion about possibly using Future ...
>> > > > > > > > If Kafka uses a JDK future, it plays nicely with other frameworks as well.
>> > > > > > > > Google Guava has a ListenableFuture that allows callback handling to be added via the returned future, and allows the callbacks to be passed off to a specified executor.
>> > > > > > > >
>> > > > > > > > http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ListenableFuture.html
>> > > > > > > > The JDK future can easily be converted to a listenable future.
>> > > > > > > >
>> > > > > > > > - On the question of byte[] vs Object, could this be solved by layering the API? e.g. a raw producer (use byte[] and specify the partition number) and a normal producer (use generic object and specify a Partitioner)?
>> > > > > > > >
>> > > > > > > > - I am confused by the keys in ProducerRecord and Partitioner. What is the usage for both a key and a partition key? (I am not yet using 0.8)
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Ross
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On 28 January 2014 05:00, Xavier Stevens <xav...@gaikai.com> wrote:
>> > > > > > > >
>> > > > > > > > > AutoCloseable would be nice for us as most of our code is using Java 7 at this point.
>> > > > > > > > >
>> > > > > > > > > I like Dropwizard's configuration mapping to POJOs via Jackson, but if you wanted to stick with property maps I don't care enough to object.
>> > > > > > > > >
>> > > > > > > > > If the producer only dealt with bytes, is there a way we could still do partition plugins without specifying the number explicitly? I would prefer to be able to pass in field(s) that would be used by the partitioner. Obviously if this wasn't possible you could always deserialize the object in the partitioner and grab the fields you want, but that seems really expensive to do on every message.
>> > > > > > > > >
>> > > > > > > > > It would also be nice to have a Java API Encoder constructor taking in VerifiableProperties. Scala understands how to handle "props: VerifiableProperties = null", but Java doesn't. So you don't run into this problem until runtime.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > -Xavier
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <cl...@breyman.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Jay -
>> > > > > > > > > >
>> > > > > > > > > > Config - your explanation makes sense. I'm just so accustomed to having Jackson automatically map my configuration objects to POJOs that I've stopped using property files. They are lingua franca.
>> > > > > > > > > > The only thought might be to separate the config interface from the implementation to allow for alternatives, but that might undermine your point of "do it this way so that everyone can find it where they expect it".
>> > > > > > > > > >
>> > > > > > > > > > Serialization: Of the options, I like 1A the best, though possibly with either an option to specify a partition key rather than ID or a helper to translate an arbitrary byte[] or long into a partition number.
>> > > > > > > > > >
>> > > > > > > > > > Thanks
>> > > > > > > > > > Clark
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Thanks for the detailed thoughts. Let me elaborate on the config thing.
>> > > > > > > > > > >
>> > > > > > > > > > > I agree that at first glance key-value strings don't seem like a very good configuration api for a client. Surely a well-typed config class would be better! I actually disagree and let me see if I can convince you.
>> > > > > > > > > > >
>> > > > > > > > > > > My reasoning has nothing to do with the api and everything to do with operations.
>> > > > > > > > > > >
>> > > > > > > > > > > Clients are embedded in applications which are themselves configured.
>> > > > > > > > > > > In any place that takes operations seriously the configuration for these applications will be version controlled and maintained through some kind of config management system. If we give a config class with getters and setters the application has to expose those properties to its configuration. What invariably happens is that the application exposes only a choice few properties that they thought they would change. Furthermore the application will make up a name for these configs that seems intuitive at the time in the 2 seconds the engineer spends thinking about it.
>> > > > > > > > > > >
>> > > > > > > > > > > Now consider the result of this in the large. You end up with dozens or hundreds of applications that have the client embedded. Each exposes a different, inadequate subset of the possible configs, each with different names. It is a nightmare.
>> > > > > > > > > > >
>> > > > > > > > > > > If you use a string-string map the config system can directly get a bundle of config key-value pairs and put them into the client.
>> > > > > > > > > > > This means that all configuration is automatically available with the name documented on the website in every application that does this. If you upgrade to a new Kafka version with more configs those will be exposed too. If you realize that you need to change a default you can just go through your configs and change it everywhere as it will have the same name everywhere.
>> > > > > > > > > > >
>> > > > > > > > > > > -Jay
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <cl...@breyman.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Thanks Jay. I'll see if I can put together a more complete response, perhaps as separate threads so that topics don't get entangled. In the meantime, here's a couple responses:
>> > > > > > > > > > > >
>> > > > > > > > > > > > Serialization: you've broken out a sub-thread so I'll reply there. My bias is that I like generics (except for type-erasure) and in particular they make it easy to compose serializers for compound payloads (e.g. when a common header wraps a payload of parameterized type).
>> > > > > > > > > > > > I'll respond to your 4-options message with an example.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Build: I've seen a lot of "maven-compatible" build systems produce "artifacts" that aren't really artifacts - no embedded POM or, worse, malformed POM. I know the sbt-generated artifacts were this way - onus is on me to see what gradle is spitting out and what a maven build might look like. Maven may be old and boring, but it gets out of the way and integrates really seamlessly with a lot of IDEs. When some scala projects I was working on in the fall of 2011 switched from sbt to maven, build became a non-issue.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Config: Not a big deal and no, I don't think a dropwizard dependency is appropriate. I do like using simple entity beans (POJOs, not J2EE) for configuration, especially if they can be marshalled without annotation by Jackson. I only mentioned the dropwizard-extras because it has some entity bean versions of the ZK and Kafka configs.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Domain-packaging: Also not a big deal - it's what's expected and it's pretty free in most IDEs. The advantage I see is that it is clear whether something is from the Apache Kafka project and whether something is from another org and related to Kafka. That said, nothing really enforces it.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Futures: I'll see if I can create some examples to demonstrate Future making interop easier.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Regards,
>> > > > > > > > > > > > C
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hey Clark,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - Serialization: Yes I agree with these though I don't consider the loss of generics a big issue. I'll try to summarize what I would consider the best alternative api with raw byte[].
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - Maven: We had this debate a few months back and the consensus was gradle. Is there a specific issue with the poms gradle makes?
>> > > > > > > > > > > > > I am extremely loath to revisit the issue as build issues are a recurring thing and no one ever agrees and ultimately our build needs are very simple.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - Config: I'm not sure if I follow the point. Are you saying we should use something in dropwizard for config? One principle here is to try to remove as many client dependencies as possible as we inevitably run into terrible compatibility issues with users who use the same library or its dependencies at different versions. Or are you talking about maintaining compatibility with existing config parameters? I think as much as a config in the existing client makes sense it should have the same name (I was a bit sloppy about that so I'll fix any errors there). There are a few new things and we should give those reasonable defaults. I think config is important so I'll start a thread on the config package in there.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - org.apache.kafka: We could do this. I always considered it kind of an odd thing Java programmers do that has no real motivation (but I could be re-educated!). I don't think it ends up reducing naming conflicts in practice and it adds a lot of noise and nested directories. Is there a reason you prefer this or just to be more standard?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - Future: Basically I didn't see any particular advantage. The cancel() method doesn't really make sense so probably wouldn't work. Likewise I dislike the checked exceptions it requires. Basically I just wrote out some code examples and it seemed cleaner with a special purpose object. I wasn't actually aware of plans for improved futures in Java 8 or the other integrations. Maybe you could elaborate on this a bit and show how it would be used? Sounds promising, I just don't know a lot about it.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > -Jay
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Jay - Thanks for the call for comments. Here's some initial input:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up a codec programmatically. Non-default partitioning should require an explicit partition key.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > - I really like the fact that it will be native Java. Please consider using native maven and not sbt, gradle, ivy, etc as they don't reliably play nice in the maven ecosystem. A jar without a well-formed pom doesn't feel like a real artifact. The poms generated by sbt et al. are not well formed. Using maven will make builds and IDE integration much smoother.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > - Look at Nick Telford's dropwizard-extras package in which he defines some Jackson-compatible POJOs for loading configuration. Seems like your client migration is similar. The config objects should have constructors or factories that accept Map<String, String> and Properties for ease of migration.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > - Would you consider using the org.apache.kafka package for the new API (quibble)
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > - Why create your own futures rather than use java.util.concurrent.Future<Long> or similar? Standard futures will play nice with other reactive libs and things like J8's CompletableFuture.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks again,
>> > > > > > > > > > > > > > C
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > A couple comments:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivity issues before pushing the first message through.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Should there also be a KafkaProducer.connect() or .open() method or connectAll()?
> > I guess it would try to connect to all brokers in the
> > BROKER_LIST_CONFIG
> >
> > HTH,
> >
> > Roger
> >
> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > As mentioned in a previous email we are working on a re-implementation of
> > > the producer. I would like to use this email thread to discuss the details
> > > of the public api and the configuration. I would love for us to be
> > > incredibly picky about this public api now so it is as good as possible and
> > > we don't need to break it in the future.
> > >
> > > The best way to get a feel for the API is actually to take a look at the
> > > javadoc; my hope is to get the api docs good enough that they are
> > > self-explanatory:
> > >
> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > >
> > > Please take a look at this API and give me any thoughts you may have!
> > >
> > > It may also be reasonable to take a look at the configs:
> > >
> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > >
> > > The actual code is posted here:
> > > https://issues.apache.org/jira/browse/KAFKA-1227
> > >
> > > A few questions or comments to kick things off:
> > > 1. We need to make a decision on whether serialization of the user's key
> > >    and value should be done by the user (with our api just taking byte[]) or
> > >    if we should take an object and allow the user to configure a Serializer
> > >    class which we instantiate via reflection. We take the latter approach in
> > >    the current producer, and I have carried this through to this prototype.
> > >    The tradeoff I see is this: taking byte[] is actually simpler, since the
> > >    user can directly do whatever serialization they like. The complication is
> > >    actually partitioning. Currently partitioning is done by a similar plug-in
> > >    api (Partitioner) which the user can implement and configure to override
> > >    how partitions are assigned.
> > >    If we take byte[] as input then we have no access to the original object
> > >    and partitioning MUST be done on the byte[]. This is fine for hash
> > >    partitioning. However for various types of semantic partitioning (range
> > >    partitioning, or whatever) you would want access to the original object.
> > >    In the current approach a producer who wishes to send byte[] they have
> > >    serialized in their own code can configure the BytesSerialization we
> > >    supply which is just a "no op" serialization.
> > > 2. We should obsess over naming and make sure each of the class names is
> > >    good.
> > > 3. Jun has already pointed out that we need to include the topic and
> > >    partition in the response, which is absolutely right. I haven't done that
> > >    yet but that definitely needs to be there.
> > > 4. Currently RecordSend.await will throw an exception if the request
> > >    failed. The intention here is that producer.send(message).await() exactly
> > >    simulates a synchronous call. Guozhang has noted that this is a little
> > >    annoying since the user must then catch exceptions. However if we remove
> > >    this then if the user doesn't check for errors they won't know one has
> > >    occurred, which I predict will be a common mistake.
> > > 5. Perhaps there is more we could do to make the async callbacks and future
> > >    we give back intuitive and easy to program against?
> > >
> > > Some background info on implementation:
> > >
> > > At a high level the primary difference in this producer is that it removes
> > > the distinction between the "sync" and "async" producer.
> > > Effectively all
> > > requests are sent asynchronously but always return a future response object
> > > that gives the offset as well as any error that may have occurred when the
> > > request is complete. The batching that today is done only in the async
> > > producer is now done whenever possible. This means that the sync producer,
> > > under load, can get performance as good as the async producer (preliminary
> > > results show the producer getting 1m messages/sec). This works similarly to
> > > group commit in databases but with respect to the actual network
> > > transmission--any messages that arrive while a send is in progress are
> > > batched together.
> > > It is also possible to encourage batching even under low
> > > load to save server resources by introducing a delay on the send to allow
> > > more messages to accumulate; this is done using the linger.ms config (this
> > > is similar to Nagle's algorithm in TCP).
> > >
> > > This producer does all network communication asynchronously and in parallel
> > > to all servers so the performance penalty for acks=-1 and waiting on
> > > replication should be much reduced. I haven't done much benchmarking on
> > > this yet, though.
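The linger behaviour described above (hold a batch briefly so that more messages can accumulate, or send as soon as it is full) can be sketched as a small single-threaded model. This is only an illustration under assumptions: LingerBatcher, maxBatchSize and the explicit nowMs clock argument are hypothetical and say nothing about how the actual producer implements linger.ms.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of linger-based batching.
// It is NOT the producer's real accumulator.
final class LingerBatcher {
    private final int maxBatchSize;
    private final long lingerMs;
    private final List<String> pending = new ArrayList<>();
    private long oldestAppendMs = -1L;

    LingerBatcher(int maxBatchSize, long lingerMs) {
        this.maxBatchSize = maxBatchSize;
        this.lingerMs = lingerMs;
    }

    // Adds a record; remembers when the first record of the batch arrived.
    void append(String record, long nowMs) {
        if (pending.isEmpty()) {
            oldestAppendMs = nowMs;
        }
        pending.add(record);
    }

    // A batch is ready when it is full, or when its oldest record has
    // waited at least lingerMs. With lingerMs = 0 any non-empty batch is ready.
    boolean ready(long nowMs) {
        if (pending.isEmpty()) {
            return false;
        }
        return pending.size() >= maxBatchSize || nowMs - oldestAppendMs >= lingerMs;
    }

    // Hands back everything accumulated so far as one batch and resets.
    List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        oldestAppendMs = -1L;
        return batch;
    }
}
```

A sender loop would call ready(now) and, when it returns true, drain() and transmit the batch. A larger lingerMs trades a little latency for fewer, larger requests, which is the same tradeoff as Nagle's algorithm.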
> > >
> > > The high level design is described a little here, though this is now a
> > > little out of date:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > >
> > > -Jay
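On point 1 of the message above, the partitioning tradeoff can be made concrete with a small sketch: hash partitioning works on the serialized byte[] alone, while range partitioning needs the original key. The class and method names are hypothetical and do not correspond to the Partitioner plug-in API under discussion; a long key is assumed purely for illustration.

```java
import java.util.Arrays;

// Hypothetical helpers, for illustration only. They are NOT the Partitioner
// plug-in API from the proposal.
final class PartitioningSketch {
    private PartitioningSketch() {}

    // Hash partitioning needs nothing but the serialized key bytes.
    // floorMod keeps the result in [0, numPartitions) for negative hashes.
    static int hashPartition(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Range partitioning needs the original (here: long) key. upperBounds holds
    // sorted, exclusive upper bounds; keys at or above the last bound go to the
    // final partition, so there are upperBounds.length + 1 partitions.
    static int rangePartition(long key, long[] upperBounds) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (key < upperBounds[i]) {
                return i;
            }
        }
        return upperBounds.length;
    }
}
```

If the producer only ever sees byte[], rangePartition would have to deserialize the key again or compare raw bytes, which is the complication the message describes.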