Hey guys, my 2c.

1. RecordSend is a confusing name to me. Shouldn't it be RecordSendResponse?
2. Random nit: it's annoying to have the Javadoc info for the constants on http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html, but the string constant values on http://empathybox.com/kafka-javadoc/constant-values.html#kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG. I find myself toggling between the two a lot. Not sure if this can be fixed easily.
3. MAX_PARTITION_SIZE_CONFIG - this name is kind of confusing, specifically the use of the term "partition". I thought it was related to Kafka topic partitions, not grouping together/batching.
4. METADATA_FETCH_TIMEOUT_CONFIG - what happens if this timeout is exceeded? Do we get an exception on send()?
5. METADATA_REFRESH_MS_CONFIG - why is this useful? Has to do with acks=0, right? Worth documenting, I think.
6. PARTITIONER_CLASS_CONFIG - link to the partitioner interface in the javadocs. Also, missing a period.
7. KafkaProducer.html - the send() documentation says "archive" when you mean "achieve", I think.
8. No javadoc for ProduceRequestResult.
9. In ProduceRequestResult, I understand baseOffset to be the first offset of the set. Is it possible to get the last offset as well? If I send messages A, B, C, D, I'm most interested in D's offset.
10. In ProduceRequestResult, I'd prefer Java-bean style (getError, isCompleted).
11. At first glance, I like option 1A in your serialization list.
12. We should definitely not introduce a ZK dependency for bootstrapping broker host/ports.
13. No favor on the Future discussion. I really^Int.Max hate checked exceptions, but I also like standard interfaces. It's a wash in my book.
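
On point 13, here is a rough sketch of how a caller might live with the standard Future and its checked exceptions. It is only an illustration under my own assumptions: SendResult, its baseOffset/relativeOffset fields, and the awaitResult helper are hypothetical names, not anything in the proposed producer API, and CompletableFuture (Java 8) just stands in for whatever send() would actually return.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureSendSketch {

    /** Hypothetical response metadata; NOT the real Kafka class. */
    public static final class SendResult {
        private final long baseOffset;     // first offset of the batch (assumed)
        private final int relativeOffset;  // position of this record in the batch (assumed)

        public SendResult(long baseOffset, int relativeOffset) {
            this.baseOffset = baseOffset;
            this.relativeOffset = relativeOffset;
        }

        /** Absolute offset of this record, derived from base + relative position. */
        public long offset() {
            return baseOffset + relativeOffset;
        }
    }

    /**
     * Blocks on a standard Future and rethrows its two checked exceptions
     * as unchecked ones, so callers do not need a try/catch at every call site.
     */
    public static SendResult awaitResult(Future<SendResult> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for send", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future a send(...) call would hand back.
        Future<SendResult> pending =
                CompletableFuture.completedFuture(new SendResult(100L, 3));
        System.out.println(awaitResult(pending).offset()); // prints 103
    }
}
```

A small wrapper like this keeps the standard interface while hiding the checked exceptions, which is roughly why the two options come out even for me.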

Cheers,
Chris

On 1/29/14 10:34 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:

>>> The challenge of directly exposing ProduceRequestResult is that the offset provided is just the base offset and there is no way to know for a particular message where it was in relation to that base offset because the batching is transparent and non-deterministic.
>
>That's a good point. I need to look into the code more closely to see if it is possible to expose something like Future<RequestResult> send(...) where RequestResult has the right metadata as well as helper APIs that the user would want. For example:
>
>Future<RequestResult> messageResponse;
>try {
>  messageResponse = send(...)
>} catch(InterruptedException ie) {
>} catch(ExecutionException ee) {
>}
>
>if(messageResponse.hasError())
>  // handle error
>else {
>  String topic = messageResponse.topic();
>  int partition = messageResponse.partition();
>  long offset = messageResponse.offset(); // can this offset return the absolute offset instead of just the relative offset?
>  ...
>}
>
>I could've missed some reasons why we can't do the above. I just think that separating the future-like functionality of RecordSend from the actual response metadata could be useful while supporting Future at the same time.
>
>Thanks,
>Neha
>
>On Wed, Jan 29, 2014 at 10:23 AM, Tom Brown <tombrow...@gmail.com> wrote:
>
>> I strongly support the use of Future. In fact, the cancel method may not be useless. Since the producer is meant to be used by N threads, it could easily get overloaded such that a produce request could not be sent immediately and had to be queued. In that case, cancelling should cause it to not actually get sent.
>>
>> --Tom
>>
>> On Wed, Jan 29, 2014 at 11:06 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> > Hey Neha,
>> >
>> > Error handling in RecordSend works as in Future: you will get the exception, if there is one, from any of the accessor methods or await().
>> >
>> > The purpose of hasError was that you can write things slightly more simply (which some people expressed preference for):
>> >   if(send.hasError())
>> >     // do something
>> >   long offset = send.offset();
>> >
>> > Instead of the slightly longer:
>> >   try {
>> >     long offset = send.offset();
>> >   } catch (KafkaException e) {
>> >     // do something
>> >   }
>> >
>> > On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>> >
>> > > Regarding the use of Futures -
>> > >
>> > > Agree that there are some downsides to using Futures but both approaches have some tradeoffs.
>> > >
>> > > - Standardization and usability
>> > > Future is a widely used and understood Java API and given that the functionality that RecordSend hopes to provide is essentially that of Future, I think it makes sense to expose a widely understood public API for our clients. RecordSend, on the other hand, seems to provide some APIs that are very similar to that of Future, in addition to exposing a bunch of APIs that belong to ProduceRequestResult. As a user, I would've really preferred to deal with ProduceRequestResult directly -
>> > > Future<ProduceRequestResult> send(...)
>> > >
>> > > - Error handling
>> > > RecordSend's error handling is quite unintuitive where the user has to remember to invoke hasError and error, instead of just throwing the exception. Now there are some downsides regarding error handling with the Future as well, where the user has to catch InterruptedException when we would never run into it. However, it seems like a price worth paying for supporting a standard API and error handling
>> > >
>> > > - Unused APIs
>> > > This is a downside of using Future, where the cancel() operation would always return false and mean nothing.
>> > > But we can mention that caveat in our Javadocs.
>> > >
>> > > To summarize, I would prefer to expose a well understood and widely adopted Java API and put up with the overhead of catching one unnecessary checked exception, rather than wrap the useful ProduceRequestResult in a custom async object (RecordSend) and explain that to our many users.
>> > >
>> > > Thanks,
>> > > Neha
>> > >
>> > > On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > >
>> > > > Hey Neha,
>> > > >
>> > > > Can you elaborate on why you prefer using Java's Future? The downside in my mind is the use of the checked InterruptedException and ExecutionException. ExecutionException is arguable, but forcing you to catch InterruptedException, often in code that can't be interrupted, seems perverse. It also leaves us with the cancel() method which I don't think we really can implement.
>> > > >
>> > > > Option 1A, to recap/elaborate, was the following. There is no Serializer or Partitioner api. We take a byte[] key and value and an optional integer partition. If you specify the integer partition it will be used. If you do not specify a key or a partition the partition will be chosen in a round robin fashion. If you specify a key but no partition we will choose a partition based on a hash of the key. In order to let the user find the partition we will need to give them access to the Cluster instance directly from the producer.
>> > > >
>> > > > -Jay
>> > > >
>> > > > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>> > > >
>> > > > > Here are more thoughts on the public APIs -
>> > > > >
>> > > > > - I suggest we use Java's Future instead of a custom Future, especially since it is part of the public API
>> > > > >
>> > > > > - Serialization: I like the simplicity of the producer APIs with the absence of serialization where we just deal with byte arrays for keys and values. What I don't like about this is the performance overhead on the Partitioner for any kind of custom partitioning based on the partitionKey. Since the only purpose of partitionKey is to do custom partitioning, why can't we take it in directly as an integer and let the user figure out the mapping from partition_key -> partition_id using the getCluster() API? If I understand correctly, this is similar to what you suggested as part of option 1A. I like this approach since it maintains the simplicity of APIs by allowing us to deal with bytes and does not compromise performance in the custom partitioning case.
>> > > > >
>> > > > > Thanks,
>> > > > > Neha
>> > > > >
>> > > > > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > >
>> > > > > > Hey Tom,
>> > > > > >
>> > > > > > That sounds cool. How did you end up handling parallel I/O if you wrap the individual connections? Don't you need some selector that selects over all the connections?
>> > > > > >
>> > > > > > -Jay
>> > > > > >
>> > > > > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown <tombrow...@gmail.com> wrote:
>> > > > > >
>> > > > > > > I implemented a 0.7 client in pure Java, and its API very closely resembled this. (When multiple people independently engineer the same solution, it's probably good... right?). However, there were a few architectural differences with my client:
>> > > > > > >
>> > > > > > > 1. The basic client itself was just an asynchronous layer around the different server functions. In and of itself it had no knowledge of partitions, only servers (and maintained TCP connections to them).
>> > > > > > >
>> > > > > > > 2. The main producer was an additional layer that provided a high-level interface that could batch individual messages based on partition.
>> > > > > > >
>> > > > > > > 3. Knowledge of partitioning was done via an interface so that different strategies could be used.
>> > > > > > >
>> > > > > > > 4. Partitioning was done by the user, with knowledge of the available partitions provided by #3.
>> > > > > > >
>> > > > > > > 5. Serialization was done by the user to simplify the API.
>> > > > > > >
>> > > > > > > 6. Futures were used to make asynchronous emulate synchronous calls.
>> > > > > > >
>> > > > > > > The main benefit of this approach is flexibility. For example, since the base client was just a managed connection (and not inherently a producer), it was easy to composite a produce request and an offsets request together into a confirmed produce request (officially not available in 0.7).
>> > > > > > >
>> > > > > > > Decoupling the basic client from partition management allowed me to implement zk discovery as a separate project so that the main project had no complex dependencies. The same was true of decoupling serialization. It's trivial to build an optional layer that adds those features in, while allowing access to the base APIs for those that need it.
>> > > > > > >
>> > > > > > > Using standard Future objects was also beneficial, since I could combine them with existing tools (such as guava).
>> > > > > > >
>> > > > > > > It may be too late to be of use, but I have been working with my company's legal department to release the implementation I described above. If you're interested in it, let me know.
>> > > > > > >
>> > > > > > > To sum up my thoughts regarding the new API, I think it's a great start. I would like to see a more layered approach so I can use the parts I want, and adapt the other parts as needed. I would also like to see standard interfaces (especially Future) used where they make sense.
>> > > > > > >
>> > > > > > > --Tom
>> > > > > > >
>> > > > > > > On Tue, Jan 28, 2014 at 1:33 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > +1 ListenableFuture: If this works similar to Deferreds in Twisted Python or Promised IO in Javascript, I think this is a great pattern for decoupling your callback logic from the place where the Future is generated.
>> > > > > > > > You can register as many callbacks as you like, each in the appropriate layer of the code and have each observer get notified when the promised i/o is complete without any of them knowing about each other.
>> > > > > > > >
>> > > > > > > > On Tue, Jan 28, 2014 at 11:32 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hey Ross,
>> > > > > > > > >
>> > > > > > > > > - ListenableFuture: Interesting. That would be an alternative to the direct callback support we provide. There could be pros to this, let me think about it.
>> > > > > > > > > - We could provide layering, but I feel that the serialization is such a small thing we should just make a decision and choose one, it doesn't seem to me to justify a whole public facing layer.
>> > > > > > > > > - Yes, this is fairly esoteric, essentially I think it is fairly similar to databases like DynamoDB that allow you to specify two partition keys (I think DynamoDB does this...). The reasoning is that in fact there are several things you can use the key field for: (1) to compute the partition to store the data in, (2) as a unique identifier to deduplicate that partition's records within a log.
>> > > > > > > > > These two things are almost always the same, but occasionally may differ when you want to group data in a more sophisticated way than just a hash of the primary key but still retain the proper primary key for delivery to the consumer and log compaction.
>> > > > > > > > >
>> > > > > > > > > -Jay
>> > > > > > > > >
>> > > > > > > > > On Tue, Jan 28, 2014 at 3:24 AM, Ross Black <ross.w.bl...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jay,
>> > > > > > > > > >
>> > > > > > > > > > - Just to add some more info/confusion about possibly using Future ... If Kafka uses a JDK future, it plays nicely with other frameworks as well. Google Guava has a ListenableFuture that allows callback handling to be added via the returned future, and allows the callbacks to be passed off to a specified executor.
>> > > > > > > > > >
>> > > > > > > > > > http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ListenableFuture.html
>> > > > > > > > > >
>> > > > > > > > > > The JDK future can easily be converted to a listenable future.
>> > > > > > > > > >
>> > > > > > > > > > - On the question of byte[] vs Object, could this be solved by layering the API? e.g.
>> > > > > > > > > > a raw producer (use byte[] and specify the partition number) and a normal producer (use generic object and specify a Partitioner)?
>> > > > > > > > > >
>> > > > > > > > > > - I am confused by the keys in ProducerRecord and Partitioner. What is the usage for both a key and a partition key? (I am not yet using 0.8)
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > > Ross
>> > > > > > > > > >
>> > > > > > > > > > On 28 January 2014 05:00, Xavier Stevens <xav...@gaikai.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > AutoCloseable would be nice for us as most of our code is using Java 7 at this point.
>> > > > > > > > > > >
>> > > > > > > > > > > I like Dropwizard's configuration mapping to POJOs via Jackson, but if you wanted to stick with property maps I don't care enough to object.
>> > > > > > > > > > >
>> > > > > > > > > > > If the producer only dealt with bytes, is there a way we could still do partition plugins without specifying the number explicitly? I would prefer to be able to pass in field(s) that would be used by the partitioner. Obviously if this wasn't possible you could always deserialize the object in the partitioner and grab the fields you want, but that seems really expensive to do on every message.
>> > > > > > > > > > >
>> > > > > > > > > > > It would also be nice to have a Java API Encoder constructor taking in VerifiableProperties. Scala understands how to handle "props: VerifiableProperties = null", but Java doesn't. So you don't run into this problem until runtime.
>> > > > > > > > > > >
>> > > > > > > > > > > -Xavier
>> > > > > > > > > > >
>> > > > > > > > > > > On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <cl...@breyman.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Jay -
>> > > > > > > > > > > >
>> > > > > > > > > > > > Config - your explanation makes sense. I'm just so accustomed to having Jackson automatically map my configuration objects to POJOs that I've stopped using property files. They are lingua franca. The only thought might be to separate the config interface from the implementation to allow for alternatives, but that might undermine your point of "do it this way so that everyone can find it where they expect it".
>> > > > > > > > > > > >
>> > > > > > > > > > > > Serialization: Of the options, I like 1A the best, though possibly with either an option to specify a partition key rather than ID or a helper to translate an arbitrary byte[] or long into a partition number.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks
>> > > > > > > > > > > > Clark
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks for the detailed thoughts. Let me elaborate on the config thing.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I agree that at first glance key-value strings don't seem like a very good configuration api for a client. Surely a well-typed config class would be better! I actually disagree and let me see if I can convince you.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > My reasoning has nothing to do with the api and everything to do with operations.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Clients are embedded in applications which are themselves configured.
>> > > > > > > > > > > > > In any place that takes operations seriously the configuration for these applications will be version controlled and maintained through some kind of config management system. If we give a config class with getters and setters the application has to expose those properties to its configuration. What invariably happens is that the application exposes only a choice few properties that they thought they would change. Furthermore the application will make up a name for these configs that seems intuitive at the time in the 2 seconds the engineer spends thinking about it.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Now consider the result of this in the large. You end up with dozens or hundreds of applications that have the client embedded. Each exposes a different, inadequate subset of the possible configs, each with different names. It is a nightmare.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > If you use a string-string map the config system can directly get a bundle of config key-value pairs and put them into the client. This means that all configuration is automatically available with the name documented on the website in every application that does this. If you upgrade to a new kafka version with more configs those will be exposed too. If you realize that you need to change a default you can just go through your configs and change it everywhere as it will have the same name everywhere.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > -Jay
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <cl...@breyman.com> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks Jay. I'll see if I can put together a more complete response, perhaps as separate threads so that topics don't get entangled.
>> > > > > > > > > > > > > > In the meantime, here's a couple responses:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Serialization: you've broken out a sub-thread so I'll reply there. My bias is that I like generics (except for type-erasure) and in particular they make it easy to compose serializers for compound payloads (e.g. when a common header wraps a payload of parameterized type). I'll respond to your 4-options message with an example.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Build: I've seen a lot of "maven-compatible" build systems produce "artifacts" that aren't really artifacts - no embedded POM or, worst, malformed POM. I know the sbt-generated artifacts were this way - onus is on me to see what gradle is spitting out and what a maven build might look like. Maven may be old and boring, but it gets out of the way and integrates really seamlessly with a lot of IDEs.
>> > > > > > > > > > > > > > When some scala projects I was working on in the fall of 2011 switched from sbt to maven, build became a non-issue.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Config: Not a big deal and no, I don't think a dropwizard dependency is appropriate. I do like using simple entity beans (POJOs, not J2EE) for configuration, especially if they can be marshalled without annotation by Jackson. I only mentioned the dropwizard-extras because it has some entity bean versions of the ZK and Kafka configs.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Domain-packaging: Also not a big deal - it's what's expected and it's pretty free in most IDEs. The advantage I see is that it is clear whether something is from the Apache Kafka project and whether something is from another org and related to Kafka. That said, nothing really enforces it.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Futures: I'll see if I can create some examples to demonstrate Future making interop easier.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > > C
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Hey Clark,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > - Serialization: Yes I agree with these though I don't consider the loss of generics a big issue. I'll try to summarize what I would consider the best alternative api with raw byte[].
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > - Maven: We had this debate a few months back and the consensus was gradle. Is there a specific issue with the poms gradle makes? I am extremely loath to revisit the issue as build issues are a recurring thing and no one ever agrees and ultimately our build needs are very simple.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > - Config: I'm not sure if I follow the point.
>> > > > > > > > > > > > > > > Are you saying we should use something in dropwizard for config? One principle here is to try to remove as many client dependencies as possible as we inevitably run into terrible compatibility issues with users who use the same library or its dependencies at different versions. Or are you talking about maintaining compatibility with existing config parameters? I think as much as a config in the existing client makes sense it should have the same name (I was a bit sloppy about that so I'll fix any errors there). There are a few new things and we should give those reasonable defaults. I think config is important so I'll start a thread on the config package in there.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > - org.apache.kafka: We could do this.
>> > > > > > > > > > > > > > > I always considered it kind of an odd thing Java programmers do that has no real motivation (but I could be re-educated!). I don't think it ends up reducing naming conflicts in practice and it adds a lot of noise and nested directories. Is there a reason you prefer this or just to be more standard?
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > - Future: Basically I didn't see any particular advantage. The cancel() method doesn't really make sense so probably wouldn't work. Likewise I dislike the checked exceptions it requires. Basically I just wrote out some code examples and it seemed cleaner with a special purpose object. I wasn't actually aware of plans for improved futures in java 8 or the other integrations. Maybe you could elaborate on this a bit and show how it would be used?
>> > > > > > > > > > > > > > > Sounds promising, I just don't know a lot about it.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > -Jay
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Jay - Thanks for the call for comments. Here's some initial input:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up codec programmatically. Non-default partitioning should require an explicit partition key.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > - I really like the fact that it will be native Java. Please consider using native maven and not sbt, gradle, ivy, etc as they don't reliably play nice in the maven ecosystem. A jar without a well-formed pom doesn't feel like a real artifact. The pom's generated by sbt et al.
>> > > > > > > > > > > > > > > > are not well formed. Using maven will make builds and IDE integration much smoother.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > - Look at Nick Telford's dropwizard-extras package in which he defines some Jackson-compatible POJO's for loading configuration. Seems like your client migration is similar. The config objects should have constructors or factories that accept Map<String, String> and Properties for ease of migration.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > - Would you consider using the org.apache.kafka package for the new API (quibble)
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > - Why create your own futures rather than use java.util.concurrent.Future<Long> or similar? Standard futures will play nice with other reactive libs and things like J8's CompletableFuture.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Thanks again,
>> > > > > > > > > > > > > > > > C
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > A couple comments:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivity issues before pushing the first message through.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Should there also be a KafkaProducer.connect() or .open() method or connectAll()?
>> > > > > > > > > > > > > > > > > I guess it would try to connect to all brokers in the BROKER_LIST_CONFIG
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > HTH,
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Roger
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > As mentioned in a previous email we are working on a re-implementation of the producer. I would like to use this email thread to discuss the details of the public API and the configuration. I would love for us to be incredibly picky about this public api now so it is as good as possible and we don't need to break it in the future.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > The best way to get a feel for the API is actually to take a look at the javadoc, my hope is to get the api docs good enough so that it is self-explanatory:
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Please take a look at this API and give me any thoughts you may have!
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > It may also be reasonable to take a look at the configs:
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > The actual code is posted here:
>> > > > > > > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1227
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > A few questions or comments to kick things off:
>> > > > > > > > > > > > > > > > > > 1. We need to make a decision on whether serialization of the user's key and value should be done by the user (with our api just taking byte[]) or if we should take an object and allow the user to configure a Serializer class which we instantiate via reflection.
>> > > > > > > > > > > > > > > > > > We take the latter approach in the current producer, and I have carried this through to this prototype. The tradeoff I see is this: taking byte[] is actually simpler, the user can directly do whatever serialization they like. The complication is actually partitioning. Currently partitioning is done by a similar plug-in api (Partitioner) which the user can implement and configure to override how partitions are assigned. If we take byte[] as input then we have no access to the original object and partitioning MUST be done on the byte[]. This is fine for hash partitioning.
>> > > > > > > > > > > > > > > > > > However for various types of semantic partitioning (range partitioning, or whatever) you would want access to the original object. In the current approach a producer who wishes to send byte[] they have serialized in their own code can configure the BytesSerialization we supply which is just a "no op" serialization.
>> > > > > > > > > > > > > > > > > > 2. We should obsess over naming and make sure each of the class names are good.
>> > > > > > > > > > > > > > > > > > 3. Jun has already pointed out that we need to include the topic and partition in the response, which is absolutely right. I haven't done that yet but that definitely needs to be there.
>> > > > > > > > > > > > > > > > > > 4. Currently RecordSend.await will throw an exception if the request failed.
>> > > > > > > > > > > > > > > > > > The intention here is that producer.send(message).await() exactly simulates a synchronous call. Guozhang has noted that this is a little annoying since the user must then catch exceptions. However if we remove this then if the user doesn't check for errors they won't know one has occurred, which I predict will be a common mistake.
>> > > > > > > > > > > > > > > > > > 5. Perhaps there is more we could do to make the async callbacks and future we give back intuitive and easy to program against?
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Some background info on implementation:
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > At a high level the primary difference in this producer is that it removes the distinction between the "sync" and "async" producer.
>> > > > > > > > > > > > > > > > > > Effectively all requests are sent asynchronously but always return a future response object that gives the offset as well as any error that may have occurred when the request is complete. The batching that is done in the async producer only today is done whenever possible now. This means that the sync producer, under load, can get performance as good as the async producer (preliminary results show the producer getting 1m messages/sec). This works similar to group commit in databases but with respect to the actual network transmission--any messages that arrive while a send is in progress are batched together.
>> > > > > > > > > > > > > > > > > > It is also possible to encourage batching even under low load to save server resources by introducing a delay on the send to allow more messages to accumulate; this is done using the linger.ms config (this is similar to Nagle's algorithm in TCP).
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > This producer does all network communication asynchronously and in parallel to all servers so the performance penalty for acks=-1 and waiting on replication should be much reduced. I haven't done much benchmarking on this yet, though.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > The high level design is described a little here, though this is now a little out of date:
>> > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > -Jay
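
For anyone following the Future-versus-RecordSend question in the quoted thread, here is a rough sketch of what blocking on a standard java.util.concurrent.Future looks like when it is used to simulate a synchronous send, which is the role producer.send(message).await() plays in the proposal. Nothing here is the real producer API: FakeProducer, SendResult, and sendSync are hypothetical names made up for illustration, the "producer" does no network I/O, and CompletableFuture (Java 8) is used only as a convenient way to build a completed Future. The point is just to show where the checked exceptions land.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureSendSketch {

    /** Hypothetical result type: partition and offset assigned to one record. */
    static final class SendResult {
        final int partition;
        final long offset;

        SendResult(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for a producer; it only hands out offsets in memory. */
    static final class FakeProducer {
        private long nextOffset = 0;

        /** Returns a standard Future; an empty record fails, anything else succeeds. */
        Future<SendResult> send(byte[] value) {
            CompletableFuture<SendResult> future = new CompletableFuture<>();
            if (value == null || value.length == 0) {
                future.completeExceptionally(new IllegalArgumentException("empty record"));
            } else {
                future.complete(new SendResult(0, nextOffset++));
            }
            return future;
        }
    }

    /**
     * Blocks on the future to simulate a synchronous send.
     * Returns the record's offset, or -1 if the send failed or was interrupted.
     */
    static long sendSync(FakeProducer producer, byte[] value) {
        try {
            return producer.send(value).get().offset;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            // The send itself failed; e.getCause() holds the original error.
            return -1;
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        System.out.println(sendSync(producer, "a".getBytes(StandardCharsets.UTF_8))); // prints 0
        System.out.println(sendSync(producer, "b".getBytes(StandardCharsets.UTF_8))); // prints 1
        System.out.println(sendSync(producer, new byte[0]));                          // prints -1
    }
}
```

With a standard Future the caller is forced to deal with InterruptedException and ExecutionException at the call site, which is the annoyance raised in the thread; the upside is that a failed send cannot be silently ignored by code that reads the result.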