My concern is more with the partitioner that determines which partition each message goes to. IIRC, in the normal case it does something like "hash(key) mod #partitions", which means that if the number of partitions changes because some of them are offline, certain messages will be sent to the wrong (online) partition, no?
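To make that concrete, here is a rough toy sketch of the kind of modulo scheme I mean (the class and key names are placeholders, and the real DefaultPartitioner may differ in detail): the same key can land on a different partition as soon as the partition count visible to the producer changes.

// Toy sketch only, not the producer's actual partitioner code.
public class PartitionerSketch {

    static int partition(String key, int numPartitions) {
        // Mask the sign bit so the result stays non-negative even for Integer.MIN_VALUE hashes.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "some-key";
        // With all 4 partitions visible in metadata:
        System.out.println("4 partitions -> partition " + partition(key, 4));
        // With only 2 partitions returned (e.g. the other two are offline):
        System.out.println("2 partitions -> partition " + partition(key, 2));
        // For many keys these two results differ, which is the "wrong partition" worry above.
    }
}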
Evan

On Sun, Mar 1, 2015 at 9:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Evan,
>
> In the java producer, the partition id of the message is determined in the
> send() call and then the data is appended to the corresponding batch buffer
> (one buffer for each partition), i.e. the partition id will never change
> once it is decided. If the partition becomes offline after this, the send
> call will fail and then retry. In the end the message will either exhaust
> all retries and be dropped on the floor, or the partition becomes online
> again, the metadata gets refreshed, and the message send retries
> successfully. Hence, if some of your partitions become offline for too
> long, some data will be lost on the producer side (unless you set infinite
> retries, of course).
>
> Guozhang
>
> On Sun, Mar 1, 2015 at 5:25 AM, Evan Huus <evan.h...@shopify.com> wrote:
> >
> > On Sun, Mar 1, 2015 at 1:46 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Honghai,
> > >
> > > 1. If a partition has no leader (i.e. all of its replicas are down) it
> > > will become offline, and hence the metadata response will not have this
> > > partition's info.
> >
> > If I am understanding this correctly, then this is a problem (in certain
> > cases) for the producer because of an ambiguity.
> >
> > If a producer using hash-partitioning receives partitions 1-4 and begins
> > producing, then updates its metadata and receives only partitions 1-2,
> > there are two possible scenarios it cannot distinguish between:
> >
> > 1. The user reduced the number of partitions for this topic. The producer
> > should continue producing and distribute all new messages between the two
> > remaining partitions.
> > 2. Two of the four partitions are entirely offline. The producer should
> > continue to distribute messages among all four partitions (to maintain
> > the consistency of the hashing), but two of the four partitions will
> > simply fail.
> >
> > Whichever behaviour the producer chooses, in the other scenario it will
> > incorrectly distribute messages among the partitions, thus breaking the
> > hash-partitioner guarantee.
> >
> > If all of the replicas are down for a partition, why do metadata requests
> > not simply return that partition with LeaderNotAvailable?
> >
> > Thanks,
> > Evan
> >
> > > 2. Any of the brokers cache metadata and hence can handle the metadata
> > > request. It's just that their caches are updated asynchronously, and
> > > hence when there is an update to the metadata, some brokers may get the
> > > new metadata value a bit earlier than others.
> > >
> > > On Thu, Feb 26, 2015 at 7:21 PM, ChenHongHai <waldenchenka...@outlook.com>
> > > wrote:
> > >
> > > > We have one topic with 4 partitions, but sometimes we only get metadata
> > > > for 2 partitions. Did anyone meet this kind of situation before? If some
> > > > partition has no leader at that moment, will it cause this problem? How
> > > > can a partition end up with no leader? If 6 brokers have some partitions
> > > > of the topic, will they all return the same result? Do I need to try all
> > > > of them and merge the result?
> > > >
> > > > SimpleConsumer consumer = consumerPool.getConsumer(seed.host,
> > > >         seed.port, connectionTimeOut, consumerBufferSize, "refreshPartitionMeta");
> > > > List<String> topics = new ArrayList<String>() {{ add(topic); }};
> > > > TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
> > > > List<TopicMetadata> metaData = resp.topicsMetadata();
> > > > for (TopicMetadata item : metaData) {
> > > >     if (item.errorCode() != kafka.common.ErrorMapping.NoError())
> > > >         LOG.error(String.format("Something wrong with topic metadata for topic: %s error code: %d",
> > > >                 item.topic(), item.errorCode()));
> > > >     for (PartitionMetadata part : item.partitionsMetadata()) {
> > > >         partitionMeta.put(part.partitionId(), part);
> > > >     }
> > > > }
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
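For reference, a minimal sketch of probing several seed brokers with the same SimpleConsumer metadata request as in the quoted snippet and printing the per-partition error codes; since brokers refresh their metadata caches asynchronously, different brokers can briefly report different views. The class name, broker host, port, and topic below are placeholders, not values from the thread.

import java.util.Collections;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class MetadataProbe {

    // Asks one broker for the topic's metadata and prints what it reports per partition.
    static void probe(String host, int port, String topic) {
        SimpleConsumer consumer =
                new SimpleConsumer(host, port, 10000, 64 * 1024, "metadataProbe");
        try {
            TopicMetadataResponse resp =
                    consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
            for (TopicMetadata item : resp.topicsMetadata()) {
                System.out.printf("broker %s: topic %s error code %d%n",
                        host, item.topic(), item.errorCode());
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    // A partition that is missing here, or that carries a non-zero error
                    // code (e.g. leader not available), is the ambiguity discussed above.
                    System.out.printf("  partition %d error code %d%n",
                            part.partitionId(), part.errorCode());
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        String[] seeds = args.length > 0 ? args : new String[] { "localhost" };
        for (String host : seeds) {
            try {
                probe(host, 9092, "my-topic");
            } catch (Exception e) {
                // An unreachable seed broker is skipped; the next one may still answer.
                System.out.printf("broker %s unreachable: %s%n", host, e.getMessage());
            }
        }
    }
}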