This is pretty hard to do with the architecture we've gone with, as the
stored events are not objects but tightly packed serialized bytes. That
approach is much better from a performance and memory management point of
view, though, so I'd be very hesitant to change it. As a result, it is hard
to provide a usable API for this. I think this is likely something that
would be better implemented in the application (e.g. use a blocking queue
and batch into a single message after the timeout).

-Jay
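A minimal sketch of the application-side batching suggested above, assuming the messages carry per-key counts as in the use case quoted below. CountEvent, CountBatcher, offer and nextBatch are hypothetical names, not part of any Kafka API. The batcher waits until a size cap is hit or a timeout expires, sums the counts per key, and leaves serializing the aggregate into one message (and sending it) to the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event type: a key plus a count that can be summed.
final class CountEvent {
    final String key;
    final long count;

    CountEvent(String key, long count) {
        this.key = key;
        this.count = count;
    }
}

// Hypothetical application-side batcher: application threads enqueue events,
// and a sending thread drains them into one aggregate per batch.
final class CountBatcher {
    private final BlockingQueue<CountEvent> queue = new LinkedBlockingQueue<>();
    private final int maxBatchSize;
    private final long timeoutMs;

    CountBatcher(int maxBatchSize, long timeoutMs) {
        this.maxBatchSize = maxBatchSize;
        this.timeoutMs = timeoutMs;
    }

    // Called instead of sending each event to the producer directly.
    void offer(CountEvent event) {
        queue.add(event);
    }

    // Blocks until maxBatchSize events have been taken or the timeout expires,
    // then returns the summed counts per key (empty if nothing arrived).
    Map<String, Long> nextBatch() throws InterruptedException {
        Map<String, Long> totals = new HashMap<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        int taken = 0;
        while (taken < maxBatchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // batch window is over
            }
            CountEvent event = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (event == null) {
                break; // timed out waiting for more events
            }
            totals.merge(event.key, event.count, Long::sum);
            taken++;
        }
        return totals;
    }
}
```

A sending thread would loop on nextBatch() and, whenever the result is non-empty, serialize it and hand it to the producer as a single message.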


On Thu, Feb 6, 2014 at 1:16 PM, S Ahmed <sahmed1...@gmail.com> wrote:

> How about the following use case:
>
> Just before the producer actually sends the payload to Kafka, could an
> event be exposed that would allow one to loop through the messages and
> potentially delete some of them?
>
> Example:
>
> Say you have 100 messages, but before you send these messages to Kafka, you
> can easily aggregate many of these messages to reduce the message count.
> If there are messages that store counts, you could aggregate these into a
> single message and then send to Kafka.
>
> Thoughts?
>
>
>
> On Wed, Feb 5, 2014 at 2:03 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > It might. I considered this but ended up going this way. Now that we have
> > changed partitionKey=>partition it almost works. The difference is that
> > the consumer also gets an offset, which the producer doesn't have.
> >
> > One thing I think this points to is the value of getting the consumer Java
> > API worked out even in the absence of an implementation, just so we can
> > write some fake code that uses both and kind of see how it feels.
> >
> > -Jay
> >
> >
> > On Wed, Feb 5, 2014 at 10:23 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > Currently, the user will send ProducerRecords using the new producer. The
> > > expectation will be that you get the same thing as output from the
> > > consumer. Since ProducerRecord is a holder for topic, partition, key and
> > > value, does it make sense to rename it to just Record? So, the
> > > send/receive APIs would look like the following:
> > >
> > > producer.send(Record record);
> > > List<Record> poll();
> > >
> > > Thoughts?
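A minimal sketch of the single holder type described above, assuming byte[] keys and values and a nullable Integer partition meaning "not chosen by the caller". The class name follows the Record proposal; the field types and accessor names are assumptions for illustration, not the actual producer API.

```java
import java.util.Objects;

// Hypothetical immutable holder for topic, partition, key and value, usable as
// both the producer's input and the consumer's output.
final class Record {
    private final String topic;
    private final Integer partition; // null: let the producer pick a partition
    private final byte[] key;        // may be null
    private final byte[] value;

    Record(String topic, Integer partition, byte[] key, byte[] value) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    Integer partition() { return partition; }
    byte[] key() { return key; }
    byte[] value() { return value; }
}
```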
> > >
> > >
> > > On Sun, Feb 2, 2014 at 4:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > I think the most common motivation for having a customized partitioner
> > > > is to make sure some messages always go to the same partition, but
> > > > people seldom want to know exactly which partition they go to. If that
> > > > is true, why not just assign the same byte array as the partition key
> > > > and use the default hash-based partitioning in option 1.A? But again,
> > > > that is based on my presumption that very few users would really want
> > > > to specify the partition id.
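A minimal sketch of the key-only approach described above, assuming a simple default hash partitioner. Using java.util.Arrays.hashCode is an assumption for illustration; a real default partitioner may use a different hash function. HashPartitioning and partitionFor are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical default hash partitioner: equal key bytes always map to the
// same partition, so callers never need to know the partition number.
final class HashPartitioning {
    private HashPartitioning() {}

    static int partitionFor(byte[] key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so the modulus is never negative.
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }
}
```

Because the result depends only on the key's contents and the partition count, two records sent with equal key bytes land in the same partition without either caller seeing the partition id.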
> > > >
> > > >
> > > >
> > > > On Fri, Jan 31, 2014 at 2:44 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Hey Tom,
> > > > >
> > > > > Agreed, there is definitely nothing that prevents our including
> > > > > partitioner implementations, but it does get a little less seamless.
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Fri, Jan 31, 2014 at 2:35 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > > > >
> > > > > > Regarding partitioning APIs, I don't think there is a common subset
> > > > > > of information that is required for all strategies. Instead of
> > > > > > modifying the core API to easily support all of the various
> > > > > > partitioning strategies, offer the most common ones as libraries
> > > > > > that users can build into their own data pipeline, just like
> > > > > > serialization. The core API would simply accept a partition index.
> > > > > > You could include one default strategy (random) that only applies
> > > > > > if the caller sets "-1" for the partition index.
> > > > > >
> > > > > > That way, each partitioning strategy could have its own API that
> > > > > > makes sense for it. For example, a round-robin partitioner only
> > > > > > needs one method, "nextPartition()", while a hash-based one needs
> > > > > > "getPartitionFor(byte[])".
> > > > > >
> > > > > > For those who actually need a pluggable strategy, a superset of the
> > > > > > API could be codified into an interface (perhaps the existing
> > > > > > partitioner interface), but it would still have to be used from
> > > > > > outside of the core API.
> > > > > >
> > > > > > This design would make the core API less confusing (when do I use
> > > > > > a partition key instead of a partition index, does the key
> > > > > > overwrite the index, can the key be null, etc.?) while still
> > > > > > providing the flexibility you want.
> > > > > >
> > > > > > --Tom
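A minimal sketch of the split described above, assuming partitioners live in a separate library and the core API only takes an index, with -1 meaning "pick one at random". RoundRobinPartitioner, PartitionIndex, ANY and resolve are hypothetical names; only nextPartition() comes from the message itself.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical library-side strategy whose entire API is nextPartition().
final class RoundRobinPartitioner {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinPartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    int nextPartition() {
        // Clear the sign bit so the result stays in range after the counter overflows.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}

// Hypothetical core-API rule: accept an explicit index and only fall back to
// the built-in random strategy when the caller passes ANY (-1).
final class PartitionIndex {
    static final int ANY = -1;

    private PartitionIndex() {}

    static int resolve(int requested, int numPartitions) {
        if (requested == ANY) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        if (requested < 0 || requested >= numPartitions) {
            throw new IllegalArgumentException("invalid partition " + requested);
        }
        return requested;
    }
}
```

A caller would compute an index with whichever library strategy it likes and pass only that int to the producer, so the core API never needs to know about keys.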
> > > > > >
> > > > > > On Fri, Jan 31, 2014 at 12:07 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > Oliver,
> > > > > > >
> > > > > > > Yeah, that was my original plan--allow the registration of
> > > > > > > multiple callbacks on the future. But there is some additional
> > > > > > > implementation complexity, because then you need more
> > > > > > > synchronization variables to ensure the callback gets executed
> > > > > > > even if the request has already completed at the time the
> > > > > > > callback is registered. This also makes the order of callback
> > > > > > > execution unpredictable--I want to be able to guarantee that for
> > > > > > > a particular partition, callbacks for lower-offset messages
> > > > > > > happen before callbacks for higher-offset messages, so that if
> > > > > > > you set a high-water mark or something it is easy to reason
> > > > > > > about. This has the added benefit that callbacks ALWAYS execute
> > > > > > > in the I/O thread rather than non-deterministically, which would
> > > > > > > be a little confusing.
> > > > > > >
> > > > > > > I thought a single callback was sufficient, since you can always
> > > > > > > include multiple actions in that callback, and I think that case
> > > > > > > is rare anyway.
> > > > > > >
> > > > > > > I did think about the possibility of adding a thread pool for
> > > > > > > handling the callbacks. But there are a lot of possible
> > > > > > > configurations for such a thread pool, and a simplistic approach
> > > > > > > would no longer guarantee in-order processing of callbacks (you
> > > > > > > would need to hash execution over threads by partition id). I
> > > > > > > think by just exposing the simple method that executes in the
> > > > > > > I/O thread, you can easily implement pooled execution using the
> > > > > > > thread pooling mechanism of your choice, by just having the
> > > > > > > callback use an executor to run the action (i.e. make an
> > > > > > > AsyncCallback that takes a thread pool and a Runnable or
> > > > > > > something like that). This gives the user full control over the
> > > > > > > executor (there are lots of details around thread reuse in
> > > > > > > executors, thread factories, etc., and trying to expose configs
> > > > > > > for every variation will be a pain). This also makes it totally
> > > > > > > transparent how it works; that is, if we did expose all kinds of
> > > > > > > thread pool configs, you would still probably end up reading our
> > > > > > > code to figure out exactly what they all did.
> > > > > > >
> > > > > > > -Jay
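A minimal sketch of the AsyncCallback idea described above, assuming a hypothetical Callback interface that the I/O thread invokes with a partition and an offset (the thread does not pin down the real signature). The action is a LongConsumer rather than a bare Runnable so it can see the offset. Each partition is hashed onto one single-threaded executor, so as long as the I/O thread fires callbacks in offset order, the actions for a given partition also run in that order. Callback, StripedExecutor and AsyncCallback are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical completion hook, invoked on the producer's I/O thread.
interface Callback {
    void onCompletion(int partition, long offset);
}

// User-owned pool: each partition always maps to the same single-threaded
// executor, which preserves per-partition ordering.
final class StripedExecutor implements AutoCloseable {
    private final List<ExecutorService> stripes = new ArrayList<>();

    StripedExecutor(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            stripes.add(Executors.newSingleThreadExecutor());
        }
    }

    void execute(int partition, Runnable task) {
        int index = (partition & 0x7fffffff) % stripes.size();
        stripes.get(index).execute(task);
    }

    // Stops accepting work and waits for already-queued actions to finish.
    @Override
    public void close() throws InterruptedException {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
        for (ExecutorService stripe : stripes) {
            stripe.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}

// The only work done on the I/O thread is enqueueing the action.
final class AsyncCallback implements Callback {
    private final StripedExecutor executor;
    private final LongConsumer action;

    AsyncCallback(StripedExecutor executor, LongConsumer action) {
        this.executor = executor;
        this.action = action;
    }

    @Override
    public void onCompletion(int partition, long offset) {
        executor.execute(partition, () -> action.accept(offset));
    }
}
```

The pool size, thread factory and shutdown policy all stay in user code, which is the control the message argues for.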
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jan 31, 2014 at 9:39 AM, Oliver Dain <od...@3cinteractive.com> wrote:
> > > > > > >
> > > > > > > > Hmmm... I should read the docs more carefully before I open my
> > > > > > > > big mouth: I just noticed the KafkaProducer#send overload that
> > > > > > > > takes a callback. That definitely helps address my concern,
> > > > > > > > though I think the API would be cleaner if there were only one
> > > > > > > > variant that returned a future and you could register the
> > > > > > > > callback with the future. Given the ability to register a
> > > > > > > > callback, this is not nearly as important as I'd thought - just
> > > > > > > > a preference.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On 1/31/14, 9:33 AM, "Oliver Dain" <od...@3cinteractive.com> wrote:
> > > > > > > >
> > > > > > > > >Hey all,
> > > > > > > > >
> > > > > > > > >I'm excited about having a new Producer API, and I really like
> > > > > > > > >the idea of removing the distinction between a synchronous and
> > > > > > > > >asynchronous producer. The one comment I have about the current
> > > > > > > > >API is that it's hard to write truly asynchronous code with the
> > > > > > > > >type of future returned by the send method. The issue is that
> > > > > > > > >send returns a RecordSend and there's no way to register a
> > > > > > > > >callback with that object. It is therefore necessary to poll
> > > > > > > > >the object periodically to see if the send has completed. So if
> > > > > > > > >you have n send calls outstanding, you have to check n
> > > > > > > > >RecordSend objects, which is slow. In general this tends to lead
> > > > > > > > >to people using one thread per send call and then calling
> > > > > > > > >RecordSend#await, which removes much of the benefit of an async
> > > > > > > > >API.
> > > > > > > > >
> > > > > > > > >I think it's much easier to write truly asynchronous code if
> > > > > > > > >the returned future allows you to register a callback. That
> > > > > > > > >way, instead of polling, you can simply wait for the callback to
> > > > > > > > >be called. A good example of the kind of thing I'm thinking of
> > > > > > > > >is the ListenableFuture class in the Guava libraries:
> > > > > > > > >
> > > > > > > > >https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >HTH,
> > > > > > > > >Oliver
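A minimal sketch of the future-with-callbacks style described above, assuming Java 8's CompletableFuture as a stand-in for Guava's ListenableFuture. FutureSender is a hypothetical stand-in for a producer, not a real Kafka class: a single background thread plays the I/O thread and completes each future with a made-up sequential offset, so no caller ever has to poll.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sender whose send() returns a future that accepts callbacks.
final class FutureSender implements AutoCloseable {
    // Single background thread standing in for the producer's I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    // Only read and written on ioThread, so no extra synchronization is needed.
    private long nextOffset = 0;

    CompletableFuture<Long> send(byte[] value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        // The payload is ignored in this sketch; the future just gets the next offset.
        ioThread.execute(() -> result.complete(nextOffset++));
        return result;
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}
```

Usage would look like `sender.send(bytes).thenAccept(offset -> ...)`. Per the CompletableFuture documentation, a non-async callback may run on the thread that completes the future or on the thread registering it (for example, when the future has already completed), which is the non-determinism about callback threads raised in the reply above.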
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
