Hey folks,

Thanks for all the excellent suggestions on the producer API; I think they
really made things better. We'll do a similar thing for the consumer as we
get a proposal together. I wanted to summarize everything I heard and the
changes I plan to make versus ignore. I'd like to get feedback from the
committers or anyone else interested in this as a proposal (+1/0/-1)
before I make the changes, just to avoid churn at code review time.

1. Change send() to return a java.util.concurrent.Future:
  Future<RecordPosition> send(ProducerRecord record, Callback callback);
The Future's cancel method will always return false and do nothing. Callback
will then be changed to
  interface Callback {
    public void onCompletion(RecordPosition position, Exception exception);
  }
so that the exception is available there too; it will be null if no error
occurred. I'm not planning on changing the name Callback because I haven't
thought of a better one.
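To make item 1 concrete, here is a minimal sketch of a Future that honours the "cancel always returns false" rule and fires the proposed Callback with a null exception on success. The fields of RecordPosition (partition, offset) and the class name ProduceFuture are hypothetical; in a real producer the I/O thread would call complete() when the request finishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical result type: where the record landed.
class RecordPosition {
    final int partition;
    final long offset;
    RecordPosition(int partition, long offset) { this.partition = partition; this.offset = offset; }
}

// The proposed callback: exception is null when no error occurred.
interface Callback {
    void onCompletion(RecordPosition position, Exception exception);
}

// Hypothetical Future returned by send(); completed once by the I/O thread.
class ProduceFuture implements Future<RecordPosition> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile RecordPosition position;
    private volatile Exception exception;

    // Record the outcome, wake up any get() callers, then run the callback.
    void complete(RecordPosition position, Exception exception, Callback callback) {
        this.position = position;
        this.exception = exception;
        done.countDown();
        if (callback != null) callback.onCompletion(position, exception);
    }

    // A send cannot be cancelled: always returns false and does nothing.
    @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
    @Override public boolean isCancelled() { return false; }
    @Override public boolean isDone() { return done.getCount() == 0; }

    @Override
    public RecordPosition get() throws InterruptedException, ExecutionException {
        done.await();
        return result();
    }

    @Override
    public RecordPosition get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (!done.await(timeout, unit)) throw new TimeoutException();
        return result();
    }

    // Failures surface from get() wrapped in ExecutionException, per the Future contract.
    private RecordPosition result() throws ExecutionException {
        if (exception != null) throw new ExecutionException(exception);
        return position;
    }
}
```

Callers who want blocking behavior call get(); callers who want asynchronous notification pass a Callback and ignore the Future.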

2. We will change the way serialization works to proposal 1A in the
previous discussion. That is, the Partitioner and Serializer interfaces will
disappear. ProducerRecord will change to:
  class ProducerRecord {
    public byte[] key() {...}
    public byte[] value() {...}
    public Integer partition() {...} // can be null
  }
So key and value are now byte[], and partitionKey will be replaced by an
optional partition. The behavior will be the following:
  1. If a partition is present, it will be used.
  2. If no partition is present but a key is present, a partition will be
     chosen by a hash of the key.
  3. If neither a key nor a partition is present, partitions will be
     chosen in a round-robin fashion.
In other words, what is currently the DefaultPartitioner will now be
hard-coded as the behavior whenever no partition is provided.
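The three rules above can be sketched as a single method. This is only an illustration: the email does not say which hash function is used, so Arrays.hashCode over the key bytes is an assumption, and DefaultPartitionChooser is a hypothetical name.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the hard-coded partition choice.
// ASSUMPTION: Arrays.hashCode stands in for whatever hash the producer really uses.
class DefaultPartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    int choose(Integer partition, byte[] key, int numPartitions) {
        if (numPartitions <= 0) throw new IllegalArgumentException("topic has no partitions");
        // Rule 1: an explicit partition always wins.
        if (partition != null) {
            if (partition < 0 || partition >= numPartitions)
                throw new IllegalArgumentException("invalid partition " + partition);
            return partition;
        }
        // Rule 2: no partition but a key, so hash the key bytes.
        if (key != null) return toPositive(Arrays.hashCode(key)) % numPartitions;
        // Rule 3: neither, so rotate round-robin across all partitions.
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    // Clear the sign bit so the modulo result is never negative.
    private static int toPositive(int n) { return n & 0x7fffffff; }
}
```

The same key always maps to the same partition as long as the partition count is unchanged, which is what makes keyed records land together.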

In order to allow choosing a partition correctly, the Producer interface
will include a new method:
  List<PartitionInfo> partitionsForTopic(String topic);
PartitionInfo will be changed to include the actual Node objects, not just
the Node ids.
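With partitionsForTopic, a caller can compute the partition itself and put it in the record's partition field, which then takes precedence over key hashing. A minimal sketch, assuming hypothetical field names for Node and PartitionInfo and a hypothetical AccountPartitioner that keeps all records for one account on one partition:

```java
import java.util.List;

// Hypothetical shape of Node; field names are assumptions.
class Node {
    final int id;
    final String host;
    final int port;
    Node(int id, String host, int port) { this.id = id; this.host = host; this.port = port; }
}

// Hypothetical shape of PartitionInfo: it carries the leader Node itself, not just its id.
class PartitionInfo {
    final String topic;
    final int partition;
    final Node leader;
    PartitionInfo(String topic, int partition, Node leader) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
    }
}

// Hypothetical caller-side rule: every record for one account goes to the same partition.
class AccountPartitioner {
    // `partitions` stands for what producer.partitionsForTopic(topic) would return.
    static int partitionFor(long accountId, List<PartitionInfo> partitions) {
        if (partitions.isEmpty()) throw new IllegalArgumentException("topic has no partitions");
        // floorMod keeps the index non-negative even for negative ids.
        int index = (int) Math.floorMod(accountId, (long) partitions.size());
        return partitions.get(index).partition;
    }
}
```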

I think this will still make it possible to implement any partitioning
strategy you would want. The "challenge case" I considered was a
partitioner that minimizes the number of TCP connections. This partitioner
would choose the partition hosted on the node it had most recently chosen, in
the hope that it would still have a connection to that node.
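Because PartitionInfo now exposes the leader Node, the challenge case can be written purely on the caller's side. The sketch below is hypothetical (StickyNodePartitioner and the field names are not part of the proposal): it rotates among partitions led by the last node it used, and falls back to all partitions when that node leads none of them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shapes; field names are assumptions.
class Node {
    final int id;
    final String host;
    final int port;
    Node(int id, String host, int port) { this.id = id; this.host = host; this.port = port; }
}

class PartitionInfo {
    final String topic;
    final int partition;
    final Node leader;
    PartitionInfo(String topic, int partition, Node leader) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
    }
}

// Prefer partitions whose leader is the node chosen last time,
// hoping a TCP connection to that node is still open.
class StickyNodePartitioner {
    private Integer lastNodeId = null;
    private int rotation = 0;

    synchronized int choose(List<PartitionInfo> partitions) {
        if (partitions.isEmpty()) throw new IllegalArgumentException("topic has no partitions");
        List<PartitionInfo> onLastNode = new ArrayList<>();
        if (lastNodeId != null) {
            for (PartitionInfo p : partitions) {
                if (p.leader != null && p.leader.id == lastNodeId) onLastNode.add(p);
            }
        }
        // Fall back to every partition if the last node leads none of them.
        List<PartitionInfo> candidates = onLastNode.isEmpty() ? partitions : onLastNode;
        PartitionInfo chosen = candidates.get(Math.floorMod(rotation++, candidates.size()));
        if (chosen.leader != null) lastNodeId = chosen.leader.id;
        return chosen.partition;
    }
}
```

Rotating within the sticky node's partitions spreads load across them without opening a new connection.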

It will be slightly more awkward to package partitioners in this model, but
the simple cases are simpler, so hopefully it's worth it.

3. I will make the producer implement java.io.Closeable, but its close()
method will not throw any exceptions. There doesn't really seem to be any
disadvantage to this, and the interface may remind people to call close.
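Java lets an implementation of Closeable narrow close() so that it declares no IOException, which means try-with-resources works without a catch block. A minimal sketch with a hypothetical SketchProducer (its send() and isClosed() are illustrative only):

```java
import java.io.Closeable;

// Hypothetical producer shell. close() drops the "throws IOException"
// from Closeable's signature, so callers have nothing checked to catch.
class SketchProducer implements Closeable {
    private volatile boolean closed = false;

    void send(String message) {
        if (closed) throw new IllegalStateException("producer is closed");
        // A real producer would enqueue the record for its I/O thread here.
    }

    boolean isClosed() { return closed; }

    @Override
    public void close() {
        // A real producer would flush and release connections; here we only mark it closed.
        closed = true;
    }
}

class CloseExample {
    static SketchProducer run() {
        SketchProducer producer = new SketchProducer();
        try (SketchProducer p = producer) {
            p.send("hello");
        } // close() runs here; no IOException to catch
        return producer;
    }
}
```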

4. I am going to break the config stuff into a separate discussion, as I
don't think I have done enough to document and name the configs well, and I
need to do a pass on that first.

5. There were a number of comments about internals, mostly right on, which
I am going to handle separately.

Non-changes

1. I don't plan to change the build system. The SBT => Gradle change is
basically orthogonal, and we should debate it in the context of its ticket.
2. I'm going to stick with my oddball kafka.* rather than
org.apache.kafka.* package name and non-getter methods unless everyone
complains.
3. I'm not going to introduce a ZooKeeper dependency in the client, as I
don't see any advantage.
4. There were a number of reasonable suggestions on the Callback execution
model. I'm going to leave it as is, though. Because we are moving to the
Java Future, we can't include functionality like ListenableFuture. I think
the simplistic callback model, where callbacks are executed in the I/O
thread, should be good enough for most cases, and other cases can use their
own threading.
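"Use their own threading" could look like the sketch below: the callback that runs on the I/O thread does nothing but hand the real work to an Executor the caller owns, so slow user code never stalls the I/O thread. OffloadingCallback is a hypothetical name, and the RecordPosition fields are assumptions.

```java
import java.util.concurrent.Executor;

// Hypothetical result type; fields are assumptions.
class RecordPosition {
    final int partition;
    final long offset;
    RecordPosition(int partition, long offset) { this.partition = partition; this.offset = offset; }
}

// The proposed callback interface.
interface Callback {
    void onCompletion(RecordPosition position, Exception exception);
}

// Hypothetical wrapper: invoked on the producer's I/O thread, it only
// enqueues the delegate on the caller's own executor.
class OffloadingCallback implements Callback {
    private final Executor executor;
    private final Callback delegate;

    OffloadingCallback(Executor executor, Callback delegate) {
        this.executor = executor;
        this.delegate = delegate;
    }

    @Override
    public void onCompletion(RecordPosition position, Exception exception) {
        // Cheap hand-off so the I/O thread is not held up by slow user code.
        executor.execute(() -> delegate.onCompletion(position, exception));
    }
}
```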
