This is pretty hard to do with the architecture we've gone with, because the stored events are not objects but tightly packed serialized bytes. That approach is much better from a performance and memory management point of view, so I'd be very hesitant to change it, but it does make it hard to provide a usable API for this. I think this is something that would be better implemented in the application (e.g. put messages on a blocking queue and batch them into a single message after a timeout).
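A minimal sketch of that application-side approach, assuming the messages are simple counts that can be summed. The class name CountBatcher, its methods, and the text encoding of the payload are all hypothetical and not part of any Kafka API; the resulting byte array would be handed to the producer as one message.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical application-side helper; nothing here is a Kafka class.
final class CountBatcher {
    private final BlockingQueue<Long> pending = new LinkedBlockingQueue<>();

    // Application threads call this instead of sending each count directly.
    void add(long count) {
        pending.add(count);
    }

    // Collects counts until timeoutMs has elapsed or maxBatch counts were
    // taken, then folds them into one payload. Returns null if nothing arrived.
    byte[] nextAggregate(long timeoutMs, int maxBatch) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long sum = 0;
        int taken = 0;
        try {
            while (taken < maxBatch) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                Long count = pending.poll(remaining, TimeUnit.NANOSECONDS);
                if (count == null) {
                    break; // timed out while waiting for more counts
                }
                sum += count;
                taken++;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt and flush what has been collected so far.
            Thread.currentThread().interrupt();
        }
        return taken == 0 ? null : Long.toString(sum).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        CountBatcher batcher = new CountBatcher();
        for (int i = 1; i <= 100; i++) {
            batcher.add(i);
        }
        // 100 queued counts become a single payload for one send call.
        byte[] payload = batcher.nextAggregate(50, 1000);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}
```

The aggregation happens before anything is serialized into the producer's buffer, which is why it sidesteps the packed-bytes problem described above.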
-Jay

On Thu, Feb 6, 2014 at 1:16 PM, S Ahmed <sahmed1...@gmail.com> wrote:

> How about the following use case:
>
> Just before the producer actually sends the payload to Kafka, could an event be exposed that would allow one to loop through the messages and potentially delete some of them?
>
> Example:
>
> Say you have 100 messages, but before you send these messages to Kafka, you can easily aggregate many of these messages to reduce the message count. If there are messages that store counts, you could aggregate these into a single message and then send to Kafka.
>
> Thoughts?
>
> On Wed, Feb 5, 2014 at 2:03 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > It might. I considered this but ended up going this way. Now that we have changed partitionKey=>partition it almost works. The difference is the consumer gets an offset too which the producer doesn't have.
> >
> > One thing I think this points to is the value of getting the consumer Java API worked out even in the absence of an implementation just so we can write some fake code that uses both and kind of see how it feels.
> >
> > -Jay
> >
> > On Wed, Feb 5, 2014 at 10:23 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > Currently, the user will send ProducerRecords using the new producer. The expectation will be that you get the same thing as output from the consumer. Since ProducerRecord is a holder for topic, partition, key and value, does it make sense to rename it to just Record? So, the send/receive APIs would look like the following -
> > >
> > > producer.send(Record record);
> > > List<Record> poll();
> > >
> > > Thoughts?
> > >
> > > On Sun, Feb 2, 2014 at 4:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > I think the most common motivation for having a customized partitioner is to make sure some messages always go to the same partition, but people may seldom want to know exactly which partition they go to. If that is true, why not just assign the same byte array as partition key with the default hash based partitioning in option 1.A? But again, that is based on my presumption that very few users would want to really specify the partition id.
> > > >
> > > > On Fri, Jan 31, 2014 at 2:44 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Hey Tom,
> > > > >
> > > > > Agreed, there is definitely nothing that prevents our including partitioner implementations, but it does get a little less seamless.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Fri, Jan 31, 2014 at 2:35 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > > > >
> > > > > > Regarding partitioning APIs, I don't think there is a common subset of information that is required for all strategies. Instead of modifying the core API to easily support all of the various partitioning strategies, offer the most common ones as libraries users can build into their own data pipeline, just like serialization. The core API would simply accept a partition index. You could include one default strategy (random) that only applies if they set "-1" for the partition index.
> > > > > >
> > > > > > That way, each partitioning strategy could have its own API that makes sense for it.
> > > > > > For example, a round-robin partitioner only needs one method: "nextPartition()", while a hash-based one needs "getPartitionFor(byte[])".
> > > > > >
> > > > > > For those who actually need a pluggable strategy, a superset of the API could be codified into an interface (perhaps the existing partitioner interface), but it would still have to be used from outside of the core API.
> > > > > >
> > > > > > This design would make the core API less confusing (when do I use a partition key instead of a partition index, does the key overwrite the index, can the key be null, etc...?) while still providing the flexibility you want.
> > > > > >
> > > > > > --Tom
> > > > > >
> > > > > > On Fri, Jan 31, 2014 at 12:07 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > Oliver,
> > > > > > >
> > > > > > > Yeah that was my original plan--allow the registration of multiple callbacks on the future. But there is some additional implementation complexity because then you need more synchronization variables to ensure the callback gets executed even if the request has completed at the time the callback is registered. This also makes the order of callback execution unpredictable--I want to be able to guarantee that for a particular partition callbacks for lower offset messages happen before callbacks for higher offset messages so that if you set a highwater mark or something it is easy to reason about. This has the added benefit that callbacks execute in the I/O thread ALWAYS instead of it being non-deterministic which is a little confusing.
> > > > > > >
> > > > > > > I thought a single callback is sufficient since you can always include multiple actions in that callback, and I think that case is rare anyway.
> > > > > > >
> > > > > > > I did think about the possibility of adding a thread pool for handling the callbacks. But there are a lot of possible configurations for such a thread pool and a simplistic approach would no longer guarantee in-order processing of callbacks (you would need to hash execution over threads by partition id). I think by just exposing the simple method that executes in the I/O thread you can easily implement the pooled execution using the thread pooling mechanism of your choice by just having the callback use an executor to run the action (i.e. make an AsyncCallback that takes a threadpool and a Runnable or something like that). This gives the user full control over the executor (there are lots of details around thread re-use in executors, thread factories, etc and trying to expose configs for every variation will be a pain). This also makes it totally transparent how it works; that is, if we did expose all kinds of thread pool configs you would still probably end up reading our code to figure out exactly what they all did.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Fri, Jan 31, 2014 at 9:39 AM, Oliver Dain <od...@3cinteractive.com> wrote:
> > > > > > >
> > > > > > > > Hmmm..
> > > > > > > > I should read the docs more carefully before I open my big mouth: I just noticed the KafkaProducer#send overload that takes a callback. That definitely helps address my concern though I think the API would be cleaner if there was only one variant that returned a future and you could register the callback with the future. This is not nearly as important as I'd thought given the ability to register a callback - just a preference.
> > > > > > > >
> > > > > > > > On 1/31/14, 9:33 AM, "Oliver Dain" <od...@3cinteractive.com> wrote:
> > > > > > > >
> > > > > > > > >Hey all,
> > > > > > > > >
> > > > > > > > >I'm excited about having a new Producer API, and I really like the idea of removing the distinction between a synchronous and asynchronous producer. The one comment I have about the current API is that it's hard to write truly asynchronous code with the type of future returned by the send method. The issue is that send returns a RecordSend and there's no way to register a callback with that object. It is therefore necessary to poll the object periodically to see if the send has completed. So if you have n send calls outstanding you have to check n RecordSend objects which is slow.
> > > > > > > > >In general this tends to lead to people using one thread per send call and then calling RecordSend#await which removes much of the benefit of an async API.
> > > > > > > > >
> > > > > > > > >I think it's much easier to write truly asynchronous code if the returned future allows you to register a callback. That way, instead of polling you can simply wait for the callback to be called. A good example of the kind of thing I'm thinking is the ListenableFuture class in the Guava libraries:
> > > > > > > > >
> > > > > > > > >https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained
> > > > > > > > >
> > > > > > > > >HTH,
> > > > > > > > >Oliver
> > > >
> > > > --
> > > > -- Guozhang
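
For illustration, the callback-registration style Oliver describes can be sketched with the JDK's java.util.concurrent.CompletableFuture standing in for Guava's ListenableFuture. This is only a sketch under stated assumptions: fakeSend and CallbackFutureDemo are hypothetical names, no real producer is involved, and the RecordSend type discussed in the thread is not modelled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CallbackFutureDemo {
    // Hypothetical stand-in for an asynchronous send: the returned future
    // completes with the "offset" on a background thread.
    static CompletableFuture<Long> fakeSend(long offset) {
        return CompletableFuture.supplyAsync(() -> offset);
    }

    // Registers one callback per outstanding send instead of polling n futures.
    static List<Long> sendAll(int n) {
        List<Long> acked = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Long>> pending = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            // The callback runs when the send completes, even if that has
            // already happened by the time it is registered.
            pending.add(fakeSend(i).whenComplete((offset, error) -> {
                if (error == null) {
                    acked.add(offset);
                }
            }));
        }
        // Joined here only so the demo can inspect the result; an application
        // would normally return and let the callbacks fire on their own.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return acked;
    }

    public static void main(String[] args) {
        System.out.println(sendAll(5).size() + " sends acknowledged");
    }
}
```

Note that the callbacks in this sketch run in no guaranteed order, which is the ordering concern Jay raises about registering callbacks on the future.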