I see.

If you need to make sure messages go to the same partition during broker
bouncing / failures, then you should not depend on the partitioner to
decide the partition id but explicitly set it before calling send(). For
example, you can use the total number of partitions for the topic, not the
number of available partitions, to compute the partition id.
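
Roughly like this, as a sketch only (it assumes the new Java producer; the
helper name, the fixed partition count, and the plain hashCode() here are
all illustrative, not the default partitioner's actual hashing):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical helper: totalPartitions is the topic's full partition
    // count (known e.g. from the topic's configuration), not the number of
    // partitions currently reported as available in the metadata.
    void sendKeyed(KafkaProducer<String, String> producer, String topic,
                   int totalPartitions, String key, String value) {
        int partition = (key.hashCode() & 0x7fffffff) % totalPartitions;
        producer.send(new ProducerRecord<>(topic, partition, key, value));
    }

That way a given key always maps to the same partition id, regardless of
how many of the topic's partitions are currently online.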

Guozhang

On Sun, Mar 1, 2015 at 6:42 PM, Evan Huus <evan.h...@shopify.com> wrote:

> My concern is more with the partitioner that determines the partition of
> the message. IIRC, it does something like "hash(key) mod #partitions" in
> the normal case, which means if the # of partitions changes because some of
> them are offline, then certain messages will be sent to the wrong (online)
> partition, no?
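>
> (As a concrete, made-up illustration: a key that hashes to 7 maps to
> 7 mod 4 = partition 3 while the producer sees all four partitions, but to
> 7 mod 2 = partition 1 if the metadata only reports two, so the same key
> would silently start landing on a different, online partition.)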
>
> Evan
>
> On Sun, Mar 1, 2015 at 9:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Evan,
> >
> > In the Java producer, the partition id of the message is determined in the
> > send() call and the data is then appended to the corresponding batch buffer
> > (one buffer per partition), i.e. the partition id never changes once it is
> > decided. If the partition goes offline after this, the send call will fail
> > and then retry. In the end the message will either exhaust all retries and
> > be dropped on the floor, or the partition comes back online, the metadata
> > gets refreshed, and the message send retries successfully. Hence, if some
> > of your partitions are offline for too long, some data will be lost on the
> > producer side (unless you set infinite retry, of course).
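> >
> > (For reference, a rough sketch of the relevant new-producer settings; the
> > values are placeholders only, assuming a Properties object "props" passed
> > to the KafkaProducer constructor:
> >
> >     props.put("retries", Integer.MAX_VALUE);   // effectively infinite retry
> >     props.put("retry.backoff.ms", "1000");     // wait 1s between attempts
> >
> > With a bounded retry count, a partition that stays offline long enough
> > means those batches are eventually dropped, as described above.)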
> >
> > Guozhang
> >
> > On Sun, Mar 1, 2015 at 5:25 AM, Evan Huus <evan.h...@shopify.com> wrote:
> >
> > > On Sun, Mar 1, 2015 at 1:46 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hi Honghai,
> > > >
> > > > 1. If a partition has no leader (i.e. all of its replicas are down), it
> > > > will become offline, and hence the metadata response will not have this
> > > > partition's info.
> > > >
> > >
> > > If I am understanding this correctly, then this is a problem (in certain
> > > cases) for the producer because of an ambiguity.
> > >
> > > If a producer using hash-partitioning receives partitions 1-4 and begins
> > > producing, then updates its metadata and receives only partitions 1-2,
> > > there are two possible scenarios it cannot distinguish between:
> > >
> > > 1. The user reduced the number of partitions for this topic. The producer
> > > should continue producing and distribute all new messages between the two
> > > remaining partitions.
> > > 2. Two of the four partitions are entirely offline. The producer should
> > > continue to distribute messages among all four partitions (to maintain the
> > > consistency of the hashing) but two of the four partitions will simply
> > > fail.
> > >
> > > Whichever behaviour the producer chooses, in the other scenario it will
> > > incorrectly distribute messages among the partitions, thus breaking the
> > > hash-partitioner guarantee.
> > >
> > > If all of the replicas are down for a partition, why do metadata requests
> > > not simply return that partition with LeaderNotAvailable?
> > >
> > > Thanks,
> > > Evan
> > >
> > >
> > > > 2. Any of the brokers caches metadata and hence can handle the metadata
> > > > request. It's just that their caches are updated asynchronously, and
> > > > hence when there is an update to the metadata, some brokers may get the
> > > > new metadata value a bit earlier than others.
> > > >
> > > > On Thu, Feb 26, 2015 at 7:21 PM, ChenHongHai <waldenchenka...@outlook.com>
> > > > wrote:
> > > >
> > > > > We have one topic with 4 partitions, but sometimes we only get metadata
> > > > > for 2 of the partitions. Did anyone meet this kind of situation before?
> > > > > If some partition has no leader at that moment, will it cause this
> > > > > problem? How does a partition end up with no leader? If 6 brokers have
> > > > > some partitions of the topic, will they all return the same result? Do I
> > > > > need to try all of them and merge the results?
> > > > >
> > > > > SimpleConsumer consumer = consumerPool.getConsumer(seed.host, seed.port,
> > > > >         connectionTimeOut, consumerBufferSize, "refreshPartitionMeta");
> > > > > List<String> topics = new ArrayList<String>() {{
> > > > >     add(topic);
> > > > > }};
> > > > > TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
> > > > > List<TopicMetadata> metaData = resp.topicsMetadata();
> > > > > for (TopicMetadata item : metaData) {
> > > > >     if (item.errorCode() != kafka.common.ErrorMapping.NoError())
> > > > >         LOG.error(String.format("Something wrong with topic metadata for topic: %s error code: %d ",
> > > > >                 item.topic(), item.errorCode()));
> > > > >     for (PartitionMetadata part : item.partitionsMetadata()) {
> > > > >         partitionMeta.put(part.partitionId(), part);
> > > > >     }
> > > > > }
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
