+1 all of Clark's points above.

On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:

> Jay - Thanks for the call for comments. Here's some initial input:
>
> - Make message serialization a client responsibility (making all messages
> byte[]). Reflection-based loading makes it harder to use generic codecs
> (e.g.  Envelope<PREFIX, DATA, SUFFIX>) or build up codec programmatically.
> Non-default partitioning should require an explicit partition key.
>
> - I really like the fact that it will be native Java. Please consider using
> native maven and not sbt, gradle, ivy, etc as they don't reliably play nice
> in the maven ecosystem. A jar without a well-formed pom doesn't feel like a
> real artifact. The POMs generated by sbt et al. are not well formed. Using
> maven will make builds and IDE integration much smoother.
>
> - Look at Nick Telford's dropwizard-extras package in which he defines some
> Jackson-compatible POJOs for loading configuration. Seems like your client
> migration is similar. The config objects should have constructors or
> factories that accept Map<String, String> and Properties for ease of
> migration.
>
> - Would you consider using the org.apache.kafka package for the new API
> (quibble)?
>
> - Why create your own futures rather than use
> java.util.concurrent.Future<Long> or similar? Standard futures will play
> nice with other reactive libs and things like Java 8's CompletableFuture.
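
To illustrate the standard-futures point: a minimal sketch, assuming a hypothetical producer whose send() returns a java.util.concurrent.CompletableFuture<Long> carrying the offset. StandardFutureSketch and its in-memory offset counter are made up for illustration; the prototype in the javadoc returns its own RecordSend type instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a producer; NOT the API from the javadoc.
class StandardFutureSketch {
    // Pretend offsets are assigned sequentially by a single I/O thread.
    private final AtomicLong nextOffset = new AtomicLong(0);
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    // Returns a standard future that completes with the assigned offset.
    CompletableFuture<Long> send(byte[] payload) {
        return CompletableFuture.supplyAsync(nextOffset::getAndIncrement, io);
    }

    void close() {
        io.shutdown();
    }

    public static void main(String[] args) throws Exception {
        StandardFutureSketch producer = new StandardFutureSketch();
        try {
            // Callers that only need blocking semantics can use plain Future.
            Future<Long> plain = producer.send("a".getBytes(StandardCharsets.UTF_8));
            long first = plain.get();

            // Callers that want composition get it without a custom type.
            String described = producer.send("b".getBytes(StandardCharsets.UTF_8))
                    .thenApply(offset -> "offset=" + offset)
                    .get();

            System.out.println(first + " " + described);
        } finally {
            producer.close();
        }
    }
}
```

Since CompletableFuture implements Future, the same return value serves both the blocking caller and the caller that chains callbacks.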
>
> Thanks again,
> C
>
>
>
> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > A couple comments:
> >
> > 1) Why does the config use a broker list instead of discovering the brokers
> > in ZooKeeper?  It doesn't match the HighLevelConsumer API.
> >
> > 2) It looks like broker connections are created on demand.  I'm wondering
> > if sometimes you might want to flush out config or network connectivity
> > issues before pushing the first message through.
> >
> > Should there also be a KafkaProducer.connect() or .open() method or
> > connectAll()?  I guess it would try to connect to all brokers in the
> > BROKER_LIST_CONFIG
> >
> > HTH,
> >
> > Roger
> >
> >
> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > As mentioned in a previous email we are working on a re-implementation of
> > > the producer. I would like to use this email thread to discuss the details
> > > of the public API and the configuration. I would love for us to be
> > > incredibly picky about this public api now so it is as good as possible and
> > > we don't need to break it in the future.
> > >
> > > The best way to get a feel for the API is actually to take a look at the
> > > javadoc; my hope is to get the api docs good enough that they are
> > > self-explanatory:
> > >
> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > >
> > > Please take a look at this API and give me any thoughts you may have!
> > >
> > > It may also be reasonable to take a look at the configs:
> > >
> > >
> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > >
> > > The actual code is posted here:
> > > https://issues.apache.org/jira/browse/KAFKA-1227
> > >
> > > A few questions or comments to kick things off:
> > > 1. We need to make a decision on whether serialization of the user's key
> > > and value should be done by the user (with our api just taking byte[]) or
> > > if we should take an object and allow the user to configure a Serializer
> > > class which we instantiate via reflection. We take the latter approach in
> > > the current producer, and I have carried this through to this prototype.
> > > The tradeoff I see is this: taking byte[] is actually simpler, since the
> > > user can directly do whatever serialization they like. The complication is
> > > actually partitioning. Currently partitioning is done by a similar plug-in
> > > api (Partitioner) which the user can implement and configure to override
> > > how partitions are assigned. If we take byte[] as input then we have no
> > > access to the original object and partitioning MUST be done on the byte[].
> > > This is fine for hash partitioning. However for various types of semantic
> > > partitioning (range partitioning, or whatever) you would want access to the
> > > original object. In the current approach a producer who wishes to send
> > > byte[] they have serialized in their own code can configure the
> > > BytesSerialization we supply which is just a "no op" serialization.
> > > 2. We should obsess over naming and make sure each of the class names is
> > > good.
> > > 3. Jun has already pointed out that we need to include the topic and
> > > partition in the response, which is absolutely right. I haven't done that
> > > yet but that definitely needs to be there.
> > > 4. Currently RecordSend.await will throw an exception if the request
> > > failed. The intention here is that producer.send(message).await() exactly
> > > simulates a synchronous call. Guozhang has noted that this is a little
> > > annoying since the user must then catch exceptions. However if we remove
> > > this then if the user doesn't check for errors they won't know one has
> > > occurred, which I predict will be a common mistake.
> > > 5. Perhaps there is more we could do to make the async callbacks and future
> > > we give back intuitive and easy to program against?
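
On the byte[] versus Serializer tradeoff in point 1, here is a rough sketch of how the two partitioning styles could look if the client took only byte[]. All names (BytesPartitioningSketch, hashPartition, rangePartition) and the user-id range are hypothetical and not part of the prototype. Hash partitioning works from the serialized key alone, while range partitioning has to be computed by the caller from the original value before it is serialized, which is the explicit-partition idea Clark raised.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper; none of these names come from the proposed API.
class BytesPartitioningSketch {

    // Hash partitioning needs nothing but the serialized key bytes.
    static int hashPartition(byte[] keyBytes, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Semantic (range) partitioning needs the original value, so the caller
    // computes the partition before handing bytes to the client.
    static int rangePartition(int userId, int maxUserId, int numPartitions) {
        return (int) ((long) userId * numPartitions / (maxUserId + 1L));
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        byte[] key = "user-750".getBytes(StandardCharsets.UTF_8);
        byte[] value = "{\"userId\":750}".getBytes(StandardCharsets.UTF_8);

        int byHash = hashPartition(key, numPartitions);
        int byRange = rangePartition(750, 999, numPartitions); // 750 * 4 / 1000 = 3

        System.out.println("hash partition: " + byHash
                + ", range partition: " + byRange
                + ", payload bytes: " + value.length);
    }
}
```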
> > >
> > > Some background info on implementation:
> > >
> > > At a high level the primary difference in this producer is that it removes
> > > the distinction between the "sync" and "async" producer. Effectively all
> > > requests are sent asynchronously but always return a future response object
> > > that gives the offset as well as any error that may have occurred when the
> > > request is complete. Batching, which today is done only in the async
> > > producer, is now done whenever possible. This means that the sync producer,
> > > under load, can get performance as good as the async producer (preliminary
> > > results show the producer getting 1m messages/sec). This works similarly to
> > > group commit in databases but with respect to the actual network
> > > transmission--any messages that arrive while a send is in progress are
> > > batched together. It is also possible to encourage batching even under low
> > > load to save server resources by introducing a delay on the send to allow
> > > more messages to accumulate; this is done using the linger.ms config (this
> > > is similar to Nagle's algorithm in TCP).
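
The linger behavior described above can be sketched roughly as follows. This is a toy model, not the prototype's accumulator: LingerSketch, append, poll, and the maxBatchSize cap are all assumptions for illustration, and time is passed in explicitly so the example stays deterministic. A batch is released once it is full or once the oldest pending message has waited at least lingerMs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of linger-based batching; names and the size cap are hypothetical.
class LingerSketch {
    private final long lingerMs;
    private final int maxBatchSize;
    private final List<String> pending = new ArrayList<>();
    private long firstAppendMs = -1;

    LingerSketch(long lingerMs, int maxBatchSize) {
        this.lingerMs = lingerMs;
        this.maxBatchSize = maxBatchSize;
    }

    // Queue a message; remember when the current batch started accumulating.
    void append(String message, long nowMs) {
        if (pending.isEmpty()) {
            firstAppendMs = nowMs;
        }
        pending.add(message);
    }

    // Returns the batch to send, or an empty list if it should keep waiting.
    List<String> poll(long nowMs) {
        boolean full = pending.size() >= maxBatchSize;
        boolean lingered = !pending.isEmpty() && nowMs - firstAppendMs >= lingerMs;
        if (!full && !lingered) {
            return new ArrayList<>();
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        LingerSketch accumulator = new LingerSketch(5, 3);
        accumulator.append("a", 0);
        System.out.println(accumulator.poll(2).size()); // still lingering
        accumulator.append("b", 3);
        System.out.println(accumulator.poll(5).size()); // linger elapsed, both sent
    }
}
```

With a linger of 0 every poll releases whatever is pending, which corresponds to sending as soon as possible.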
> > >
> > > This producer does all network communication asynchronously and in parallel
> > > to all servers so the performance penalty for acks=-1 and waiting on
> > > replication should be much reduced. I haven't done much benchmarking on
> > > this yet, though.
> > >
> > > The high level design is described a little here, though this is now a
> > > little out of date:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > >
> > > -Jay
> > >
> >
>
