Okay, I've posted a patch against trunk that carries out the refactoring
described below:
https://issues.apache.org/jira/browse/KAFKA-1227

Updated javadoc is here:
http://empathybox.com/kafka-javadoc

This touches a fair number of files, as I also improved documentation and
standardized terminology in a few places. Review appreciated!

-Jay


On Fri, Jan 31, 2014 at 3:04 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey folks,
>
> Thanks for all the excellent suggestions on the producer API; I think they
> really made things better. We'll do a similar thing for the consumer as we
> get a proposal together. I wanted to summarize everything I heard and which
> of the proposed changes I plan to make versus ignore. I'd like to get feedback
> from the committers or anyone else interested in this proposal (+1/0/-1)
> before I go make the changes, just to avoid churn at code review time.
>
> 1. Change send() to return a java.util.concurrent.Future:
>   Future<RecordPosition> send(ProducerRecord record, Callback callback)
> The cancel method will always return false and not do anything. Callback
> will then be changed to
>   interface Callback {
>     public void onCompletion(RecordPosition position, Exception exception);
>   }
> so that the exception is available there too and will be null if no error
> occurred. I'm not planning on renaming Callback because I haven't
> thought of a better name.
>
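To make item 1 concrete, here is a minimal sketch of the caller's view, assuming hypothetical stand-in types: RecordPosition is reduced to a topic/partition/offset holder, and SketchProducer "sends" synchronously through a simplified send(topic, value, callback) signature instead of taking a ProducerRecord. It is not the patch's implementation; it only illustrates a java.util.concurrent.Future whose cancel() is a no-op returning false, and a Callback whose exception argument is null on success.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for RecordPosition: where the record landed.
final class RecordPosition {
    final String topic;
    final int partition;
    final long offset;

    RecordPosition(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Callback as proposed: exception is null when no error occurred.
interface Callback {
    void onCompletion(RecordPosition position, Exception exception);
}

// A Future meant to be completed exactly once; it cannot be cancelled.
final class RecordFuture implements Future<RecordPosition> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile RecordPosition position;
    private volatile Exception error;

    void complete(RecordPosition position, Exception error) {
        this.position = position;
        this.error = error;
        done.countDown();
    }

    @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } // no-op
    @Override public boolean isCancelled() { return false; }
    @Override public boolean isDone() { return done.getCount() == 0; }

    @Override
    public RecordPosition get() throws InterruptedException, ExecutionException {
        done.await();
        return result();
    }

    @Override
    public RecordPosition get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (!done.await(timeout, unit)) throw new TimeoutException();
        return result();
    }

    // A failed send surfaces from get() as an ExecutionException wrapping the cause.
    private RecordPosition result() throws ExecutionException {
        if (error != null) throw new ExecutionException(error);
        return position;
    }
}

// Hypothetical producer that completes every send immediately, on the caller's thread.
final class SketchProducer {
    private long nextOffset = 0;

    Future<RecordPosition> send(String topic, byte[] value, Callback callback) {
        RecordPosition position = new RecordPosition(topic, 0, nextOffset++);
        RecordFuture future = new RecordFuture();
        future.complete(position, null);
        if (callback != null) callback.onCompletion(position, null);
        return future;
    }
}
```

A caller can either block with send(...).get() or pass a callback and ignore the returned future; both paths observe the same RecordPosition.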
> 2. We will change the way serialization works to proposal 1A from the
> previous discussion. That is, the Partitioner and Serializer interfaces will
> disappear. ProducerRecord will change to:
>   class ProducerRecord {
>     public byte[] key() {...}
>     public byte[] value() {...}
>     public Integer partition() {...} // can be null
>   }
> So key and value are now byte[]; partitionKey will be replaced by an
> optional partition. The behavior will be the following:
> 1. If a partition is present, it will be used.
> 2. If no partition is present but a key is present, a partition will be
> chosen by a hash of the key.
> 3. If neither a key nor a partition is present, partitions will be
> chosen in a round-robin fashion.
> In other words, what is currently the DefaultPartitioner will now be
> hard-coded as the behavior whenever no partition is provided.
>
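The three rules above might be sketched as follows. The proposal doesn't name the hash function, so Arrays.hashCode is only a stand-in, and DefaultPartitioning and choose() are hypothetical names. The sign bit is masked rather than using Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the hard-coded default partitioning rule.
final class DefaultPartitioning {
    private final AtomicInteger counter = new AtomicInteger(0);

    // partition and key may each be null; numPartitions must be > 0.
    int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null) {
            return partition; // 1. an explicit partition always wins
        }
        if (key != null) {
            // 2. stand-in hash of the key bytes (actual hash unspecified)
            return toPositive(Arrays.hashCode(key)) % numPartitions;
        }
        // 3. neither key nor partition: round robin over all partitions
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    // Clear the sign bit so the modulo result is never negative.
    private static int toPositive(int n) {
        return n & 0x7fffffff;
    }
}
```

Because the key path is deterministic, records with equal keys land on the same partition as long as the partition count doesn't change.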
> In order to allow choosing a partition correctly, the Producer interface
> will include a new method:
>   List<PartitionInfo> partitionsForTopic(String topic);
> PartitionInfo will be changed to include the actual Node objects not just
> the Node ids.
>
> I think this will still make it possible to implement any partitioning
> strategy you would want. The "challenge case" I considered was the
> partitioner that minimizes the number of TCP connections. This partitioner
> would choose the partition hosted on the node it had most recently chosen, in
> the hope that it would still have a connection to that node.
>
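A rough sketch of that challenge case, assuming PartitionInfo carries the leader Node as described above. Node, PartitionInfo, and ConnectionMinimizingPartitioner here are simplified hypothetical classes, not the ones in the patch; in real use the list would come from partitionsForTopic(topic) and the chosen id would be set as the record's partition. The class keeps mutable state and is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified metadata types.
final class Node {
    final int id;
    final String host;
    final int port;

    Node(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }
}

final class PartitionInfo {
    final String topic;
    final int partition;
    final Node leader; // may be null if the partition has no leader

    PartitionInfo(String topic, int partition, Node leader) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
    }
}

// Prefer partitions led by the node chosen last time, to reuse its connection.
final class ConnectionMinimizingPartitioner {
    private Node lastNode;
    private int rotation;

    int partition(List<PartitionInfo> partitions) {
        if (partitions.isEmpty()) throw new IllegalArgumentException("no partitions");
        if (lastNode != null) {
            List<PartitionInfo> sameNode = new ArrayList<>();
            for (PartitionInfo p : partitions) {
                if (p.leader != null && p.leader.id == lastNode.id) sameNode.add(p);
            }
            if (!sameNode.isEmpty()) {
                return pick(sameNode).partition; // stay on the node we already talk to
            }
        }
        // No previous node, or it no longer leads anything: pick from all partitions.
        PartitionInfo chosen = pick(partitions);
        lastNode = chosen.leader;
        return chosen.partition;
    }

    // Rotate among candidates so load still spreads across that node's partitions.
    private PartitionInfo pick(List<PartitionInfo> candidates) {
        int i = (rotation++ & 0x7fffffff) % candidates.size();
        return candidates.get(i);
    }
}
```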
> It will be slightly more awkward to package partitioners in this model, but
> the simple cases are simpler, so hopefully it's worth it.
>
> 3. I will make the producer implement java.io.Closeable, but its close() will
> not throw any exceptions, as there doesn't really seem to be any disadvantage
> to this and the interface may remind people to call close.
>
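A minimal sketch of item 3, using a hypothetical SketchProducer skeleton. The override of close() drops the IOException that Closeable.close() declares, which Java permits, so callers can use try-with-resources without a catch block. Following the Closeable contract, calling close() a second time has no effect.

```java
import java.io.Closeable;

// Hypothetical producer skeleton; only the close() shape matters here.
final class SketchProducer implements Closeable {
    private volatile boolean closed = false;

    void send(byte[] value) {
        if (closed) throw new IllegalStateException("producer is closed");
        // ... hand the record to the I/O thread ...
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() { // no "throws IOException": narrower than Closeable's
        closed = true;     // idempotent: closing twice has no further effect
        // ... flush buffered records and stop the I/O thread ...
    }
}

final class CloseDemo {
    static SketchProducer run() {
        SketchProducer producer = new SketchProducer();
        try (SketchProducer resource = producer) {
            resource.send(new byte[] {42});
        } // resource.close() runs here; no catch needed
        return producer;
    }
}
```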
> 4. I am going to break the config stuff into a separate discussion as I
> don't think I have done enough to document and name the configs well and I
> need to do a pass on that first.
>
> 5. There were a number of comments about internals, mostly right on, which
> I am going to handle separately.
>
> Non-changes
>
> 1. I don't plan to change the build system. The SBT=>gradle change is
> basically orthogonal, and we should debate it in the context of its ticket.
> 2. I'm going to stick with my oddball kafka.* package name (rather than
> org.apache.kafka.*) and non-getter method names unless everyone
> complains.
> 3. I'm not going to introduce a zookeeper dependency in the client as I
> don't see any advantage.
> 4. There were a number of reasonable suggestions on the Callback execution
> model. I'm going to leave it as is, though. Because we are moving to the
> standard Java Future, we can't include functionality like ListenableFuture.
> I think the simplistic callback model, where callbacks are executed in the
> I/O thread, should be good enough for most cases, and other cases can use
> their own threading.
>
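For the "own threading" case in item 4, one option is a wrapper callback that does nothing on the I/O thread except hand the real work to an executor the application owns. This is a sketch against the proposed Callback signature; OffloadingCallback and the trimmed-down RecordPosition are hypothetical. Because the wrapper returns immediately, a slow callback no longer stalls the I/O thread.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for RecordPosition.
final class RecordPosition {
    final String topic;
    final int partition;
    final long offset;

    RecordPosition(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Callback as proposed: exception is null when no error occurred.
interface Callback {
    void onCompletion(RecordPosition position, Exception exception);
}

// Wraps an application callback so it runs on the application's executor.
final class OffloadingCallback implements Callback {
    private final Callback delegate;
    private final Executor executor;

    OffloadingCallback(Callback delegate, Executor executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    @Override
    public void onCompletion(RecordPosition position, Exception exception) {
        // Invoked on the producer's I/O thread: only enqueue, never block here.
        executor.execute(() -> delegate.onCompletion(position, exception));
    }
}
```

If callback ordering matters, a single-threaded executor (Executors.newSingleThreadExecutor()) runs the wrapped callbacks in the order the I/O thread submitted them; a larger pool trades that ordering for parallelism.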
