Hey Jun,

Thanks much for the comments. Great point, particularly regarding (3); I
hadn't thought about this before.

It seems that there are two possible ways the version number can be used.
One solution is for the client to check the version number when it
receives a MetadataResponse: if the version number in the
MetadataResponse is smaller than the version number in the client's
cache, the client is forced to fetch metadata again. The other solution,
as you have suggested, is for the broker to check the version number
when it receives a request from a client; the broker rejects the request
if the version is smaller than the version in the broker's cache.
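
To make the first solution concrete, here is a minimal sketch of the
client-side check. Note that CachedMetadata and its members are made-up
names for illustration, not the actual org.apache.kafka.clients code:

import org.apache.kafka.common.Cluster;

// Hypothetical sketch of the client-side version check (solution 1).
// The class and method names here are illustrative only.
public class CachedMetadata {
    private long versionNumber = -1L;
    private Cluster cluster = Cluster.empty();
    private boolean updateRequested = false;

    public synchronized void handleMetadataResponse(long responseVersion,
                                                    Cluster newCluster) {
        if (responseVersion < versionNumber) {
            // Response served by a broker whose cache is behind what this
            // client has already seen; drop it and force another fetch.
            updateRequested = true;
            return;
        }
        versionNumber = responseVersion;
        cluster = newCluster;
        updateRequested = false;
    }
}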

I am not sure the second solution can address the problem here. In the
scenario described in the JIRA ticket, the broker's cache may be
outdated because it has not yet processed the LeaderAndIsrRequest from
the controller. Thus it may still process the client's request even
though the version in the client's request is actually outdated. Does
this make sense?

IMO, it seems that we can address problem (3) by saving the metadata
version together with the offset. After the consumer starts, it will
keep fetching metadata until the metadata version >= the version saved
with the offset of this partition.
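
As a rough sketch of what I mean, assuming the metadata version is
committed together with the offset (the helper methods below are
hypothetical placeholders):

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch for problem (3): after restart, block until the
// cached metadata is at least as new as the version that was saved with
// this partition's committed offset.
public final class MetadataVersionGate {

    public static void awaitFreshMetadata(TopicPartition tp,
                                          Map<TopicPartition, Long> savedVersions)
            throws InterruptedException {
        long savedVersion = savedVersions.getOrDefault(tp, -1L);
        while (currentMetadataVersion() < savedVersion) {
            requestMetadataUpdate();  // trigger another MetadataRequest
            Thread.sleep(100L);       // simple backoff for this sketch
        }
    }

    private static long currentMetadataVersion() { return 0L; }  // placeholder
    private static void requestMetadataUpdate() { }              // placeholder
}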

Regarding problems (1) and (2): currently we use the version number in
the MetadataResponse to ensure that the metadata does not go back in
time. There are two alternative solutions to address problems (1) and
(2). One solution is for the client to enumerate all partitions in the
MetadataResponse, compare their epochs with those in the cached
metadata, and reject the MetadataResponse iff any leader epoch is
smaller. The main concern is that the MetadataResponse currently
contains information about all partitions in the entire cluster, so
doing this for every response may slow down the client. The other
solution is for the client to enumerate partitions for only the topics
registered in org.apache.kafka.clients.Metadata, which will be an empty
set for the producer and the set of subscribed partitions for the
consumer. But this degrades to all topics if the consumer subscribes to
topics in the cluster by pattern.
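
For illustration, the per-partition check in the first alternative could
look roughly like the following, assuming the MetadataResponse carries a
leader epoch per partition (which it does not today):

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the per-partition leader epoch comparison. The
// maps stand in for the epochs a MetadataResponse would carry; this is
// an assumed extension, not the current wire format.
public final class LeaderEpochValidator {

    // Returns true iff no partition's leader epoch moves backwards
    // relative to the cached metadata.
    public static boolean isAcceptable(Map<TopicPartition, Integer> cachedEpochs,
                                       Map<TopicPartition, Integer> responseEpochs) {
        for (Map.Entry<TopicPartition, Integer> e : responseEpochs.entrySet()) {
            Integer cached = cachedEpochs.get(e.getKey());
            if (cached != null && e.getValue() < cached)
                return false;  // outdated metadata; reject the response
        }
        return true;
    }
}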

Note that the client will only be forced to update metadata if the
version in the MetadataResponse is smaller than the version in the
cached metadata. In general this should not be a problem. It can be a
problem only if some broker is particularly slower than the other
brokers in processing UpdateMetadataRequest. When this is the case, it
means that the broker is also particularly slower in processing
LeaderAndIsrRequest, which can cause problems anyway because some
partitions will probably have no leader during this period. So I am not
sure that problems (1) and (2) cause more problems than what we already
have.

Thanks,
Dong


On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Great finding on the issue. It's a real problem. A few comments about the
> KIP. (1) I am not sure about updating controller_metadata_epoch on every
> UpdateMetadataRequest. Currently, the controller can send
> UpdateMetadataRequest when there is no actual metadata change. Doing this
> may require unnecessary metadata refresh on the client. (2)
> controller_metadata_epoch is global across all topics. This means that a
> client may be forced to update its metadata even when the metadata for the
> topics that it cares about hasn't changed. (3) It doesn't seem that the KIP
> handles the corner case when a consumer is restarted. Say a consumer reads
> from the new leader, commits the offset and then is restarted. On restart,
> the consumer gets an outdated metadata and fetches from the old leader.
> Then, the consumer will get into the offset out of range issue.
>
> Given the above, I am thinking of the following approach. We actually
> already have metadata versioning at the partition level. Each leader has a
> leader epoch which is monotonically increasing. We can potentially
> propagate leader epoch back in the metadata response and the clients can
> cache that. This solves the issue of (1) and (2). To solve (3), when saving
> an offset, we could save both an offset and the corresponding leader epoch.
> When fetching the data, the consumer provides both the offset and the
> leader epoch. A leader will only serve the request if its leader epoch is
> equal to or greater than the leader epoch from the consumer. To achieve
> this, we need to change the fetch request protocol and the offset commit
> api, which requires some more thoughts.
>
> Thanks,
>
> Jun
>
>
> On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Bump up the thread.
> >
> > It will be great to have more comments on whether we should do it or
> > whether there is a better way to address the motivation of this KIP.
> >
> > On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > I don't have an interesting rejected alternative solution to put in
> > > the KIP. If there is a good alternative solution from anyone in this
> > > thread, I am happy to discuss this and update the KIP accordingly.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> > >
> > >> It is clearer now.
> > >>
> > >> I noticed that Rejected Alternatives section is empty.
> > >> Have you considered any alternative ?
> > >>
> > >> Cheers
> > >>
> > >> On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >>
> > >> > Ted, thanks for catching this. I have updated the sentence to
> > >> > make it readable.
> > >> >
> > >> > Thanks,
> > >> > Dong
> > >> >
> > >> > On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> > >> >
> > >> > > bq. It the controller_epoch of the incoming MetadataResponse,
> > >> > > or if the controller_epoch is the same but the
> > >> > > controller_metadata_epoch
> > >> > >
> > >> > > Can you update the above sentence so that the intention is
> > >> > > clearer?
> > >> > >
> > >> > > Thanks
> > >> > >
> > >> > > On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi all,
> > >> > > >
> > >> > > > I have created KIP-232: Detect outdated metadata by adding
> > >> > > > ControllerMetadataEpoch field:
> > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field
> > >> > > > .
> > >> > > >
> > >> > > > The KIP proposes to add fields in MetadataResponse and
> > >> > > > UpdateMetadataRequest so that the client can reject outdated
> > >> > > > metadata and avoid unnecessary OffsetOutOfRangeException.
> > >> > > > Otherwise there is currently a race condition that can cause
> > >> > > > the consumer to reset its offset, which negatively affects the
> > >> > > > consumer's availability.
> > >> > > >
> > >> > > > Feedback and suggestions are welcome!
> > >> > > >
> > >> > > > Regards,
> > >> > > > Dong
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
