Jay,

I think you're confused between my use of "basic client" and "connection".
There is one basic client for a cluster. An IO thread manages the tcp
connections for any number of brokers. The basic client has a queue of
requests each broker. When a tcp connection (associated with broker X) is
ready to send the next request, it asks the basic client for the next
request for broker X.

The producer is just a layer that maps partitions to brokers so you only
have to tell it to send to partiton #3, and it knows that partition #3 goes
to broker X, and adds a produce request to the queue for broker X.

Conceivably (though I haven't implemented it yet), a multi-produce request
could be used in the same way. Since request pipelining is in place, I
don't see a good reason to use multi-produce.

Did I clear it up any, or is this just more confusing?

--Tom





On Wed, Jan 29, 2014 at 11:00 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Tom,
>
> So is there one connection and I/O thread per broker and a low-level client
> for each of those, and then you hash into that to partition? Is it possible
> to batch across partitions or only within a partition?
>
> -Jay
>
>
> On Wed, Jan 29, 2014 at 8:41 AM, Tom Brown <tombrow...@gmail.com> wrote:
>
> > Jay,
> >
> > There is both a basic client object, and a number of IO processing
> threads.
> > The client object manages connections, creating new ones when new
> machines
> > are connected, or when existing connections die. It also manages a queue
> of
> > requests for each server. The IO processing thread has a selector, and
> > performs the work of sending/receiving (removing items from the queue,
> > interpreting the response at a basic level, etc). Since asynchronous
> > sockets by nature decouple sending and receiving, request pipelining is
> > inherent.
> >
> > Using the basic client, you can send individual produce requests
> (singular
> > or batched). The "producer" layer adds an additional queue for each
> > partition, allowing individual messages to be batched together.
> >
> > --Tom
> >
> >
> >
> > On Tue, Jan 28, 2014 at 9:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > Hey Neha,
> > >
> > > Can you elaborate on why you prefer using Java's Future? The downside
> in
> > my
> > > mind is the use of the checked InterruptedException and
> > ExecutionException.
> > > ExecutionException is arguable, but forcing you to catch
> > > InterruptedException, often in code that can't be interrupted, seems
> > > perverse. It also leaves us with the cancel() method which I don't
> think
> > we
> > > really can implement.
> > >
> > > Option 1A, to recap/elaborate, was the following. There is no
> Serializer
> > or
> > > Partitioner api. We take a byte[] key and value and an optional integer
> > > partition. If you specify the integer partition it will be used. If you
> > do
> > > not specify a key or a partition the partition will be chosen in a
> round
> > > robin fashion. If you specify a key but no partition we will chose a
> > > partition based on a hash of the key. In order to let the user find the
> > > partition we will need to given them access to the Cluster instance
> > > directly from the producer.
> > >
> > > -Jay
> > >
> > >
> > > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede <
> neha.narkh...@gmail.com
> > > >wrote:
> > >
> > > > Here are more thoughts on the public APIs -
> > > >
> > > > - I suggest we use java's Future instead of custom Future especially
> > > since
> > > > it is part of the public API
> > > >
> > > > - Serialization: I like the simplicity of the producer APIs with the
> > > > absence of serialization where we just deal with byte arrays for keys
> > and
> > > > values. What I don't like about this is the performance overhead on
> the
> > > > Partitioner for any kind of custom partitioning based on the
> > > partitionKey.
> > > > Since the only purpose of partitionKey is to do custom partitioning,
> > why
> > > > can't we take it in directly as an integer and let the user figure
> out
> > > the
> > > > mapping from partition_key -> partition_id using the getCluster()
> API?
> > > If I
> > > > understand correctly, this is similar to what you suggested as part
> of
> > > > option 1A. I like this approach since it maintains the simplicity of
> > APIs
> > > > by allowing us to deal with bytes and does not compromise performance
> > in
> > > > the custom partitioning case.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > >
> > > > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps <jay.kr...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Tom,
> > > > >
> > > > > That sounds cool. How did you end up handling parallel I/O if you
> > wrap
> > > > the
> > > > > individual connections? Don't you need some selector that selects
> > over
> > > > all
> > > > > the connections?
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown <tombrow...@gmail.com>
> > > wrote:
> > > > >
> > > > > > I implemented a 0.7 client in pure java, and its API very closely
> > > > > resembled
> > > > > > this. (When multiple people independently engineer the same
> > solution,
> > > > > it's
> > > > > > probably good... right?). However, there were a few architectural
> > > > > > differences with my client:
> > > > > >
> > > > > > 1. The basic client itself was just an asynchronous layer around
> > the
> > > > > > different server functions. In and of itself it had no knowledge
> of
> > > > > > partitions, only servers (and maintained TCP connections to
> them).
> > > > > >
> > > > > > 2. The main producer was an additional layer that provided a
> > > high-level
> > > > > > interface that could batch individual messages based on
> partition.
> > > > > >
> > > > > > 3. Knowledge of partitioning was done via an interface so that
> > > > different
> > > > > > strategies could be used.
> > > > > >
> > > > > > 4. Partitioning was done by the user, with knowledge of the
> > available
> > > > > > partitions provided by #3.
> > > > > >
> > > > > > 5. Serialization was done by the user to simplify the API.
> > > > > >
> > > > > > 6. Futures were used to make asynchronous emulate synchronous
> > calls.
> > > > > >
> > > > > >
> > > > > > The main benefit of this approach is flexibility. For example,
> > since
> > > > the
> > > > > > base client was just a managed connection (and not inherently a
> > > > > producer),
> > > > > > it was easy to composite a produce request and an offsets request
> > > > > together
> > > > > > into a confirmed produce request (officially not available in
> 0.7).
> > > > > >
> > > > > > Decoupling the basic client from partition management allowed the
> > me
> > > to
> > > > > > implement zk discovery as a separate project so that the main
> > project
> > > > had
> > > > > > no complex dependencies. The same was true of decoupling
> > > serialization.
> > > > > > It's trivial to build an optional layer that adds those features
> > in,
> > > > > while
> > > > > > allowing access to the base APIs for those that need it.
> > > > > >
> > > > > > Using standard Future objects was also beneficial, since I could
> > > > combine
> > > > > > them with existing tools (such as guava).
> > > > > >
> > > > > > It may be too late to be of use, but I have been working with my
> > > > > company's
> > > > > > legal department to release the implementation I described above.
> > If
> > > > > you're
> > > > > > interested in it, let me know.
> > > > > >
> > > > > >
> > > > > > To sum up my thoughts regarding the new API, I think it's a great
> > > > start.
> > > > > I
> > > > > > would like to see a more layered approach so I can use the parts
> I
> > > > want,
> > > > > > and adapt the other parts as needed. I would also like to see
> > > standard
> > > > > > interfaces (especially Future) used where they makes sense.
> > > > > >
> > > > > > --Tom
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 28, 2014 at 1:33 PM, Roger Hoover <
> > > roger.hoo...@gmail.com
> > > > > > >wrote:
> > > > > >
> > > > > > > +1 ListenableFuture: If this works similar to Deferreds in
> > Twisted
> > > > > Python
> > > > > > > or Promised IO in Javascript, I think this is a great pattern
> for
> > > > > > > decoupling your callback logic from the place where the Future
> is
> > > > > > > generated.  You can register as many callbacks as you like,
> each
> > in
> > > > the
> > > > > > > appropriate layer of the code and have each observer get
> notified
> > > > when
> > > > > > the
> > > > > > > promised i/o is complete without any of them knowing about each
> > > > other.
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jan 28, 2014 at 11:32 AM, Jay Kreps <
> jay.kr...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hey Ross,
> > > > > > > >
> > > > > > > > - ListenableFuture: Interesting. That would be an alternative
> > to
> > > > the
> > > > > > > direct
> > > > > > > > callback support we provide. There could be pros to this, let
> > me
> > > > > think
> > > > > > > > about it.
> > > > > > > > - We could provide layering, but I feel that the
> serialization
> > is
> > > > > such
> > > > > > a
> > > > > > > > small thing we should just make a decision and chose one, it
> > > > doesn't
> > > > > > seem
> > > > > > > > to me to justify a whole public facing layer.
> > > > > > > > - Yes, this is fairly esoteric, essentially I think it is
> > fairly
> > > > > > similar
> > > > > > > to
> > > > > > > > databases like DynamoDB that allow you to specify two
> partition
> > > > keys
> > > > > (I
> > > > > > > > think DynamoDB does this...). The reasoning is that in fact
> > there
> > > > are
> > > > > > > > several things you can use the key field for: (1) to compute
> > the
> > > > > > > partition
> > > > > > > > to store the data in, (2) as a unique identifier to
> deduplicate
> > > > that
> > > > > > > > partition's records within a log. These two things are almost
> > > > always
> > > > > > the
> > > > > > > > same, but occationally may differ when you want to group data
> > in
> > > a
> > > > > more
> > > > > > > > sophisticated way then just a hash of the primary key but
> still
> > > > > retain
> > > > > > > the
> > > > > > > > proper primary key for delivery to the consumer and log
> > > compaction.
> > > > > > > >
> > > > > > > > -Jay
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jan 28, 2014 at 3:24 AM, Ross Black <
> > > > ross.w.bl...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jay,
> > > > > > > > >
> > > > > > > > > - Just to add some more info/confusion about possibly using
> > > > Future
> > > > > > ...
> > > > > > > > >   If Kafka uses a JDK future, it plays nicely with other
> > > > frameworks
> > > > > > as
> > > > > > > > > well.
> > > > > > > > >   Google Guava has a ListenableFuture that allows callback
> > > > handling
> > > > > > to
> > > > > > > be
> > > > > > > > > added via the returned future, and allows the callbacks to
> be
> > > > > passed
> > > > > > > off
> > > > > > > > to
> > > > > > > > > a specified executor.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ListenableFuture.html
> > > > > > > > >   The JDK future can easily be converted to a listenable
> > > future.
> > > > > > > > >
> > > > > > > > > - On the question of byte[] vs Object, could this be solved
> > by
> > > > > > layering
> > > > > > > > the
> > > > > > > > > API?  eg. a raw producer (use byte[] and specify the
> > partition
> > > > > > number)
> > > > > > > > and
> > > > > > > > > a normal producer (use generic object and specify a
> > > Partitioner)?
> > > > > > > > >
> > > > > > > > > - I am confused by the keys in ProducerRecord and
> > Partitioner.
> > > > >  What
> > > > > > is
> > > > > > > > the
> > > > > > > > > usage for both a key and a partition key? (I am not yet
> using
> > > > 0.8)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Ross
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 28 January 2014 05:00, Xavier Stevens <
> xav...@gaikai.com>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > AutoCloseable would be nice for us as most of our code is
> > > using
> > > > > > Java
> > > > > > > 7
> > > > > > > > at
> > > > > > > > > > this point.
> > > > > > > > > >
> > > > > > > > > > I like Dropwizard's configuration mapping to POJOs via
> > > Jackson,
> > > > > but
> > > > > > > if
> > > > > > > > > you
> > > > > > > > > > wanted to stick with property maps I don't care enough to
> > > > object.
> > > > > > > > > >
> > > > > > > > > > If the producer only dealt with bytes, is there a way we
> > > could
> > > > > > still
> > > > > > > > due
> > > > > > > > > > partition plugins without specifying the number
> > explicitly? I
> > > > > would
> > > > > > > > > prefer
> > > > > > > > > > to be able to pass in field(s) that would be used by the
> > > > > > partitioner.
> > > > > > > > > > Obviously if this wasn't possible you could always
> > > deserialize
> > > > > the
> > > > > > > > object
> > > > > > > > > > in the partitioner and grab the fields you want, but that
> > > seems
> > > > > > > really
> > > > > > > > > > expensive to do on every message.
> > > > > > > > > >
> > > > > > > > > > It would also be nice to have a Java API Encoder
> > constructor
> > > > > taking
> > > > > > > in
> > > > > > > > > > VerifiableProperties. Scala understands how to handle
> > "props:
> > > > > > > > > > VerifiableProperties = null", but Java doesn't. So you
> > don't
> > > > run
> > > > > > into
> > > > > > > > > this
> > > > > > > > > > problem until runtime.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > -Xavier
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <
> > > > > cl...@breyman.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Jay -
> > > > > > > > > > >
> > > > > > > > > > > Config - your explanation makes sense. I'm just so
> > > accustomed
> > > > > to
> > > > > > > > having
> > > > > > > > > > > Jackson automatically map my configuration objects to
> > POJOs
> > > > > that
> > > > > > > I've
> > > > > > > > > > > stopped using property files. They are lingua franca.
> The
> > > > only
> > > > > > > > thought
> > > > > > > > > > > might be to separate the config interface from the
> > > > > implementation
> > > > > > > to
> > > > > > > > > > allow
> > > > > > > > > > > for alternatives, but that might undermine your point
> of
> > > "do
> > > > it
> > > > > > > this
> > > > > > > > > way
> > > > > > > > > > so
> > > > > > > > > > > that everyone can find it where they expect it".
> > > > > > > > > > >
> > > > > > > > > > > Serialization: Of the options, I like 1A the best,
> though
> > > > > > possibly
> > > > > > > > with
> > > > > > > > > > > either an option to specify a partition key rather than
> > ID
> > > > or a
> > > > > > > > helper
> > > > > > > > > to
> > > > > > > > > > > translate an arbitrary byte[] or long into a partition
> > > > number.
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > > Clark
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <
> > > > > jay.kr...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for the detailed thoughts. Let me elaborate on
> > the
> > > > > > config
> > > > > > > > > thing.
> > > > > > > > > > > >
> > > > > > > > > > > > I agree that at first glance key-value strings don't
> > seem
> > > > > like
> > > > > > a
> > > > > > > > very
> > > > > > > > > > > good
> > > > > > > > > > > > configuration api for a client. Surely a well-typed
> > > config
> > > > > > class
> > > > > > > > > would
> > > > > > > > > > be
> > > > > > > > > > > > better! I actually disagree and let me see if I can
> > > > convince
> > > > > > you.
> > > > > > > > > > > >
> > > > > > > > > > > > My reasoning has nothing to do with the api and
> > > everything
> > > > to
> > > > > > do
> > > > > > > > with
> > > > > > > > > > > > operations.
> > > > > > > > > > > >
> > > > > > > > > > > > Clients are embedded in applications which are
> > themselves
> > > > > > > > configured.
> > > > > > > > > > In
> > > > > > > > > > > > any place that takes operations seriously the
> > > configuration
> > > > > for
> > > > > > > > these
> > > > > > > > > > > > applications will be version controlled and
> maintained
> > > > > through
> > > > > > > some
> > > > > > > > > > kind
> > > > > > > > > > > of
> > > > > > > > > > > > config management system. If we give a config class
> > with
> > > > > > getters
> > > > > > > > and
> > > > > > > > > > > > setters the application has to expose those
> properties
> > to
> > > > its
> > > > > > > > > > > > configuration. What invariably happens is that the
> > > > > application
> > > > > > > > > exposes
> > > > > > > > > > > only
> > > > > > > > > > > > a choice few properties that they thought they would
> > > > change.
> > > > > > > > > > Furthermore
> > > > > > > > > > > > the application will make up a name for these configs
> > > that
> > > > > > seems
> > > > > > > > > > > intuitive
> > > > > > > > > > > > at the time in the 2 seconds the engineer spends
> > thinking
> > > > > about
> > > > > > > it.
> > > > > > > > > > > >
> > > > > > > > > > > > Now consider the result of this in the large. You end
> > up
> > > > with
> > > > > > > > dozens
> > > > > > > > > or
> > > > > > > > > > > > hundreds of applications that have the client
> embedded.
> > > > Each
> > > > > > > > exposes
> > > > > > > > > a
> > > > > > > > > > > > different, inadequate subset of the possible configs,
> > > each
> > > > > with
> > > > > > > > > > different
> > > > > > > > > > > > names. It is a nightmare.
> > > > > > > > > > > >
> > > > > > > > > > > > If you use a string-string map the config system can
> > > > directly
> > > > > > > get a
> > > > > > > > > > > bundle
> > > > > > > > > > > > of config key-value pairs and put them into the
> client.
> > > > This
> > > > > > > means
> > > > > > > > > that
> > > > > > > > > > > all
> > > > > > > > > > > > configuration is automatically available with the
> name
> > > > > > documented
> > > > > > > > on
> > > > > > > > > > the
> > > > > > > > > > > > website in every application that does this. If you
> > > upgrade
> > > > > to
> > > > > > a
> > > > > > > > new
> > > > > > > > > > > kafka
> > > > > > > > > > > > version with more configs those will be exposed too.
> If
> > > you
> > > > > > > realize
> > > > > > > > > > that
> > > > > > > > > > > > you need to change a default you can just go through
> > your
> > > > > > configs
> > > > > > > > and
> > > > > > > > > > > > change it everywhere as it will have the same name
> > > > > everywhere.
> > > > > > > > > > > >
> > > > > > > > > > > > -Jay
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <
> > > > > > > cl...@breyman.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks Jay. I'll see if I can put together a more
> > > > complete
> > > > > > > > > response,
> > > > > > > > > > > > > perhaps as separate threads so that topics don't
> get
> > > > > > entangled.
> > > > > > > > In
> > > > > > > > > > the
> > > > > > > > > > > > mean
> > > > > > > > > > > > > time, here's a couple responses:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Serialization: you've broken out a sub-thread so
> i'll
> > > > reply
> > > > > > > > there.
> > > > > > > > > My
> > > > > > > > > > > > bias
> > > > > > > > > > > > > is that I like generics (except for type-erasure)
> and
> > > in
> > > > > > > > particular
> > > > > > > > > > > they
> > > > > > > > > > > > > make it easy to compose serializers for compound
> > > payloads
> > > > > > (e.g.
> > > > > > > > > when
> > > > > > > > > > a
> > > > > > > > > > > > > common header wraps a payload of parameterized
> type).
> > > > I'll
> > > > > > > > respond
> > > > > > > > > to
> > > > > > > > > > > > your
> > > > > > > > > > > > > 4-options message with an example.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Build: I've seen a lot of "maven-compatible" build
> > > > systems
> > > > > > > > produce
> > > > > > > > > > > > > "artifacts" that aren't really artifacts - no
> > embedded
> > > > POM
> > > > > > or,
> > > > > > > > > worst,
> > > > > > > > > > > > > malformed POM. I know the sbt-generated artifacts
> > were
> > > > this
> > > > > > > way -
> > > > > > > > > > onus
> > > > > > > > > > > is
> > > > > > > > > > > > > on me to see what gradle is spitting out and what a
> > > maven
> > > > > > build
> > > > > > > > > might
> > > > > > > > > > > > look
> > > > > > > > > > > > > like. Maven may be old and boring, but it gets out
> of
> > > the
> > > > > way
> > > > > > > and
> > > > > > > > > > > > > integrates really seamlessly with a lot of IDEs.
> When
> > > > some
> > > > > > > scala
> > > > > > > > > > > > projects I
> > > > > > > > > > > > > was working on in the fall of 2011 switched from
> sbt
> > to
> > > > > > maven,
> > > > > > > > > build
> > > > > > > > > > > > became
> > > > > > > > > > > > > a non-issue.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Config: Not a big deal  and no, I don't think a
> > > > dropwizard
> > > > > > > > > dependency
> > > > > > > > > > > is
> > > > > > > > > > > > > appropriate. I do like using simple entity beans
> > > (POJO's
> > > > > not
> > > > > > > > j2EE)
> > > > > > > > > > for
> > > > > > > > > > > > > configuration, especially if they can be marshalled
> > > > without
> > > > > > > > > > annotation
> > > > > > > > > > > by
> > > > > > > > > > > > > Jackson. I only mentioned the dropwizard-extras
> > >  because
> > > > it
> > > > > > has
> > > > > > > > > some
> > > > > > > > > > > > entity
> > > > > > > > > > > > > bean versions of the ZK and Kafka configs.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Domain-packaging: Also not a big deal - it's what's
> > > > > expected
> > > > > > > and
> > > > > > > > > it's
> > > > > > > > > > > > > pretty free in most IDE's. The advantages I see is
> > that
> > > > it
> > > > > is
> > > > > > > > clear
> > > > > > > > > > > > whether
> > > > > > > > > > > > > something is from the Apache Kafka project and
> > whether
> > > > > > > something
> > > > > > > > is
> > > > > > > > > > > from
> > > > > > > > > > > > > another org and related to Kafka. That said,
> nothing
> > > > really
> > > > > > > > > enforces
> > > > > > > > > > > it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Futures: I'll see if I can create some examples to
> > > > > > demonstrate
> > > > > > > > > Future
> > > > > > > > > > > > > making interop easier.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > C
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <
> > > > > > > jay.kr...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hey Clark,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Serialization: Yes I agree with these though I
> > > don't
> > > > > > > consider
> > > > > > > > > the
> > > > > > > > > > > > loss
> > > > > > > > > > > > > of
> > > > > > > > > > > > > > generics a big issue. I'll try to summarize what
> I
> > > > would
> > > > > > > > consider
> > > > > > > > > > the
> > > > > > > > > > > > > best
> > > > > > > > > > > > > > alternative api with raw byte[].
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Maven: We had this debate a few months back and
> > the
> > > > > > > consensus
> > > > > > > > > was
> > > > > > > > > > > > > gradle.
> > > > > > > > > > > > > > Is there a specific issue with the poms gradle
> > > makes? I
> > > > > am
> > > > > > > > > > extremely
> > > > > > > > > > > > > loath
> > > > > > > > > > > > > > to revisit the issue as build issues are a
> > recurring
> > > > > thing
> > > > > > > and
> > > > > > > > no
> > > > > > > > > > one
> > > > > > > > > > > > > ever
> > > > > > > > > > > > > > agrees and ultimately our build needs are very
> > > simple.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Config: I'm not sure if I follow the point. Are
> > you
> > > > > > saying
> > > > > > > we
> > > > > > > > > > > should
> > > > > > > > > > > > > use
> > > > > > > > > > > > > > something in dropwizard for config? One principle
> > > here
> > > > is
> > > > > > to
> > > > > > > > try
> > > > > > > > > to
> > > > > > > > > > > > > remove
> > > > > > > > > > > > > > as many client dependencies as possible as we
> > > > inevitably
> > > > > > run
> > > > > > > > into
> > > > > > > > > > > > > terrible
> > > > > > > > > > > > > > compatibility issues with users who use the same
> > > > library
> > > > > or
> > > > > > > its
> > > > > > > > > > > > > > dependencies at different versions. Or are you
> > > talking
> > > > > > about
> > > > > > > > > > > > maintaining
> > > > > > > > > > > > > > compatibility with existing config parameters? I
> > > think
> > > > as
> > > > > > > much
> > > > > > > > > as a
> > > > > > > > > > > > > config
> > > > > > > > > > > > > > in the existing client makes sense it should have
> > the
> > > > > same
> > > > > > > name
> > > > > > > > > (I
> > > > > > > > > > > was
> > > > > > > > > > > > a
> > > > > > > > > > > > > > bit sloppy about that so I'll fix any errors
> > there).
> > > > > There
> > > > > > > are
> > > > > > > > a
> > > > > > > > > > few
> > > > > > > > > > > > new
> > > > > > > > > > > > > > things and we should give those reasonable
> > defaults.
> > > I
> > > > > > think
> > > > > > > > > config
> > > > > > > > > > > is
> > > > > > > > > > > > > > important so I'll start a thread on the config
> > > package
> > > > in
> > > > > > > > there.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - org.apache.kafka: We could do this. I always
> > > > considered
> > > > > > it
> > > > > > > > kind
> > > > > > > > > > of
> > > > > > > > > > > an
> > > > > > > > > > > > > odd
> > > > > > > > > > > > > > thing Java programmers do that has no real
> > motivation
> > > > > (but
> > > > > > I
> > > > > > > > > could
> > > > > > > > > > be
> > > > > > > > > > > > > > re-educated!). I don't think it ends up reducing
> > > naming
> > > > > > > > conflicts
> > > > > > > > > > in
> > > > > > > > > > > > > > practice and it adds a lot of noise and nested
> > > > > directories.
> > > > > > > Is
> > > > > > > > > > there
> > > > > > > > > > > a
> > > > > > > > > > > > > > reason you prefer this or just to be more
> standard?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Future: Basically I didn't see any particular
> > > > > advantage.
> > > > > > > The
> > > > > > > > > > > cancel()
> > > > > > > > > > > > > > method doesn't really make sense so probably
> > wouldn't
> > > > > work.
> > > > > > > > > > Likewise
> > > > > > > > > > > I
> > > > > > > > > > > > > > dislike the checked exceptions it requires.
> > > Basically I
> > > > > > just
> > > > > > > > > wrote
> > > > > > > > > > > out
> > > > > > > > > > > > > some
> > > > > > > > > > > > > > code examples and it seemed cleaner with a
> special
> > > > > purpose
> > > > > > > > > object.
> > > > > > > > > > I
> > > > > > > > > > > > > wasn't
> > > > > > > > > > > > > > actually aware of plans for improved futures in
> > java
> > > 8
> > > > or
> > > > > > the
> > > > > > > > > other
> > > > > > > > > > > > > > integrations. Maybe you could elaborate on this a
> > bit
> > > > and
> > > > > > > show
> > > > > > > > > how
> > > > > > > > > > it
> > > > > > > > > > > > > would
> > > > > > > > > > > > > > be used? Sounds promising, I just don't know a
> lot
> > > > about
> > > > > > it.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Jay
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <
> > > > > > > > > cl...@breyman.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Jay - Thanks for the call for comments. Here's
> > some
> > > > > > initial
> > > > > > > > > > input:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - Make message serialization a client
> > > responsibility
> > > > > > > (making
> > > > > > > > > all
> > > > > > > > > > > > > messages
> > > > > > > > > > > > > > > byte[]). Reflection-based loading makes it
> harder
> > > to
> > > > > use
> > > > > > > > > generic
> > > > > > > > > > > > codecs
> > > > > > > > > > > > > > > (e.g.  Envelope<PREFIX, DATA, SUFFIX>) or build
> > up
> > > > > codec
> > > > > > > > > > > > > > programmatically.
> > > > > > > > > > > > > > > Non-default partitioning should require an
> > explicit
> > > > > > > partition
> > > > > > > > > > key.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - I really like the fact that it will be native
> > > Java.
> > > > > > > Please
> > > > > > > > > > > consider
> > > > > > > > > > > > > > using
> > > > > > > > > > > > > > > native maven and not sbt, gradle, ivy, etc as
> > they
> > > > > don't
> > > > > > > > > reliably
> > > > > > > > > > > > play
> > > > > > > > > > > > > > nice
> > > > > > > > > > > > > > > in the maven ecosystem. A jar without a
> > well-formed
> > > > pom
> > > > > > > > doesn't
> > > > > > > > > > > feel
> > > > > > > > > > > > > > like a
> > > > > > > > > > > > > > > real artifact. The pom's generated by sbt et
> al.
> > > are
> > > > > not
> > > > > > > well
> > > > > > > > > > > formed.
> > > > > > > > > > > > > > Using
> > > > > > > > > > > > > > > maven will make builds and IDE integration much
> > > > > smoother.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - Look at Nick Telford's dropwizard-extras
> > package
> > > in
> > > > > > which
> > > > > > > > he
> > > > > > > > > > > > defines
> > > > > > > > > > > > > > some
> > > > > > > > > > > > > > > Jackson-compatible POJO's for loading
> > > configuration.
> > > > > > Seems
> > > > > > > > like
> > > > > > > > > > > your
> > > > > > > > > > > > > > client
> > > > > > > > > > > > > > > migration is similar. The config objects should
> > > have
> > > > > > > > > constructors
> > > > > > > > > > > or
> > > > > > > > > > > > > > > factories that accept Map<String, String> and
> > > > > Properties
> > > > > > > for
> > > > > > > > > ease
> > > > > > > > > > > of
> > > > > > > > > > > > > > > migration.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - Would you consider using the org.apache.kafka
> > > > package
> > > > > > for
> > > > > > > > the
> > > > > > > > > > new
> > > > > > > > > > > > API
> > > > > > > > > > > > > > > (quibble)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - Why create your own futures rather than use
> > > > > > > > > > > > > > > java.util.concurrent.Future<Long> or similar?
> > > > Standard
> > > > > > > > futures
> > > > > > > > > > will
> > > > > > > > > > > > > play
> > > > > > > > > > > > > > > nice with other reactive libs and things like
> > J8's
> > > > > > > > > > > ComposableFuture.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks again,
> > > > > > > > > > > > > > > C
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <
> > > > > > > > > > > > roger.hoo...@gmail.com
> > > > > > > > > > > > > > > >wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > A couple comments:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 1) Why does the config use a broker list
> > instead
> > > of
> > > > > > > > > discovering
> > > > > > > > > > > the
> > > > > > > > > > > > > > > brokers
> > > > > > > > > > > > > > > > in ZooKeeper?  It doesn't match the
> > > > HighLevelConsumer
> > > > > > > API.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 2) It looks like broker connections are
> created
> > > on
> > > > > > > demand.
> > > > > > > > >  I'm
> > > > > > > > > > > > > > wondering
> > > > > > > > > > > > > > > > if sometimes you might want to flush out
> config
> > > or
> > > > > > > network
> > > > > > > > > > > > > connectivity
> > > > > > > > > > > > > > > > issues before pushing the first message
> > through.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Should there also be a
> KafkaProducer.connect()
> > or
> > > > > > .open()
> > > > > > > > > > method
> > > > > > > > > > > or
> > > > > > > > > > > > > > > > connectAll()?  I guess it would try to
> connect
> > to
> > > > all
> > > > > > > > brokers
> > > > > > > > > > in
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > BROKER_LIST_CONFIG
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > HTH,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Roger
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <
> > > > > > > > > > jay.kr...@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > As mentioned in a previous email we are
> > working
> > > > on
> > > > > a
> > > > > > > > > > > > > > re-implementation
> > > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > the producer. I would like to use this
> email
> > > > thread
> > > > > > to
> > > > > > > > > > discuss
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > details
> > > > > > > > > > > > > > > > > of the public API and the configuration. I
> > > would
> > > > > love
> > > > > > > for
> > > > > > > > > us
> > > > > > > > > > to
> > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > incredibly picky about this public api now
> so
> > > it
> > > > is
> > > > > > as
> > > > > > > > good
> > > > > > > > > > as
> > > > > > > > > > > > > > possible
> > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > we don't need to break it in the future.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The best way to get a feel for the API is
> > > > actually
> > > > > to
> > > > > > > > take
> > > > > > > > > a
> > > > > > > > > > > look
> > > > > > > > > > > > > at
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > javadoc, my hope is to get the api docs
> good
> > > > enough
> > > > > > so
> > > > > > > > that
> > > > > > > > > > it
> > > > > > > > > > > is
> > > > > > > > > > > > > > > > > self-explanatory:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Please take a look at this API and give me
> > any
> > > > > > thoughts
> > > > > > > > you
> > > > > > > > > > may
> > > > > > > > > > > > > have!
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > It may also be reasonable to take a look at
> > the
> > > > > > > configs:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The actual code is posted here:
> > > > > > > > > > > > > > > > >
> > > https://issues.apache.org/jira/browse/KAFKA-1227
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > A few questions or comments to kick things
> > off:
> > > > > > > > > > > > > > > > > 1. We need to make a decision on whether
> > > > > > serialization
> > > > > > > of
> > > > > > > > > the
> > > > > > > > > > > > > user's
> > > > > > > > > > > > > > > key
> > > > > > > > > > > > > > > > > and value should be done by the user (with
> > our
> > > > api
> > > > > > just
> > > > > > > > > > taking
> > > > > > > > > > > > > > byte[])
> > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > > if we should take an object and allow the
> > user
> > > to
> > > > > > > > > configure a
> > > > > > > > > > > > > > > Serializer
> > > > > > > > > > > > > > > > > class which we instantiate via reflection.
> We
> > > > take
> > > > > > the
> > > > > > > > > later
> > > > > > > > > > > > > approach
> > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > the current producer, and I have carried
> this
> > > > > through
> > > > > > > to
> > > > > > > > > this
> > > > > > > > > > > > > > > prototype.
> > > > > > > > > > > > > > > > > The tradeoff I see is this: taking byte[]
> is
> > > > > actually
> > > > > > > > > > simpler,
> > > > > > > > > > > > the
> > > > > > > > > > > > > > user
> > > > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > > directly do whatever serialization they
> like.
> > > The
> > > > > > > > > > complication
> > > > > > > > > > > is
> > > > > > > > > > > > > > > > actually
> > > > > > > > > > > > > > > > > partitioning. Currently partitioning is
> done
> > > by a
> > > > > > > similar
> > > > > > > > > > > plug-in
> > > > > > > > > > > > > api
> > > > > > > > > > > > > > > > > (Partitioner) which the user can implement
> > and
> > > > > > > configure
> > > > > > > > to
> > > > > > > > > > > > > override
> > > > > > > > > > > > > > > how
> > > > > > > > > > > > > > > > > partitions are assigned. If we take byte[]
> as
> > > > input
> > > > > > > then
> > > > > > > > we
> > > > > > > > > > > have
> > > > > > > > > > > > no
> > > > > > > > > > > > > > > > access
> > > > > > > > > > > > > > > > > to the original object and partitioning
> MUST
> > be
> > > > > done
> > > > > > on
> > > > > > > > the
> > > > > > > > > > > > byte[].
> > > > > > > > > > > > > > > This
> > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > fine for hash partitioning. However for
> > various
> > > > > types
> > > > > > > of
> > > > > > > > > > > semantic
> > > > > > > > > > > > > > > > > partitioning (range partitioning, or
> > whatever)
> > > > you
> > > > > > > would
> > > > > > > > > want
> > > > > > > > > > > > > access
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > original object. In the current approach a
> > > > producer
> > > > > > who
> > > > > > > > > > wishes
> > > > > > > > > > > to
> > > > > > > > > > > > > > send
> > > > > > > > > > > > > > > > > byte[] they have serialized in their own
> code
> > > can
> > > > > > > > configure
> > > > > > > > > > the
> > > > > > > > > > > > > > > > > BytesSerialization we supply which is just
> a
> > > "no
> > > > > op"
> > > > > > > > > > > > serialization.
> > > > > > > > > > > > > > > > > 2. We should obsess over naming and make
> sure
> > > > each
> > > > > of
> > > > > > > the
> > > > > > > > > > class
> > > > > > > > > > > > > names
> > > > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > > good.
> > > > > > > > > > > > > > > > > 3. Jun has already pointed out that we need
> > to
> > > > > > include
> > > > > > > > the
> > > > > > > > > > > topic
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > partition in the response, which is
> > absolutely
> > > > > > right. I
> > > > > > > > > > haven't
> > > > > > > > > > > > > done
> > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > yet but that definitely needs to be there.
> > > > > > > > > > > > > > > > > 4. Currently RecordSend.await will throw an
> > > > > exception
> > > > > > > if
> > > > > > > > > the
> > > > > > > > > > > > > request
> > > > > > > > > > > > > > > > > failed. The intention here is that
> > > > > > > > > > > producer.send(message).await()
> > > > > > > > > > > > > > > exactly
> > > > > > > > > > > > > > > > > simulates a synchronous call. Guozhang has
> > > noted
> > > > > that
> > > > > > > > this
> > > > > > > > > > is a
> > > > > > > > > > > > > > little
> > > > > > > > > > > > > > > > > annoying since the user must then catch
> > > > exceptions.
> > > > > > > > However
> > > > > > > > > > if
> > > > > > > > > > > we
> > > > > > > > > > > > > > > remove
> > > > > > > > > > > > > > > > > this then if the user doesn't check for
> > errors
> > > > they
> > > > > > > won't
> > > > > > > > > > know
> > > > > > > > > > > > one
> > > > > > > > > > > > > > has
> > > > > > > > > > > > > > > > > occurred, which I predict will be a common
> > > > mistake.
> > > > > > > > > > > > > > > > > 5. Perhaps there is more we could do to
> make
> > > the
> > > > > > async
> > > > > > > > > > > callbacks
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > future
> > > > > > > > > > > > > > > > > we give back intuitive and easy to program
> > > > against?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Some background info on implementation:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > At a high level the primary difference in
> > this
> > > > > > producer
> > > > > > > > is
> > > > > > > > > > that
> > > > > > > > > > > > it
> > > > > > > > > > > > > > > > removes
> > > > > > > > > > > > > > > > > the distinction between the "sync" and
> > "async"
> > > > > > > producer.
> > > > > > > > > > > > > Effectively
> > > > > > > > > > > > > > > all
> > > > > > > > > > > > > > > > > requests are sent asynchronously but always
> > > > return
> > > > > a
> > > > > > > > future
> > > > > > > > > > > > > response
> > > > > > > > > > > > > > > > object
> > > > > > > > > > > > > > > > > that gives the offset as well as any error
> > that
> > > > may
> > > > > > > have
> > > > > > > > > > > occurred
> > > > > > > > > > > > > > when
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > request is complete. The batching that is
> > done
> > > in
> > > > > the
> > > > > > > > async
> > > > > > > > > > > > > producer
> > > > > > > > > > > > > > > only
> > > > > > > > > > > > > > > > > today is done whenever possible now. This
> > means
> > > > > that
> > > > > > > the
> > > > > > > > > sync
> > > > > > > > > > > > > > producer,
> > > > > > > > > > > > > > > > > under load, can get performance as good as
> > the
> > > > > async
> > > > > > > > > producer
> > > > > > > > > > > > > > > > (preliminary
> > > > > > > > > > > > > > > > > results show the producer getting 1m
> > > > messages/sec).
> > > > > > > This
> > > > > > > > > > works
> > > > > > > > > > > > > > similar
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > group commit in databases but with respect
> to
> > > the
> > > > > > > actual
> > > > > > > > > > > network
> > > > > > > > > > > > > > > > > transmission--any messages that arrive
> while
> > a
> > > > send
> > > > > > is
> > > > > > > in
> > > > > > > > > > > > progress
> > > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > > batched together. It is also possible to
> > > > encourage
> > > > > > > > batching
> > > > > > > > > > > even
> > > > > > > > > > > > > > under
> > > > > > > > > > > > > > > > low
> > > > > > > > > > > > > > > > > load to save server resources by
> introducing
> > a
> > > > > delay
> > > > > > on
> > > > > > > > the
> > > > > > > > > > > send
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > allow
> > > > > > > > > > > > > > > > > more messages to accumulate; this is done
> > using
> > > > the
> > > > > > > > > > > > linger.msconfig
> > > > > > > > > > > > > > > > (this
> > > > > > > > > > > > > > > > > is similar to Nagle's algorithm in TCP).
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > This producer does all network
> communication
> > > > > > > > asynchronously
> > > > > > > > > > and
> > > > > > > > > > > > in
> > > > > > > > > > > > > > > > parallel
> > > > > > > > > > > > > > > > > to all servers so the performance penalty
> for
> > > > > acks=-1
> > > > > > > and
> > > > > > > > > > > waiting
> > > > > > > > > > > > > on
> > > > > > > > > > > > > > > > > replication should be much reduced. I
> haven't
> > > > done
> > > > > > much
> > > > > > > > > > > > > benchmarking
> > > > > > > > > > > > > > on
> > > > > > > > > > > > > > > > > this yet, though.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The high level design is described a little
> > > here,
> > > > > > > though
> > > > > > > > > this
> > > > > > > > > > > is
> > > > > > > > > > > > > now
> > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > little out of date:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > -Jay
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to