AutoCloseable would be nice for us as most of our code is using Java 7 at
this point.

I like Dropwizard's configuration mapping to POJOs via Jackson, but if you
wanted to stick with property maps I don't care enough to object.

If the producer only dealt with bytes, is there a way we could still due
partition plugins without specifying the number explicitly? I would prefer
to be able to pass in field(s) that would be used by the partitioner.
Obviously if this wasn't possible you could always deserialize the object
in the partitioner and grab the fields you want, but that seems really
expensive to do on every message.

It would also be nice to have a Java API Encoder constructor taking in
VerifiableProperties. Scala understands how to handle "props:
VerifiableProperties = null", but Java doesn't. So you don't run into this
problem until runtime.


-Xavier


On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <cl...@breyman.com> wrote:

> Jay -
>
> Config - your explanation makes sense. I'm just so accustomed to having
> Jackson automatically map my configuration objects to POJOs that I've
> stopped using property files. They are lingua franca. The only thought
> might be to separate the config interface from the implementation to allow
> for alternatives, but that might undermine your point of "do it this way so
> that everyone can find it where they expect it".
>
> Serialization: Of the options, I like 1A the best, though possibly with
> either an option to specify a partition key rather than ID or a helper to
> translate an arbitrary byte[] or long into a partition number.
>
> Thanks
> Clark
>
>
> On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Thanks for the detailed thoughts. Let me elaborate on the config thing.
> >
> > I agree that at first glance key-value strings don't seem like a very
> good
> > configuration api for a client. Surely a well-typed config class would be
> > better! I actually disagree and let me see if I can convince you.
> >
> > My reasoning has nothing to do with the api and everything to do with
> > operations.
> >
> > Clients are embedded in applications which are themselves configured. In
> > any place that takes operations seriously the configuration for these
> > applications will be version controlled and maintained through some kind
> of
> > config management system. If we give a config class with getters and
> > setters the application has to expose those properties to its
> > configuration. What invariably happens is that the application exposes
> only
> > a choice few properties that they thought they would change. Furthermore
> > the application will make up a name for these configs that seems
> intuitive
> > at the time in the 2 seconds the engineer spends thinking about it.
> >
> > Now consider the result of this in the large. You end up with dozens or
> > hundreds of applications that have the client embedded. Each exposes a
> > different, inadequate subset of the possible configs, each with different
> > names. It is a nightmare.
> >
> > If you use a string-string map the config system can directly get a
> bundle
> > of config key-value pairs and put them into the client. This means that
> all
> > configuration is automatically available with the name documented on the
> > website in every application that does this. If you upgrade to a new
> kafka
> > version with more configs those will be exposed too. If you realize that
> > you need to change a default you can just go through your configs and
> > change it everywhere as it will have the same name everywhere.
> >
> > -Jay
> >
> >
> >
> >
> > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <cl...@breyman.com>
> wrote:
> >
> > > Thanks Jay. I'll see if I can put together a more complete response,
> > > perhaps as separate threads so that topics don't get entangled. In the
> > mean
> > > time, here's a couple responses:
> > >
> > > Serialization: you've broken out a sub-thread so i'll reply there. My
> > bias
> > > is that I like generics (except for type-erasure) and in particular
> they
> > > make it easy to compose serializers for compound payloads (e.g. when a
> > > common header wraps a payload of parameterized type). I'll respond to
> > your
> > > 4-options message with an example.
> > >
> > > Build: I've seen a lot of "maven-compatible" build systems produce
> > > "artifacts" that aren't really artifacts - no embedded POM or, worst,
> > > malformed POM. I know the sbt-generated artifacts were this way - onus
> is
> > > on me to see what gradle is spitting out and what a maven build might
> > look
> > > like. Maven may be old and boring, but it gets out of the way and
> > > integrates really seamlessly with a lot of IDEs. When some scala
> > projects I
> > > was working on in the fall of 2011 switched from sbt to maven, build
> > became
> > > a non-issue.
> > >
> > > Config: Not a big deal  and no, I don't think a dropwizard dependency
> is
> > > appropriate. I do like using simple entity beans (POJO's not j2EE) for
> > > configuration, especially if they can be marshalled without annotation
> by
> > > Jackson. I only mentioned the dropwizard-extras  because it has some
> > entity
> > > bean versions of the ZK and Kafka configs.
> > >
> > > Domain-packaging: Also not a big deal - it's what's expected and it's
> > > pretty free in most IDE's. The advantages I see is that it is clear
> > whether
> > > something is from the Apache Kafka project and whether something is
> from
> > > another org and related to Kafka. That said, nothing really enforces
> it.
> > >
> > > Futures: I'll see if I can create some examples to demonstrate Future
> > > making interop easier.
> > >
> > > Regards,
> > > C
> > >
> > >
> > >
> > >
> > > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> > >
> > > > Hey Clark,
> > > >
> > > > - Serialization: Yes I agree with these though I don't consider the
> > loss
> > > of
> > > > generics a big issue. I'll try to summarize what I would consider the
> > > best
> > > > alternative api with raw byte[].
> > > >
> > > > - Maven: We had this debate a few months back and the consensus was
> > > gradle.
> > > > Is there a specific issue with the poms gradle makes? I am extremely
> > > loath
> > > > to revisit the issue as build issues are a recurring thing and no one
> > > ever
> > > > agrees and ultimately our build needs are very simple.
> > > >
> > > > - Config: I'm not sure if I follow the point. Are you saying we
> should
> > > use
> > > > something in dropwizard for config? One principle here is to try to
> > > remove
> > > > as many client dependencies as possible as we inevitably run into
> > > terrible
> > > > compatibility issues with users who use the same library or its
> > > > dependencies at different versions. Or are you talking about
> > maintaining
> > > > compatibility with existing config parameters? I think as much as a
> > > config
> > > > in the existing client makes sense it should have the same name (I
> was
> > a
> > > > bit sloppy about that so I'll fix any errors there). There are a few
> > new
> > > > things and we should give those reasonable defaults. I think config
> is
> > > > important so I'll start a thread on the config package in there.
> > > >
> > > > - org.apache.kafka: We could do this. I always considered it kind of
> an
> > > odd
> > > > thing Java programmers do that has no real motivation (but I could be
> > > > re-educated!). I don't think it ends up reducing naming conflicts in
> > > > practice and it adds a lot of noise and nested directories. Is there
> a
> > > > reason you prefer this or just to be more standard?
> > > >
> > > > - Future: Basically I didn't see any particular advantage. The
> cancel()
> > > > method doesn't really make sense so probably wouldn't work. Likewise
> I
> > > > dislike the checked exceptions it requires. Basically I just wrote
> out
> > > some
> > > > code examples and it seemed cleaner with a special purpose object. I
> > > wasn't
> > > > actually aware of plans for improved futures in java 8 or the other
> > > > integrations. Maybe you could elaborate on this a bit and show how it
> > > would
> > > > be used? Sounds promising, I just don't know a lot about it.
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com>
> > > wrote:
> > > >
> > > > > Jay - Thanks for the call for comments. Here's some initial input:
> > > > >
> > > > > - Make message serialization a client responsibility (making all
> > > messages
> > > > > byte[]). Reflection-based loading makes it harder to use generic
> > codecs
> > > > > (e.g.  Envelope<PREFIX, DATA, SUFFIX>) or build up codec
> > > > programmatically.
> > > > > Non-default partitioning should require an explicit partition key.
> > > > >
> > > > > - I really like the fact that it will be native Java. Please
> consider
> > > > using
> > > > > native maven and not sbt, gradle, ivy, etc as they don't reliably
> > play
> > > > nice
> > > > > in the maven ecosystem. A jar without a well-formed pom doesn't
> feel
> > > > like a
> > > > > real artifact. The pom's generated by sbt et al. are not well
> formed.
> > > > Using
> > > > > maven will make builds and IDE integration much smoother.
> > > > >
> > > > > - Look at Nick Telford's dropwizard-extras package in which he
> > defines
> > > > some
> > > > > Jackson-compatible POJO's for loading configuration. Seems like
> your
> > > > client
> > > > > migration is similar. The config objects should have constructors
> or
> > > > > factories that accept Map<String, String> and Properties for ease
> of
> > > > > migration.
> > > > >
> > > > > - Would you consider using the org.apache.kafka package for the new
> > API
> > > > > (quibble)
> > > > >
> > > > > - Why create your own futures rather than use
> > > > > java.util.concurrent.Future<Long> or similar? Standard futures will
> > > play
> > > > > nice with other reactive libs and things like J8's
> ComposableFuture.
> > > > >
> > > > > Thanks again,
> > > > > C
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <
> > roger.hoo...@gmail.com
> > > > > >wrote:
> > > > >
> > > > > > A couple comments:
> > > > > >
> > > > > > 1) Why does the config use a broker list instead of discovering
> the
> > > > > brokers
> > > > > > in ZooKeeper?  It doesn't match the HighLevelConsumer API.
> > > > > >
> > > > > > 2) It looks like broker connections are created on demand.  I'm
> > > > wondering
> > > > > > if sometimes you might want to flush out config or network
> > > connectivity
> > > > > > issues before pushing the first message through.
> > > > > >
> > > > > > Should there also be a KafkaProducer.connect() or .open() method
> or
> > > > > > connectAll()?  I guess it would try to connect to all brokers in
> > the
> > > > > > BROKER_LIST_CONFIG
> > > > > >
> > > > > > HTH,
> > > > > >
> > > > > > Roger
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > As mentioned in a previous email we are working on a
> > > > re-implementation
> > > > > of
> > > > > > > the producer. I would like to use this email thread to discuss
> > the
> > > > > > details
> > > > > > > of the public API and the configuration. I would love for us to
> > be
> > > > > > > incredibly picky about this public api now so it is as good as
> > > > possible
> > > > > > and
> > > > > > > we don't need to break it in the future.
> > > > > > >
> > > > > > > The best way to get a feel for the API is actually to take a
> look
> > > at
> > > > > the
> > > > > > > javadoc, my hope is to get the api docs good enough so that it
> is
> > > > > > > self-explanatory:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > > > > >
> > > > > > > Please take a look at this API and give me any thoughts you may
> > > have!
> > > > > > >
> > > > > > > It may also be reasonable to take a look at the configs:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > > > > > >
> > > > > > > The actual code is posted here:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-1227
> > > > > > >
> > > > > > > A few questions or comments to kick things off:
> > > > > > > 1. We need to make a decision on whether serialization of the
> > > user's
> > > > > key
> > > > > > > and value should be done by the user (with our api just taking
> > > > byte[])
> > > > > or
> > > > > > > if we should take an object and allow the user to configure a
> > > > > Serializer
> > > > > > > class which we instantiate via reflection. We take the later
> > > approach
> > > > > in
> > > > > > > the current producer, and I have carried this through to this
> > > > > prototype.
> > > > > > > The tradeoff I see is this: taking byte[] is actually simpler,
> > the
> > > > user
> > > > > > can
> > > > > > > directly do whatever serialization they like. The complication
> is
> > > > > > actually
> > > > > > > partitioning. Currently partitioning is done by a similar
> plug-in
> > > api
> > > > > > > (Partitioner) which the user can implement and configure to
> > > override
> > > > > how
> > > > > > > partitions are assigned. If we take byte[] as input then we
> have
> > no
> > > > > > access
> > > > > > > to the original object and partitioning MUST be done on the
> > byte[].
> > > > > This
> > > > > > is
> > > > > > > fine for hash partitioning. However for various types of
> semantic
> > > > > > > partitioning (range partitioning, or whatever) you would want
> > > access
> > > > to
> > > > > > the
> > > > > > > original object. In the current approach a producer who wishes
> to
> > > > send
> > > > > > > byte[] they have serialized in their own code can configure the
> > > > > > > BytesSerialization we supply which is just a "no op"
> > serialization.
> > > > > > > 2. We should obsess over naming and make sure each of the class
> > > names
> > > > > are
> > > > > > > good.
> > > > > > > 3. Jun has already pointed out that we need to include the
> topic
> > > and
> > > > > > > partition in the response, which is absolutely right. I haven't
> > > done
> > > > > that
> > > > > > > yet but that definitely needs to be there.
> > > > > > > 4. Currently RecordSend.await will throw an exception if the
> > > request
> > > > > > > failed. The intention here is that
> producer.send(message).await()
> > > > > exactly
> > > > > > > simulates a synchronous call. Guozhang has noted that this is a
> > > > little
> > > > > > > annoying since the user must then catch exceptions. However if
> we
> > > > > remove
> > > > > > > this then if the user doesn't check for errors they won't know
> > one
> > > > has
> > > > > > > occurred, which I predict will be a common mistake.
> > > > > > > 5. Perhaps there is more we could do to make the async
> callbacks
> > > and
> > > > > > future
> > > > > > > we give back intuitive and easy to program against?
> > > > > > >
> > > > > > > Some background info on implementation:
> > > > > > >
> > > > > > > At a high level the primary difference in this producer is that
> > it
> > > > > > removes
> > > > > > > the distinction between the "sync" and "async" producer.
> > > Effectively
> > > > > all
> > > > > > > requests are sent asynchronously but always return a future
> > > response
> > > > > > object
> > > > > > > that gives the offset as well as any error that may have
> occurred
> > > > when
> > > > > > the
> > > > > > > request is complete. The batching that is done in the async
> > > producer
> > > > > only
> > > > > > > today is done whenever possible now. This means that the sync
> > > > producer,
> > > > > > > under load, can get performance as good as the async producer
> > > > > > (preliminary
> > > > > > > results show the producer getting 1m messages/sec). This works
> > > > similar
> > > > > to
> > > > > > > group commit in databases but with respect to the actual
> network
> > > > > > > transmission--any messages that arrive while a send is in
> > progress
> > > > are
> > > > > > > batched together. It is also possible to encourage batching
> even
> > > > under
> > > > > > low
> > > > > > > load to save server resources by introducing a delay on the
> send
> > to
> > > > > allow
> > > > > > > more messages to accumulate; this is done using the
> > linger.msconfig
> > > > > > (this
> > > > > > > is similar to Nagle's algorithm in TCP).
> > > > > > >
> > > > > > > This producer does all network communication asynchronously and
> > in
> > > > > > parallel
> > > > > > > to all servers so the performance penalty for acks=-1 and
> waiting
> > > on
> > > > > > > replication should be much reduced. I haven't done much
> > > benchmarking
> > > > on
> > > > > > > this yet, though.
> > > > > > >
> > > > > > > The high level design is described a little here, though this
> is
> > > now
> > > > a
> > > > > > > little out of date:
> > > > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to