> 1. If a partition has no leader (i.e. all of its replicas are down) it will
> become offline, and hence the metadata response will not have this
> partition's info.

If I am understanding this correctly, then this is a problem (in certain
cases) for the producer because of an ambiguity.

If a producer using hash-partitioning receives partitions 1-4 and begins
producing, then updates its metadata and receives only partitions 1-2,
there are two possible scenarios it cannot distinguish between:

1. The user reduced the number of partitions for this topic. The producer
should continue producing and distribute all new messages between the two
remaining partitions.
2. Two of the four partitions are entirely offline. The producer should
continue to distribute messages among all four partitions (to maintain the
consistency of the hashing) but two of the four partitions will simply fail.

Whichever behaviour the producer chooses, in the other scenario it will
incorrectly distribute messages among the partitions, thus breaking the
hash-partitioner guarantee.
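
To make the ambiguity concrete, here is a minimal sketch. It assumes a hypothetical partitioner that takes the key's hash modulo the number of partitions listed in the latest metadata response; the class and method names are mine, not the actual producer API.

```java
import java.util.Arrays;
import java.util.List;

public class HashPartitionAmbiguity {
    // Hypothetical hash partitioner: index into the partition list the
    // producer currently knows about, using the key's hash.
    static int choose(String key, List<Integer> partitions) {
        int index = Math.floorMod(key.hashCode(), partitions.size());
        return partitions.get(index);
    }

    public static void main(String[] args) {
        List<Integer> before = Arrays.asList(1, 2, 3, 4); // first metadata response
        List<Integer> after = Arrays.asList(1, 2);        // later metadata response

        // The same key can land on a different partition once the list shrinks,
        // whether the topic was resized or two partitions are merely offline.
        for (String key : new String[] {"a", "b", "c", "d"}) {
            System.out.printf("key=%s before=%d after=%d%n",
                    key, choose(key, before), choose(key, after));
        }
    }
}
```

With only the shortened list to go on, the producer has no way to tell whether remapping keys like this is the correct behaviour or a consistency bug.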

If all of the replicas are down for a partition, why do metadata requests
not simply return that partition with LeaderNotAvailable?
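
As a sketch of what that would allow on the client side: assume (hypothetically) that the metadata response lists every partition and marks the leaderless ones. The `NO_LEADER` marker, the 0-based partition ids, and the names below are illustrative assumptions, not the real protocol or client API.

```java
public class LeaderAwarePartitioner {
    // Hypothetical marker for "partition exists but currently has no leader".
    static final int NO_LEADER = -1;

    // leaders[p] holds the leader broker id of partition p, or NO_LEADER.
    // Hashing uses the full partition count, so key placement stays stable
    // even while some partitions are offline.
    static int choose(String key, int[] leaders) {
        int partition = Math.floorMod(key.hashCode(), leaders.length);
        if (leaders[partition] == NO_LEADER) {
            throw new IllegalStateException(
                    "partition " + partition + " has no leader");
        }
        return partition;
    }

    public static void main(String[] args) {
        int[] leaders = {10, 11, NO_LEADER, NO_LEADER}; // two partitions offline
        for (String key : new String[] {"a", "b", "c", "d"}) {
            try {
                System.out.println(key + " -> partition " + choose(key, leaders));
            } catch (IllegalStateException e) {
                System.out.println(key + " -> send fails: " + e.getMessage());
            }
        }
    }
}
```

Here sends to the offline partitions fail, but no key is silently rerouted, which is the behaviour described in scenario 2.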


> 2. Every broker caches metadata and hence can handle the metadata
> request. It's just that their caches are updated asynchronously, so
> when there is an update to the metadata, some brokers may get the new
> metadata value a bit earlier than others.
> > We have one topic with 4 partitions, but sometimes we only get metadata
> > for 2 partitions. Has anyone run into this situation before? If some
> > partition has no leader at that moment, will it cause this problem?
> > How would a partition come to have no leader? If 6 brokers each hold
> > some partitions of the topic, will they all return the same result?
> > Do I need to try all of them and merge the results?
> >     SimpleConsumer consumer = consumerPool.getConsumer(seed.host, seed.port,
> >             connectionTimeOut, consumerBufferSize, "refreshPartitionMeta");
> >     List<String> topics = new ArrayList<String>() {{
> >         add(topic);
> >     }};
> >     TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
> >     List<TopicMetadata> metaData = resp.topicsMetadata();
> >     for (TopicMetadata item : metaData) {
> >         if (item.errorCode() != kafka.common.ErrorMapping.NoError())
> >             LOG.error(String.format(
> >                     "Something wrong with topic metadata for topic: %s error code: %d ",
> >                     item.topic(), item.errorCode()));
> >         for (PartitionMetadata part : item.partitionsMetadata()) {
> >             partitionMeta.put(part.partitionId(), part);
> >         }
> >     }
