I thought a bit about it and I think the getCluster() approach was overly
simplistic: since we only maintain metadata for the current set of topics
the producer cares about, the cluster might not have the partitions for
the topic the user cares about. I think what we actually need is a new
method on the producer:
  List<PartitionInfo> partitionsFor(String... topics)
The intended usage of this method would be:
  int partition = myPartitionFunction(key, producer.partitionsFor(topic));
  producer.send(new ProducerRecord(topic, partition, key, value));
That is, the producer would re-fetch the current set of partitions every
time, and the partitions would refresh on whatever schedule the producer's
metadata refresh was configured with.
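
For example, a simple hash-based myPartitionFunction might look like this
(just a sketch--I'm assuming PartitionInfo exposes its id via partition(),
and Arrays/List are the java.util classes):

  int myPartitionFunction(byte[] key, List<PartitionInfo> partitions) {
    // mask the sign bit so the modulo result is always non-negative
    int hash = Arrays.hashCode(key) & 0x7fffffff;
    return partitions.get(hash % partitions.size()).partition();
  }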

So in the case of a partition count change, producers would pick this up
as their natural metadata updates occur.

This actually solves an important problem in the existing api: the send
method blocks on the first message sent to a topic if we don't yet have
metadata for that topic, and it blocks until the metadata is fetched. This
is a little weird since it happens even in non-blocking mode. The new
method gives us an escape hatch: someone who wants to avoid that small
block on the first send can initialize their producer and call
producer.partitionsFor(topics) to force metadata initialization.
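
In code the warm-up would just be something like this (topic name and
config variable are illustrative):

  Producer producer = new KafkaProducer(config);
  producer.partitionsFor("my-topic"); // the small metadata block happens here, once
  producer.send(new ProducerRecord("my-topic", key, value)); // no longer blocks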

-Jay



On Thu, Jan 30, 2014 at 4:34 PM, Jun Rao <jun...@gmail.com> wrote:

> With option 1A, if we increase the # of partitions on a topic, how will
> the producer find out about newly created partitions? Do we expect the
> producer to periodically call getCluster()?
>
> As for the ZK dependency, one of the goals of the client rewrite is to
> reduce dependencies so that one can implement the client in languages
> other than java. A ZK client is only available in a small number of
> languages.
>
> Thanks,
>
> Jun
>
>
> On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Clark and all,
> >
> > I thought a little bit about the serialization question. Here are the
> > options I see and the pros and cons I can think of. I'd love to hear
> > people's preferences if you have a strong one.
> >
> > One important consideration is that however the producer works will also
> > need to be how the new consumer works (which we hope to write next). That
> > is, if you put objects in, you should get objects out. So we need to
> > think through both sides.
> >
> > Options:
> >
> > Option 0: What is in the existing scala code and the java code I
> > posted--Serializer and Partitioner plugins provided by the user via
> > config. Partitioner has a sane default, but Serializer needs to be
> > specified in config.
> >
> > Pros: How it works today in the scala code.
> > Cons: You have to adapt your serialization library of choice to our
> > interfaces. The reflective class loading means a typo in the serializer
> > name gives odd errors. Likewise there is little type safety--the
> > ProducerRecord takes Object, and any type mismatch between the object
> > provided and the serializer only shows up at runtime.
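> >
> > For concreteness, today's config-driven wiring looks roughly like this
> > (just a sketch--serializer.class is the existing scala producer's
> > property, and the class name is made up):
> >
> >   Properties props = new Properties();
> >   // a typo in this string compiles fine and only fails at runtime,
> >   // when the class is reflectively loaded
> >   props.put("serializer.class", "com.example.MyAvroEncoder");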
> >
> > Option 1: No plugins
> >
> > This would mean byte[] key, byte[] value, and partitioning done by client
> > by passing in a partition *number* directly.
> >
> > The problem with this is that it is tricky to compute the partition
> > correctly and probably most people won't. We could add a getCluster()
> > method to return the Cluster instance you should use for partitioning.
> > But I suspect people would be lazy and not use that and instead hard-code
> > partitions, which would break if partitions were added or if they hard
> > coded it wrong. In my experience about 3 partitioning strategies cover
> > 99% of cases, so not having a default implementation makes the common
> > case harder. Left to their own devices people will use bad hash
> > functions and get weird results.
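> >
> > To make the bad-hash worry concrete, here is the kind of thing people
> > tend to write on their own (sketch; partitionsForTopic() is an
> > illustrative name, and Arrays is java.util.Arrays):
> >
> >   // broken: Math.abs(Integer.MIN_VALUE) is still negative, so for
> >   // unlucky keys this produces a negative partition number
> >   int numPartitions = producer.getCluster().partitionsForTopic(topic).size();
> >   int partition = Math.abs(Arrays.hashCode(keyBytes)) % numPartitions;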
> >
> > Option 1A: Alternatively we could partition by the key using the existing
> > default partitioning strategy (which only uses the byte[] anyway), but
> > instead of having a partitionKey we could have a numerical partition
> > override and add the getCluster() method to get the cluster metadata.
> > That would make custom partitioning possible but handle the common case
> > simply.
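> >
> > Usage under 1A might look like this (sketch; myRangePartition is a
> > made-up user function):
> >
> >   // common case: default hash partitioning on the key bytes
> >   producer.send(new ProducerRecord(topic, keyBytes, valueBytes));
> >
> >   // custom case: compute a partition from the metadata and override it
> >   int partition = myRangePartition(key, producer.getCluster());
> >   producer.send(new ProducerRecord(topic, partition, keyBytes, valueBytes));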
> >
> > Option 2: Partitioner plugin remains, serializers go.
> >
> > The problem here is that the partitioner might lose access to the
> > deserialized key, which would occasionally be useful for semantic
> > partitioning schemes. The Partitioner could deserialize the key, but
> > that would be inefficient and weird.
> >
> > This problem could be fixed by having key and value be byte[] but
> > retaining partitionKey as an Object and passing it to the partitioner
> > as-is. Then if you have a partitioner which requires the deserialized
> > key you would need to use this partition key. One weird side effect is
> > that if you want to have a custom partition key BUT want to partition by
> > the bytes of that key rather than the object value, you must write a
> > custom partitioner and serialize the key yourself.
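> >
> > The Partitioner interface under option 2 would then be shaped roughly
> > like this (sketch only--names and signature are illustrative):
> >
> >   interface Partitioner {
> >     // partitionKey is passed through as an Object, un-serialized;
> >     // key and value have already been turned into bytes by the user
> >     int partition(Object partitionKey, byte[] key, Cluster cluster);
> >   }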
> >
> > Of these I think I prefer 1A, but could be convinced of 0 since that is
> > how it works now.
> >
> > Thoughts?
> >
> > -Jay
> >
> >
> > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
> >
> > > Jay - Thanks for the call for comments. Here's some initial input:
> > >
> > > - Make message serialization a client responsibility (making all
> > > messages byte[]). Reflection-based loading makes it harder to use
> > > generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or to build up
> > > codecs programmatically. Non-default partitioning should require an
> > > explicit partition key.
> > >
> > > - I really like the fact that it will be native Java. Please consider
> > > using native maven and not sbt, gradle, ivy, etc., as they don't
> > > reliably play nice in the maven ecosystem. A jar without a well-formed
> > > pom doesn't feel like a real artifact, and the poms generated by sbt et
> > > al. are not well formed. Using maven will make builds and IDE
> > > integration much smoother.
> > >
> > > - Look at Nick Telford's dropwizard-extras package, in which he defines
> > > some Jackson-compatible POJOs for loading configuration. Seems like
> > > your client migration is similar. The config objects should have
> > > constructors or factories that accept Map<String, String> and
> > > Properties for ease of migration.
> > >
> > > - Would you consider using the org.apache.kafka package for the new
> > > API? (quibble)
> > >
> > > - Why create your own futures rather than use
> > > java.util.concurrent.Future<Long> or similar? Standard futures will
> > > play nice with other reactive libs and things like Java 8's
> > > CompletableFuture.
> > >
> > > Thanks again,
> > > C
> > >
> > >
> > >
> > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com>
> > > wrote:
> > >
> > > > A couple comments:
> > > >
> > > > 1) Why does the config use a broker list instead of discovering the
> > > > brokers in ZooKeeper?  It doesn't match the HighLevelConsumer API.
> > > >
> > > > 2) It looks like broker connections are created on demand.  I'm
> > > > wondering if sometimes you might want to flush out config or network
> > > > connectivity issues before pushing the first message through.
> > > >
> > > > Should there also be a KafkaProducer.connect() or .open() method, or
> > > > a connectAll()?  I guess it would try to connect to all brokers in
> > > > the BROKER_LIST_CONFIG.
> > > >
> > > > HTH,
> > > >
> > > > Roger
> > > >
> > > >
> > > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com>
> > > > wrote:
> > > >
> > > > > As mentioned in a previous email we are working on a
> > > > > re-implementation of the producer. I would like to use this email
> > > > > thread to discuss the details of the public API and the
> > > > > configuration. I would love for us to be incredibly picky about
> > > > > this public api now so it is as good as possible and we don't need
> > > > > to break it in the future.
> > > > >
> > > > > The best way to get a feel for the API is actually to take a look
> > > > > at the javadoc; my hope is to get the api docs good enough that
> > > > > they are self-explanatory:
> > > > >
> > > > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > > >
> > > > > Please take a look at this API and give me any thoughts you may
> > > > > have!
> > > > >
> > > > > It may also be reasonable to take a look at the configs:
> > > > >
> > > > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > > > >
> > > > > The actual code is posted here:
> > > > > https://issues.apache.org/jira/browse/KAFKA-1227
> > > > >
> > > > > A few questions or comments to kick things off:
> > > > > 1. We need to make a decision on whether serialization of the
> > > > > user's key and value should be done by the user (with our api just
> > > > > taking byte[]) or if we should take an object and allow the user to
> > > > > configure a Serializer class which we instantiate via reflection.
> > > > > We take the latter approach in the current producer, and I have
> > > > > carried this through to this prototype. The tradeoff I see is this:
> > > > > taking byte[] is actually simpler, as the user can directly do
> > > > > whatever serialization they like. The complication is actually
> > > > > partitioning. Currently partitioning is done by a similar plug-in
> > > > > api (Partitioner) which the user can implement and configure to
> > > > > override how partitions are assigned. If we take byte[] as input
> > > > > then we have no access to the original object and partitioning MUST
> > > > > be done on the byte[]. This is fine for hash partitioning. However
> > > > > for various types of semantic partitioning (range partitioning, or
> > > > > whatever) you would want access to the original object. In the
> > > > > current approach a producer who wishes to send byte[] they have
> > > > > serialized in their own code can configure the BytesSerialization
> > > > > we supply, which is just a "no op" serialization (sketched after
> > > > > this list).
> > > > > 2. We should obsess over naming and make sure each of the class
> > > > > names is good.
> > > > > 3. Jun has already pointed out that we need to include the topic
> > > > > and partition in the response, which is absolutely right. I haven't
> > > > > done that yet but that definitely needs to be there.
> > > > > 4. Currently RecordSend.await will throw an exception if the
> > > > > request failed. The intention here is that
> > > > > producer.send(message).await() exactly simulates a synchronous
> > > > > call. Guozhang has noted that this is a little annoying since the
> > > > > user must then catch exceptions. However if we remove this then if
> > > > > the user doesn't check for errors they won't know one has occurred,
> > > > > which I predict will be a common mistake.
> > > > > 5. Perhaps there is more we could do to make the async callbacks
> > > > > and the future we give back intuitive and easy to program against?
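> > > > >
> > > > > Regarding (1), the no-op serialization mentioned above is trivial
> > > > > (sketch; the exact Serializer interface shape is illustrative):
> > > > >
> > > > >   public class BytesSerialization implements Serializer {
> > > > >     @Override
> > > > >     public byte[] toBytes(Object o) {
> > > > >       // the caller already serialized; just pass the bytes through
> > > > >       return (byte[]) o;
> > > > >     }
> > > > >   }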
> > > > >
> > > > > Some background info on implementation:
> > > > >
> > > > > At a high level the primary difference in this producer is that it
> > > > > removes the distinction between the "sync" and "async" producer.
> > > > > Effectively all requests are sent asynchronously but always return
> > > > > a future response object that gives the offset as well as any error
> > > > > that may have occurred when the request is complete. The batching
> > > > > that today is done only in the async producer is now done whenever
> > > > > possible. This means that the sync producer, under load, can get
> > > > > performance as good as the async producer (preliminary results show
> > > > > the producer getting 1m messages/sec). This works like group commit
> > > > > in databases but with respect to the actual network
> > > > > transmission--any messages that arrive while a send is in progress
> > > > > are batched together. It is also possible to encourage batching
> > > > > even under low load to save server resources by introducing a delay
> > > > > on the send to allow more messages to accumulate; this is done
> > > > > using the linger.ms config (this is similar to Nagle's algorithm in
> > > > > TCP).
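> > > > >
> > > > > So the two styles collapse into one call; something like this
> > > > > (sketch, using the RecordSend type mentioned above):
> > > > >
> > > > >   RecordSend send = producer.send(new ProducerRecord(topic, key, value));
> > > > >   // ...async style: keep going while the send is batched and sent...
> > > > >   send.await(); // sync style: block here; throws if the request failed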
> > > > >
> > > > > This producer does all network communication asynchronously and in
> > > > > parallel to all servers, so the performance penalty for acks=-1 and
> > > > > waiting on replication should be much reduced. I haven't done much
> > > > > benchmarking on this yet, though.
> > > > >
> > > > > The high level design is described a little here, though it is now
> > > > > a little out of date:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > >
> > > > > -Jay
> > > > >
> > > >
> > >
> >
>
