Filed as https://issues.apache.org/jira/browse/KAFKA-1998

Evan

On Mon, Mar 2, 2015 at 5:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> That is a valid point, today the returned metadata response already
> contains partitions even with error code, so we can expose that in the
> Cluster / KafkaProducer class. Could you file a JIRA?
>
> Guozhang
>
> On Sun, Mar 1, 2015 at 7:11 PM, Evan Huus <evan.h...@shopify.com> wrote:
>
> > Which I think is my point - based on my current understanding, there is
> > *no* way to find out the total number of partitions for a topic besides
> > hard-coding it or manually reading it from zookeeper. The kafka metadata
> > API does not reliably expose that information.
> >
> > Evan
> >
> > On Sun, Mar 1, 2015 at 10:07 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > I see.
> > >
> > > If you need to make sure messages are going to the same partition
> during
> > > broker bouncing / failures, then you should not depend on the
> partitioner
> > > to decide the partition id but explicitly set it before calling send().
> > > For example, you can use the total number of partitions for the topic,
> > not
> > > the number of available partitions to compute partition id.
> > >
> > > Guozhang
> > >
> > > On Sun, Mar 1, 2015 at 6:42 PM, Evan Huus <evan.h...@shopify.com>
> wrote:
> > >
> > > > My concern is more with the partitioner that determines the partition
> > of
> > > > the message. IIRC, it does something like "hash(key) mod #partitions"
> > in
> > > > the normal case, which means if the # of partitions changes because
> > some
> > > of
> > > > them are offline, then certain messages will be sent to the wrong
> > > (online)
> > > > partition, no?
> > > >
> > > > Evan
> > > >
> > > > On Sun, Mar 1, 2015 at 9:36 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Evan,
> > > > >
> > > > > In the java producer, partition id of the message is determined in
> > the
> > > > > send() call and then the data is appended to the corresponding
> batch
> > > > buffer
> > > > > (one buffer for each partition), i.e. the partition id will never
> > > change
> > > > > once it is decided. If the partition becomes offline after this,
> the
> > > send
> > > > > call will fail and then retry. In the end the message will either
> > > exhaust
> > > > > all retries and be dropped on the floor or the partition becomes
> > online
> > > > > again and the metadata gets refreshed, and message send retry
> > > > successfully.
> > > > > Hence, if some of your partitions becomes offline for too long some
> > > data
> > > > > will be lost on the producer side (unless you set infinite retry,
> of
> > > > > course).
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Mar 1, 2015 at 5:25 AM, Evan Huus <evan.h...@shopify.com>
> > > wrote:
> > > > >
> > > > > > On Sun, Mar 1, 2015 at 1:46 AM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Honghai,
> > > > > > >
> > > > > > > 1. If a partition has no leader (i.e. all of its replicas are
> > down)
> > > > it
> > > > > > will
> > > > > > > become offline, and hence the metadata response will not have
> > this
> > > > > > > partition's info.
> > > > > > >
> > > > > >
> > > > > > If I am understanding this correctly, then this is a problem (in
> > > > certain
> > > > > > cases) for the producer because of an ambiguity.
> > > > > >
> > > > > > If a producer using hash-partitioning receives partitions 1-4 and
> > > > begins
> > > > > > producing, then updates its metadata and receives only partitions
> > > 1-2,
> > > > > > there are two possible scenarios it cannot distinguish between:
> > > > > >
> > > > > > 1. The user reduced the number of partitions for this topic. The
> > > > producer
> > > > > > should continue producing and distribute all new messages between
> > the
> > > > two
> > > > > > remaining partitions.
> > > > > > 2. Two of the four partitions are entirely offline. The producer
> > > should
> > > > > > continue to distribute messages among all four partitions (to
> > > maintain
> > > > > the
> > > > > > consistency of the hashing) but two of the four partitions will
> > > simply
> > > > > > fail.
> > > > > >
> > > > > > Whichever behaviour the producer chooses, in the other scenario
> it
> > > will
> > > > > > incorrectly distribute messages among the partitions, thus
> breaking
> > > the
> > > > > > hash-partitioner guarantee.
> > > > > >
> > > > > > If all of the replicas are down for a partition, why do metadata
> > > > requests
> > > > > > not simply return that partition with LeaderNotAvailable?
> > > > > >
> > > > > > Thanks,
> > > > > > Evan
> > > > > >
> > > > > >
> > > > > > > 2. Any of the brokers cache metadata and hence can handle the
> > > > metadata
> > > > > > > request. It's just that their cache are updated asynchronously
> > and
> > > > > hence
> > > > > > > when there is a update to the metadata, some brokers may got
> the
> > > new
> > > > > > > metadata value a bit eariler than others.
> > > > > > >
> > > > > > > On Thu, Feb 26, 2015 at 7:21 PM, ChenHongHai <
> > > > > > waldenchenka...@outlook.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > We have one topic with 4 partitions, but sometimes only get
> > > > metadata
> > > > > > of 2
> > > > > > > > partitions, did anyone meet this kind of situation before?If
> > some
> > > > > > > partition
> > > > > > > > has no leader at that moment, will it cause this problem?
> >  How
> > > > to
> > > > > > make
> > > > > > > > some partition has no leader?If 6 brokers has some partitions
> > of
> > > > the
> > > > > > > topic,
> > > > > > > > will they return same result?  Do I need try all of them and
> > > merge
> > > > > the
> > > > > > > > result?
> > > > > > > >
> > > > > > > >      SimpleConsumer consumer =
> > > consumerPool.getConsumer(seed.host,
> > > > > > > > seed.port, connectionTimeOut, consumerBufferSize,
> > > > > > > "refreshPartitionMeta");
> > > > > > > >               List<String> topics = new ArrayList<String>()
> {{
> > > > > > > >       add(topic);                }};
> > > > > > TopicMetadataResponse
> > > > > > > > resp = consumer.send(new TopicMetadataRequest(topics));
> > > > > > > > List<TopicMetadata> metaData = resp.topicsMetadata();
> > > > > > > >                 for (TopicMetadata item : metaData) {
> > > > > > > > if(item.errorCode() != kafka.common.ErrorMapping.NoError())
> > > > > > > >         LOG.error(String.format("Something wrong with topic
> > > > metadata
> > > > > > for
> > > > > > > > topic: %s error code: %d ", item.topic(), item.errorCode()
> ));
> > > > > > > >       for (PartitionMetadata part :
> item.partitionsMetadata())
> > {
> > > > > > > >             partitionMeta.put(part.partitionId(), part);
> > > > > > > >                      }                }
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to