For (3) we could also do the following:
- On any retryable producer error, force a metadata refresh (in
  handleProducerResponse).
- In handleMetadataResponse, the producer can (internally) close out
  connections that are no longer valid, i.e., connections to {old set
  of leader brokers} - {new set of leader brokers}.
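
Roughly, the second step could just diff the leader sets before and after the
refresh. The sketch below is only illustrative; PartitionInfo.leader() and
Node.id() are assumed accessor names, not actual producer internals.

  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  class StaleConnections {
      // Given metadata before and after a refresh, compute which broker
      // connections are no longer needed:
      // {old set of leader brokers} - {new set of leader brokers}
      static Set<Integer> toClose(List<PartitionInfo> oldMeta,
                                  List<PartitionInfo> newMeta) {
          Set<Integer> stale = leaderIds(oldMeta);
          stale.removeAll(leaderIds(newMeta));
          return stale;          // the caller closes connections to these brokers
      }

      static Set<Integer> leaderIds(List<PartitionInfo> meta) {
          Set<Integer> ids = new HashSet<Integer>();
          for (PartitionInfo p : meta)
              if (p.leader() != null)
                  ids.add(p.leader().id());
          return ids;
      }
  }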

A potential drawback of using a TTL on unused connections is that
during events such as a broker bounce, producers across the
organization could keep such (now unused) connections to the bounced
broker open until the TTL expires, so there would be a transient
period of more connections than necessary.

On Mon, Feb 03, 2014 at 11:45:41AM -0800, Jay Kreps wrote:
> 1. Yes, I think the name could be improved. However, that name doesn't
> really express what it does. What about RecordSentCallback?
> 2. As Neha said, the nodes are initialized to a random position. Round robin
> is preferable to random (lower variance, cheaper to compute, etc.). Your
> point about skipping non-available nodes is a good one; I will add that.
> 3. I think just implementing the connection timeout we previously described
> (where after some period of disuse we close a connection) is good enough.
> This would mean that connections don't last longer than that timeout.
> 
> -Jay
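
Regarding 2 above: a round robin that starts at a random position and skips
non-available nodes could look roughly like the following. This is only a
sketch; PartitionInfo.leader() and Node.isAvailable() are made-up names for
whatever the real availability check ends up being.

  import java.util.List;
  import java.util.Random;
  import java.util.concurrent.atomic.AtomicInteger;

  // Round robin over partitions, starting from a random offset and skipping
  // partitions whose leader looks unavailable.
  class RoundRobinChooser {
      private final AtomicInteger counter =
          new AtomicInteger(new Random().nextInt(1024));

      PartitionInfo choose(List<PartitionInfo> partitions) {
          for (int attempts = 0; attempts < partitions.size(); attempts++) {
              int index = (counter.getAndIncrement() & 0x7fffffff) % partitions.size();
              PartitionInfo candidate = partitions.get(index);
              if (candidate.leader() != null && candidate.leader().isAvailable())
                  return candidate;    // first partition on an available node wins
          }
          // Nothing looked available; fall back to plain round robin.
          return partitions.get(
              (counter.getAndIncrement() & 0x7fffffff) % partitions.size());
      }
  }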
> 
> On Mon, Feb 3, 2014 at 10:45 AM, Jun Rao <jun...@gmail.com> wrote:
> 
> > Fine with most of the changes. Just a few questions/comments.
> >
> > 1. Would it be better to change Callback to ProducerCallback to distinguish
> > it from the controller callback and other potential future types of
> > callbacks (e.g., consumer)?
> >
> > 2. If no key is present AND no partition id is present, partitions will be
> > chosen in a round robin fashion.
> >
> > Currently, our default event handler picks a random and available
> > partition. This is probably better than round robin because (1) if the
> > replication factor is 1 and there is a broker failure, we can still route
> > the message, (2) if a bunch of producers are started at the same time, this
> > prevents them from picking up the same partition in a synchronized way.
> >
> > 3. I think this will still make it possible to implement any partitioning
> > strategy you would want. The "challenge case" I considered was the
> > partitioner that minimizes the number of TCP connections. This partitioner
> > would choose the partition hosted on the node it had most recently chosen
> > in hopes that it would still have a connection to that node.
> >
> > To support this mode, we probably need a method in the producer to close
> > all existing sockets.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Fri, Jan 31, 2014 at 3:04 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > Hey folks,
> > >
> > > Thanks for all the excellent suggestions on the producer API, I think this
> > > really made things better. We'll do a similar thing for the consumer as we
> > > get a proposal together. I wanted to summarize everything I heard and the
> > > proposed changes I plan to do versus ignore. I'd like to get feedback from
> > > the committers or anyone else interested on this as a proposal (+1/0/-1)
> > > before I go make the changes just to avoid churn at code review time.
> > >
> > > 1. Change send to use java.util.concurrent.Future in send():
> > >   Future<RecordPosition> send(ProducerRecord record, Callback callback)
> > > The cancel method will always return false and not do anything. Callback
> > > will then be changed to
> > >   interface Callback {
> > >     public void onCompletion(RecordPosition position, Exception exception);
> > >   }
> > > so that the exception is available there too and will be null if no error
> > > occurred. I'm not planning on changing the name callback because I haven't
> > > thought of a better one.
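
For what it's worth, usage of that signature would read roughly as below.
This is just a fragment: producer and record are assumed to already exist,
and only send(), Callback and RecordPosition come from the proposal itself.

  // Send a record and handle the outcome in the callback; the exception
  // argument is null on success.
  Future<RecordPosition> future = producer.send(record, new Callback() {
      public void onCompletion(RecordPosition position, Exception exception) {
          if (exception != null)
              exception.printStackTrace();   // or real error handling
      }
  });

Callers that want to block can then call get() on the returned future; with a
plain java.util.concurrent.Future, failures would presumably surface there as
an ExecutionException.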
> > >
> > > 2. We will change the way serialization works to proposal 1A in the
> > > previous discussion. That is, the Partitioner and Serializer interfaces will
> > > disappear. ProducerRecord will change to:
> > >   class ProducerRecord {
> > >     public byte[] key() {...}
> > >     public byte[] value() {...}
> > >     public Integer partition() {...} // can be null
> > >   }
> > > So key and value are now byte[]; partitionKey will be replaced by an
> > > optional partition. The behavior will be the following:
> > > 1. If partition is present it will be used.
> > > 2. If no partition is present but a key is present, a partition will be
> > > chosen by a hash of the key.
> > > 3. If no key is present AND no partition id is present, partitions will be
> > > chosen in a round robin fashion.
> > > In other words, what is currently the DefaultPartitioner will now be hard
> > > coded as the behavior whenever no partition is provided.
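
So, spelled out, the hard-coded default would amount to something like the
following (illustrative only; the hashing and counter details are guesses,
not the actual internals):

  import java.util.Arrays;
  import java.util.List;
  import java.util.concurrent.atomic.AtomicInteger;

  // The three rules above, in order: explicit partition, then key hash,
  // then round robin.
  class DefaultPartitioning {
      private final AtomicInteger counter = new AtomicInteger(0);

      int choosePartition(ProducerRecord record, List<PartitionInfo> partitions) {
          if (record.partition() != null)
              return record.partition();                   // 1. explicit partition
          if (record.key() != null)                        // 2. hash of the key
              return (Arrays.hashCode(record.key()) & 0x7fffffff) % partitions.size();
          // 3. round robin
          return (counter.getAndIncrement() & 0x7fffffff) % partitions.size();
      }
  }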
> > >
> > > In order to allow correctly choosing a partition, the Producer interface
> > > will include a new method:
> > >   List<PartitionInfo> partitionsForTopic(String topic);
> > > PartitionInfo will be changed to include the actual Node objects, not just
> > > the Node ids.
> > >
> > > I think this will still make it possible to implement any partitioning
> > > strategy you would want. The "challenge case" I considered was the
> > > partitioner that minimizes the number of TCP connections. This partitioner
> > > would choose the partition hosted on the node it had most recently chosen
> > > in hopes that it would still have a connection to that node.
> > >
> > > It will be slightly more awkward to package partitioners in this model, but
> > > the simple cases are simpler, so hopefully it's worth it.
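
To make the challenge case concrete, a connection-minimizing partitioner on
top of partitionsForTopic() might look like the sketch below. The
PartitionInfo.leader()/partition() accessors and Node equality are
assumptions about the final API shape.

  import java.util.List;

  // Prefer a partition led by the node we used last time, so an existing
  // connection can be reused.
  class StickyNodePartitioner {
      private Node lastNode;   // leader we most recently sent to

      int choosePartition(Producer producer, String topic) {
          List<PartitionInfo> partitions = producer.partitionsForTopic(topic);
          if (lastNode != null) {
              for (PartitionInfo p : partitions)
                  if (lastNode.equals(p.leader()))
                      return p.partition();   // stick with the node we already talk to
          }
          PartitionInfo chosen = partitions.get(0);   // otherwise pick any partition
          lastNode = chosen.leader();                 // and remember its leader
          return chosen.partition();
      }
  }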
> > >
> > > 3. I will make the producer implement java.io.Closeable but not throw any
> > > exceptions, as there doesn't really seem to be any disadvantage to this and
> > > the interface may remind people to call close.
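
A nice side effect of Closeable with a close() that declares no checked
exception is that try-with-resources reads cleanly. In the fragment below,
KafkaProducer, someConfig and someRecord are placeholders, and passing a null
callback is an assumption.

  // close() is called automatically at the end of the block, and there is
  // no checked exception to catch.
  try (Producer producer = new KafkaProducer(someConfig)) {
      producer.send(someRecord, null);   // assuming a null callback is allowed
  }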
> > >
> > > 4. I am going to break the config stuff into a separate discussion as I
> > > don't think I have done enough to document and name the configs well and I
> > > need to do a pass on that first.
> > >
> > > 5. There were a number of comments about internals, mostly right on, which
> > > I am going to handle separately.
> > >
> > > Non-changes
> > >
> > > 1. I don't plan to change the build system. The SBT=>gradle change is
> > > basically orthogonal and we should debate it in the context of its ticket.
> > > 2. I'm going to stick with my oddball kafka.* rather than
> > > org.apache.kafka.* package name and non-getter methods unless everyone
> > > complains.
> > > 3. I'm not going to introduce a zookeeper dependency in the client as I
> > > don't see any advantage.
> > > 4. There were a number of reasonable suggestions on the Callback execution
> > > model. I'm going to leave it as is, though. Because we are moving to use
> > > Java Future we can't include functionality like ListenableFuture. I think
> > > the simplistic callback model where callbacks are executed in the I/O
> > > thread should be good enough for most cases and other cases can use their
> > > own threading.
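
That seems reasonable; anything expensive can be handed off by the callback
itself, roughly as in the sketch below (the pool and class here are just
illustrative).

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  // Callbacks fire on the producer's I/O thread, so heavy work should be
  // handed off to the application's own pool.
  class AsyncCallbackExample {
      private static final ExecutorService pool = Executors.newFixedThreadPool(2);

      static void send(Producer producer, ProducerRecord record) {
          producer.send(record, new Callback() {
              public void onCompletion(RecordPosition position, Exception exception) {
                  pool.submit(new Runnable() {
                      public void run() {
                          // heavy post-send work goes here, off the I/O thread
                      }
                  });
              }
          });
      }
  }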
> > >
> >
