For (3) we could also do the following:
- On any retryable producer error, force a metadata refresh (in
  handleProducerResponse).
- In handleMetadataResponse, the producer can (internally) close
  connections that are no longer valid, i.e., connections to
  {old set of leader brokers} - {new set of leader brokers}.
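A minimal sketch of that second step (not from the thread; ConnectionPool
and the method names are hypothetical stand-ins, not the real client
internals):

    import java.util.HashSet;
    import java.util.Set;

    public class StaleConnectionPruner {

        /** Minimal connection-pool interface assumed by this sketch. */
        public interface ConnectionPool {
            void close(int brokerId);
        }

        /** Close connections to {old leader brokers} - {new leader brokers}. */
        public static void pruneOnMetadataUpdate(Set<Integer> oldLeaders,
                                                 Set<Integer> newLeaders,
                                                 ConnectionPool pool) {
            Set<Integer> stale = new HashSet<Integer>(oldLeaders);
            stale.removeAll(newLeaders);   // brokers that lost all of our leaders
            for (Integer brokerId : stale) {
                pool.close(brokerId);      // drop the now-unneeded socket
            }
        }
    }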
A potential drawback of using a TTL on unused connections is that during
events such as a broker bounce, several producers across the organization
could maintain such connections to the bounced broker, so there would be a
transient period of more connections than necessary.

On Mon, Feb 03, 2014 at 11:45:41AM -0800, Jay Kreps wrote:
> 1. Yes, I think the name could be improved. However, that name doesn't
> really express what it does. What about RecordSentCallback?
> 2. As Neha said, the nodes are initialized to a random position. Round
> robin is preferable to random (lower variance, cheaper to compute, etc.).
> Your point about skipping non-available nodes is a good one; I will add
> that.
> 3. I think just implementing the connection timeout we previously
> described (where after some period of disuse we close a connection) is
> good enough. This would mean that connections don't last longer than
> that timeout.
>
> -Jay
>
> On Mon, Feb 3, 2014 at 10:45 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> > Fine with most of the changes. Just a few questions/comments.
> >
> > 1. Would it be better to change Callback to ProducerCallback to
> > distinguish it from the controller callback and potential future other
> > types of callbacks (e.g. consumer)?
> >
> > 2. "If no key is present AND no partition id is present, partitions
> > will be chosen in a round robin fashion."
> >
> > Currently, our default event handler picks a random and available
> > partition. This is probably better than round robin because (1) if the
> > replication factor is 1 and there is a broker failure, we can still
> > route the message, and (2) if a bunch of producers are started at the
> > same time, this prevents them from picking the same partition in a
> > synchronized way.
> >
> > 3. "I think this will still make it possible to implement any
> > partitioning strategy you would want. The 'challenge case' I considered
> > was the partitioner that minimizes the number of TCP connections. This
> > partitioner would choose the partition hosted on the node it had most
> > recently chosen in hopes that it would still have a connection to that
> > node."
> >
> > To support this mode, we probably need a method in the producer to
> > close all existing sockets.
> >
> > Thanks,
> >
> > Jun
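As an aside, a rough sketch of the connection-minimizing partitioner Jun
describes in point 3 above. PartitionInfo here is a hypothetical stand-in
for illustration, not the interface from the proposal quoted below:

    import java.util.List;

    public class StickyPartitionChooser {

        /** Minimal stand-in for PartitionInfo; leaderId() < 0 means no leader. */
        public interface PartitionInfo {
            int partition();
            int leaderId();
        }

        private Integer lastNodeId = null;   // leader of the partition chosen last time

        public int choosePartition(List<PartitionInfo> partitions) {
            // Prefer a partition led by the node we most recently used, so an
            // existing connection can likely be reused.
            if (lastNodeId != null) {
                for (PartitionInfo p : partitions) {
                    if (p.leaderId() == lastNodeId.intValue()) {
                        return p.partition();
                    }
                }
            }
            // Otherwise fall back to the first partition that currently has a leader.
            for (PartitionInfo p : partitions) {
                if (p.leaderId() >= 0) {
                    lastNodeId = p.leaderId();
                    return p.partition();
                }
            }
            throw new IllegalStateException("No available partition");
        }
    }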
> > On Fri, Jan 31, 2014 at 3:04 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > Hey folks,
> > >
> > > Thanks for all the excellent suggestions on the producer API, I think
> > > this really made things better. We'll do a similar thing for the
> > > consumer as we get a proposal together. I wanted to summarize
> > > everything I heard and the proposed changes I plan to do versus
> > > ignore. I'd like to get feedback from the committers or anyone else
> > > interested on this as a proposal (+1/0/-1) before I go make the
> > > changes, just to avoid churn at code review time.
> > >
> > > 1. Change send() to return a java.util.concurrent.Future:
> > >   Future<RecordPosition> send(ProducerRecord record, Callback callback)
> > > The cancel method will always return false and not do anything.
> > > Callback will then be changed to
> > >   interface Callback {
> > >     public void onCompletion(RecordPosition position, Exception exception);
> > >   }
> > > so that the exception is available there too and will be null if no
> > > error occurred. I'm not planning on changing the name Callback because
> > > I haven't thought of a better one.
> > >
> > > 2. We will change the way serialization works to proposal 1A in the
> > > previous discussion. That is, the Partitioner and Serializer
> > > interfaces will disappear. ProducerRecord will change to:
> > >   class ProducerRecord {
> > >     public byte[] key() {...}
> > >     public byte[] value() {...}
> > >     public Integer partition() {...} // can be null
> > >   }
> > > So key and value are now byte[]; partitionKey will be replaced by an
> > > optional partition. The behavior will be the following:
> > >   1. If a partition is present it will be used.
> > >   2. If no partition is present but a key is present, a partition
> > >      will be chosen by a hash of the key.
> > >   3. If no key is present AND no partition id is present, partitions
> > >      will be chosen in a round robin fashion.
> > > In other words, what is currently the DefaultPartitioner will now be
> > > hard coded as the behavior whenever no partition is provided.
> > >
> > > In order to allow correctly choosing a partition, the Producer
> > > interface will include a new method:
> > >   List<PartitionInfo> partitionsForTopic(String topic);
> > > PartitionInfo will be changed to include the actual Node objects, not
> > > just the Node ids.
> > >
> > > I think this will still make it possible to implement any partitioning
> > > strategy you would want. The "challenge case" I considered was the
> > > partitioner that minimizes the number of TCP connections. This
> > > partitioner would choose the partition hosted on the node it had most
> > > recently chosen in hopes that it would still have a connection to
> > > that node.
> > >
> > > It will be slightly more awkward to package partitioners in this
> > > model, but the simple cases are simpler, so hopefully it's worth it.
> > >
> > > 3. I will make the producer implement java.io.Closeable but not throw
> > > any exceptions, as there doesn't really seem to be any disadvantage
> > > to this and the interface may remind people to call close.
> > >
> > > 4. I am going to break the config stuff into a separate discussion,
> > > as I don't think I have done enough to document and name the configs
> > > well and I need to do a pass on that first.
> > >
> > > 5. There were a number of comments about internals, mostly right on,
> > > which I am going to handle separately.
> > >
> > > Non-changes
> > >
> > > 1. I don't plan to change the build system. The SBT=>gradle change is
> > > basically orthogonal and we should debate it in the context of its
> > > ticket.
> > > 2. I'm going to stick with my oddball kafka.* rather than
> > > org.apache.kafka.* package name and non-getter methods unless
> > > everyone complains.
> > > 3. I'm not going to introduce a zookeeper dependency in the client as
> > > I don't see any advantage.
> > > 4. There were a number of reasonable suggestions on the Callback
> > > execution model. I'm going to leave it as is, though. Because we are
> > > moving to use Java Future we can't include functionality like
> > > ListenableFuture. I think the simplistic callback model where
> > > callbacks are executed in the I/O thread should be good enough for
> > > most cases, and other cases can use their own threading.
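For illustration, a small sketch (not from the thread) of how the proposed
send/Callback API in item 1 above might be used. The ProducerRecord
constructor, the topic argument, and RecordPosition.offset() are
assumptions added for the example, not part of the proposal:

    import java.util.concurrent.Future;

    public class ProposedApiExample {

        interface RecordPosition {
            long offset();                 // assumed accessor, for illustration
        }

        // As proposed: exception is null if no error occurred.
        interface Callback {
            void onCompletion(RecordPosition position, Exception exception);
        }

        static class ProducerRecord {
            final String topic;            // assumed: a record must still name its topic
            final byte[] key;              // may be null
            final byte[] value;
            final Integer partition;       // null => hash of key, or round robin
            ProducerRecord(String topic, byte[] key, byte[] value, Integer partition) {
                this.topic = topic;
                this.key = key;
                this.value = value;
                this.partition = partition;
            }
        }

        interface Producer {
            Future<RecordPosition> send(ProducerRecord record, Callback callback);
        }

        static void example(Producer producer) throws Exception {
            ProducerRecord record =
                new ProducerRecord("my-topic", "key".getBytes("UTF-8"),
                                   "value".getBytes("UTF-8"), null);
            Future<RecordPosition> result = producer.send(record, new Callback() {
                public void onCompletion(RecordPosition position, Exception exception) {
                    // Runs in the producer's I/O thread, per the proposal.
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("acked at offset " + position.offset());
                    }
                }
            });
            result.get();   // or rely entirely on the callback and ignore the Future
        }
    }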