Filed as https://issues.apache.org/jira/browse/KAFKA-1998
Evan

On Mon, Mar 2, 2015 at 5:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> That is a valid point. Today the returned metadata response already
> contains partitions even with an error code, so we can expose that in the
> Cluster / KafkaProducer class. Could you file a JIRA?
>
> Guozhang
>
> On Sun, Mar 1, 2015 at 7:11 PM, Evan Huus <evan.h...@shopify.com> wrote:
>
> > Which I think is my point - based on my current understanding, there is
> > *no* way to find out the total number of partitions for a topic besides
> > hard-coding it or manually reading it from ZooKeeper. The Kafka metadata
> > API does not reliably expose that information.
> >
> > Evan
> >
> > On Sun, Mar 1, 2015 at 10:07 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > I see.
> > >
> > > If you need to make sure messages go to the same partition during
> > > broker bounces / failures, then you should not depend on the
> > > partitioner to decide the partition id but explicitly set it before
> > > calling send(). For example, you can use the total number of
> > > partitions for the topic, not the number of available partitions, to
> > > compute the partition id.
> > >
> > > Guozhang
> > >
> > > On Sun, Mar 1, 2015 at 6:42 PM, Evan Huus <evan.h...@shopify.com> wrote:
> > >
> > > > My concern is more with the partitioner that determines the
> > > > partition of the message. IIRC, it does something like "hash(key)
> > > > mod #partitions" in the normal case, which means that if the number
> > > > of partitions changes because some of them are offline, then certain
> > > > messages will be sent to the wrong (online) partition, no?
> > > >
> > > > Evan
> > > >
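For reference, a minimal sketch of the workaround Guozhang suggests above - computing
the partition from the fixed total partition count and passing it explicitly to send() -
might look like the following with the 0.8.2 Java producer. The topic name, partition
count, key, and config values here are made up for illustration:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ExplicitPartitionSend {
        // Assumed values, for illustration only.
        private static final String TOPIC = "my-topic";
        private static final int TOTAL_PARTITIONS = 4; // total partitions, not currently-available ones

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("retries", "5"); // finite retries: sends to an offline partition eventually fail

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

            String key = "some-key";
            // Same "hash(key) mod #partitions" idea, but over the fixed total count, so the
            // key-to-partition mapping does not shift when some partitions are offline.
            int partition = (key.hashCode() & 0x7fffffff) % TOTAL_PARTITIONS;

            producer.send(new ProducerRecord<String, String>(TOPIC, partition, key, "some-value"));
            producer.close();
        }
    }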
> > > > On Sun, Mar 1, 2015 at 9:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Evan,
> > > > >
> > > > > In the Java producer, the partition id of the message is determined
> > > > > in the send() call and then the data is appended to the
> > > > > corresponding batch buffer (one buffer for each partition), i.e. the
> > > > > partition id will never change once it is decided. If the partition
> > > > > becomes offline after this, the send call will fail and then retry.
> > > > > In the end the message will either exhaust all retries and be
> > > > > dropped on the floor, or the partition becomes online again, the
> > > > > metadata gets refreshed, and the message send retries successfully.
> > > > > Hence, if some of your partitions become offline for too long, some
> > > > > data will be lost on the producer side (unless you set infinite
> > > > > retries, of course).
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Mar 1, 2015 at 5:25 AM, Evan Huus <evan.h...@shopify.com> wrote:
> > > > >
> > > > > > On Sun, Mar 1, 2015 at 1:46 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Honghai,
> > > > > > >
> > > > > > > 1. If a partition has no leader (i.e. all of its replicas are
> > > > > > > down) it will become offline, and hence the metadata response
> > > > > > > will not have this partition's info.
> > > > > >
> > > > > > If I am understanding this correctly, then this is a problem (in
> > > > > > certain cases) for the producer because of an ambiguity.
> > > > > >
> > > > > > If a producer using hash-partitioning receives partitions 1-4 and
> > > > > > begins producing, then updates its metadata and receives only
> > > > > > partitions 1-2, there are two possible scenarios it cannot
> > > > > > distinguish between:
> > > > > >
> > > > > > 1. The user reduced the number of partitions for this topic. The
> > > > > > producer should continue producing and distribute all new messages
> > > > > > between the two remaining partitions.
> > > > > > 2. Two of the four partitions are entirely offline. The producer
> > > > > > should continue to distribute messages among all four partitions
> > > > > > (to maintain the consistency of the hashing) but two of the four
> > > > > > partitions will simply fail.
> > > > > >
> > > > > > Whichever behaviour the producer chooses, in the other scenario it
> > > > > > will incorrectly distribute messages among the partitions, thus
> > > > > > breaking the hash-partitioner guarantee.
> > > > > >
> > > > > > If all of the replicas are down for a partition, why do metadata
> > > > > > requests not simply return that partition with LeaderNotAvailable?
> > > > > >
> > > > > > Thanks,
> > > > > > Evan
> > > > > >
> > > > > > > 2. Any of the brokers cache metadata and hence can handle the
> > > > > > > metadata request. It's just that their caches are updated
> > > > > > > asynchronously, so when there is an update to the metadata, some
> > > > > > > brokers may get the new value a bit earlier than others.
> > > > > > >
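Since each broker answers metadata requests from its own cache, and those caches are
updated asynchronously, one common pattern is to try several seed brokers before giving
up rather than trusting a single response. A rough sketch of that loop, with the seed
hosts, port, and timeouts made up for illustration (this also relates to the question
below about whether all six brokers need to be queried):

    import java.util.Collections;

    import kafka.common.ErrorMapping;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class MetadataFromAnyBroker {
        // Assumed seed brokers and settings, for illustration only.
        private static final String[] SEED_HOSTS = {"broker1", "broker2", "broker3"};
        private static final int PORT = 9092;

        public static TopicMetadata fetch(String topic) {
            for (String host : SEED_HOSTS) {
                SimpleConsumer consumer =
                        new SimpleConsumer(host, PORT, 10000, 64 * 1024, "refreshPartitionMeta");
                try {
                    TopicMetadataResponse resp =
                            consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
                    for (TopicMetadata item : resp.topicsMetadata()) {
                        if (item.errorCode() == ErrorMapping.NoError()) {
                            return item; // any broker can answer; its cache may just lag a little
                        }
                    }
                } catch (Exception e) {
                    // this seed broker is unreachable or errored; fall through to the next one
                } finally {
                    consumer.close();
                }
            }
            return null; // no seed broker returned clean metadata for the topic
        }
    }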
> > > > > > > On Thu, Feb 26, 2015 at 7:21 PM, ChenHongHai <waldenchenka...@outlook.com> wrote:
> > > > > > >
> > > > > > > > We have one topic with 4 partitions, but sometimes we only get
> > > > > > > > metadata for 2 of the partitions - has anyone run into this
> > > > > > > > kind of situation before? If some partition has no leader at
> > > > > > > > that moment, will it cause this problem? How can a partition
> > > > > > > > end up with no leader? If 6 brokers hold partitions of the
> > > > > > > > topic, will they all return the same result? Do I need to try
> > > > > > > > all of them and merge the results?
> > > > > > > >
> > > > > > > >     SimpleConsumer consumer = consumerPool.getConsumer(seed.host, seed.port,
> > > > > > > >             connectionTimeOut, consumerBufferSize, "refreshPartitionMeta");
> > > > > > > >     List<String> topics = new ArrayList<String>() {{ add(topic); }};
> > > > > > > >     TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
> > > > > > > >     List<TopicMetadata> metaData = resp.topicsMetadata();
> > > > > > > >     for (TopicMetadata item : metaData) {
> > > > > > > >         if (item.errorCode() != kafka.common.ErrorMapping.NoError())
> > > > > > > >             LOG.error(String.format("Something wrong with topic metadata for topic: %s error code: %d",
> > > > > > > >                     item.topic(), item.errorCode()));
> > > > > > > >         for (PartitionMetadata part : item.partitionsMetadata()) {
> > > > > > > >             partitionMeta.put(part.partitionId(), part);
> > > > > > > >         }
> > > > > > > >     }
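As a side note, PartitionMetadata also carries a per-partition error code and leader, so
when a partition does show up in the response its health can be checked explicitly.
Whether an offline partition appears in the response at all is exactly what this thread
(and now KAFKA-1998) is about, so treat this only as a sketch of the extra check; the
class and method names here are invented for illustration:

    import java.util.HashMap;
    import java.util.Map;

    import kafka.common.ErrorMapping;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;

    public class PartitionMetadataCheck {
        // Collect per-partition metadata, warning about any partition that reports
        // LeaderNotAvailable or has no leader at the moment of the request.
        public static Map<Integer, PartitionMetadata> collect(TopicMetadata item) {
            Map<Integer, PartitionMetadata> partitionMeta =
                    new HashMap<Integer, PartitionMetadata>();
            for (PartitionMetadata part : item.partitionsMetadata()) {
                if (part.errorCode() == ErrorMapping.LeaderNotAvailableCode()
                        || part.leader() == null) {
                    System.err.println(String.format(
                            "Partition %d of topic %s currently has no leader",
                            part.partitionId(), item.topic()));
                }
                partitionMeta.put(part.partitionId(), part);
            }
            return partitionMeta;
        }
    }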