> 1. Change send to use java.util.concurrent.Future in send():
>   Future<RecordPosition> send(ProducerRecord record, Callback callback)

+0 on Future vs. the custom future: I personally don't see much of a
difference between a custom future and the standard Future (even if
it means having to put up with the checked exceptions that Future
brings).
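
For concreteness, this is roughly what callers would have to write around
get() with the standard Future. It is only a sketch: RecordPosition here is
a hypothetical stand-in holding just an offset, and completedSend() merely
simulates a send that has already finished; neither is the proposed API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical stand-in for the proposed RecordPosition: just an offset.
final class RecordPosition {
    final long offset;
    RecordPosition(long offset) { this.offset = offset; }
}

final class FutureGetSketch {
    // Blocks for the result and handles the two checked exceptions of Future.get().
    static long awaitOffset(Future<RecordPosition> future) {
        try {
            return future.get().offset;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException("interrupted while waiting for send", e);
        } catch (ExecutionException e) {
            // The failure is wrapped; the underlying error is e.getCause().
            throw new RuntimeException("send failed", e.getCause());
        }
    }

    // Simulates a send that has already completed at the given offset.
    static Future<RecordPosition> completedSend(long offset) {
        FutureTask<RecordPosition> task =
            new FutureTask<>(() -> new RecordPosition(offset));
        task.run();
        return task;
    }
}
```

A custom future could expose an unchecked variant of get(), which is about
the only practical difference I see.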

> The cancel method will always return false and not do anything. Callback
> will then be changed to
>   interface Callback {
>     public void onCompletion(RecordPosition position, Exception exception);
>   }
> so that the exception is available there too and will be null if no error
> occurred. I'm not planning on changing the name callback because I haven't
> thought of a better one.

Can you describe RecordPosition? I'm assuming it is a rename of
RecordSend, but can you make it concrete? I don't recall seeing it in
the past threads, but I think it is a bit of an unintuitive name. I
think it conveys the offset of the successfully produced record, but
I'm not sure what it would mean if the produce failed.

Can you elaborate why you think we cannot implement a meaningful
cancel (even though I'm not sure there is a proper use case for it)?
As long as the record has not been sent out, the RecordSend could be
flagged as canceled (by the client) and the sender can just deallocate
it without sending.
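
Roughly what I have in mind, as a sketch only. The class and method names
below are made up for illustration and are not part of the proposal; the
point is just that a single compare-and-set decides whether the client's
cancel or the sender's claim wins.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical per-record handle shared by the client and the sender thread.
final class CancellableSend {
    enum State { QUEUED, IN_FLIGHT, CANCELLED }

    private final AtomicReference<State> state =
        new AtomicReference<>(State.QUEUED);

    // Client side: succeeds only while the record is still queued.
    boolean cancel() {
        return state.compareAndSet(State.QUEUED, State.CANCELLED);
    }

    // Sender side: claim the record just before writing it to the network.
    // A false return means the client cancelled first, so the sender can
    // deallocate the record without sending it.
    boolean claimForSend() {
        return state.compareAndSet(State.QUEUED, State.IN_FLIGHT);
    }

    boolean isCancelled() {
        return state.get() == State.CANCELLED;
    }
}
```

With something like this, Future.cancel() would return true only if the
record had not yet been claimed by the sender, and false otherwise.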

>
> 2. We will change the way serialization works to proposal 1A in the
> previous discussion. That is the Partitioner and Serializer interfaces will
> disappear. ProducerRecord will change to:
>   class ProducerRecord {
>     public byte[] key() {...}
>     public byte[] value() {...}
>     public Integer partition() {...} // can be null
>   }

+1

> So key and value are now byte[]; partitionKey will be replaced by an
> optional partition. The behavior will be the following:
> 1. If partition is present it will be used.
> 2. If no partition is present but a key is present, a partition will be
> chosen by a hash of the key
> 3. If no key is present AND no partition id is present, partitions will be
> chosen in a round robin fashion
> In other words what is currently the DefaultPartitioner will now be hard
> coded as the behavior whenever no partition is provided.
>
> In order to allow correctly choosing a partition the Producer interface
> will include a new method:
>   List<PartitionInfo> partitionsForTopic(String topic);
> PartitionInfo will be changed to include the actual Node objects not just
> the Node ids.

Why are the node ids alone insufficient? You can still do
round-robin, connection-limiting, etc. with just the node id, right?

Also, will partitionsForTopic block if it is a new topic? Or will it
just return empty (until the topic is created) and have the client
retry?
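
Also, just to confirm my reading of the three partitioning rules quoted
above, here is a rough sketch. It is not the actual DefaultPartitioner:
the class name is made up, Arrays.hashCode is only a placeholder for
whatever hash is really used, and numPartitions is assumed to come from
something like partitionsForTopic(topic).size().

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the three-step partition choice; names are hypothetical.
final class PartitionChoiceSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null) {
            return partition; // 1. an explicit partition is used as-is
        }
        if (key != null) {
            // 2. no partition but a key: hash the key (placeholder hash)
            return Math.floorMod(Arrays.hashCode(key), numPartitions);
        }
        // 3. neither key nor partition: round robin
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }
}
```

Math.floorMod keeps the result non-negative even when the hash or the
counter is negative.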

>
> 1. I don't plan to change the build system. The SBT=>gradle change is
> basically orthogonal and we should debate it in the context of its ticket.

Makes sense.

> 2. I'm going to stick with my oddball kafka.* rather than
> org.apache.kafka.* package name and non-getter methods unless everyone
> complains.

+0 on this - I don't know if there are any ramifications of going
without a standard org packaging hierarchy, but if not, then +1.

> 3. I'm not going to introduce a zookeeper dependency in the client as I
> don't see any advantage.

+1

> 4. There were a number of reasonable suggestions on the Callback execution
> model. I'm going to leave it as is, though. Because we are moving to use
> Java Future we can't include functionality like ListenableFuture. I think
> the simplistic callback model where callbacks are executed in the i/o
> thread should be good enough for most cases and other cases can use their
> own threading.

+1
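
For the "use their own threading" case, a thin wrapper seems sufficient.
A minimal sketch, assuming the Callback shape quoted earlier;
RecordPosition is again a hypothetical stand-in holding only an offset,
and OffloadingCallback is a name I made up.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for the proposed RecordPosition: just an offset.
final class RecordPosition {
    final long offset;
    RecordPosition(long offset) { this.offset = offset; }
}

// Callback shape as quoted in the proposal.
interface Callback {
    void onCompletion(RecordPosition position, Exception exception);
}

// Hands the real work to a user-supplied executor so the i/o thread
// only pays for the enqueue.
final class OffloadingCallback implements Callback {
    private final Callback delegate;
    private final Executor executor;

    OffloadingCallback(Callback delegate, Executor executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    @Override
    public void onCompletion(RecordPosition position, Exception exception) {
        // Called on the i/o thread: do not run the delegate inline.
        executor.execute(() -> delegate.onCompletion(position, exception));
    }
}
```

A caller would wrap a slow callback as
new OffloadingCallback(slowCallback, Executors.newSingleThreadExecutor())
and pass the wrapper to send().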

Thanks,

Joel
