Hey Tom,

Thanks for your reply. Here are my thoughts:

1) I think the DescribeDirsResponse can be used by AdminClient to query the
lag of follower replica as well. Here is how it works:

- AdminClient sends DescribeDirsRequest to both the leader and the follower
of the partition.
- DescribeDirsResponse from both leader and follower shows the LEO of the
leader replica and the follower replica.
- Lag of the follower replica is the difference in the LEO between leader
and follower.

In comparison to ReplicaStatusRequest, DescribeDirsRequest needs to be send
to each replica of the partition whereas ReplicaStatusRequest only needs to
be sent to the leader of the partition. It doesn't seem to make much
difference though. In practice we probably want to query the replica lag of
many partitions and AminClient needs to send exactly one request to each
broker with either solution. Does this make sense?


2) KIP-179 proposes to add the following AdminClient API:

alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions
options)

Where AlteredTopic includes the following fields

AlteredTopic(String name, int numPartitions, int replicationFactor,
Map<Integer,List<Integer>> replicasAssignment)

I have two comments on this:

- The information in AlteredTopic seems a bit redundant. Both numPartitions
and replicationFactor can be derived from replicasAssignment. I think we
can probably just use the following API in AdminClient instead:

AlterTopicsResult alterTopics(Map<TopicPartition, List<Integer>>
partitionAssignment, AlterTopicsOptions options)

- Do you think "reassignPartitions" may be a better name than
"alterTopics"? This is more consistent with the existing name used in
kafka-reassign-partitions.sh and I also find it to be more accurate. On the
other hand, alterTopics seems to suggest that we can also alter topic
config.



Thanks,
Dong

On Fri, Aug 4, 2017 at 1:03 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:

> Hi Dong,
>
> Thanks for your reply.
>
> You're right that your DescribeDirsResponse contains appropriate data. The
> comment about the log_end_offset in the KIP says "Enable user to track
> movement progress by comparing LEO of the *.log and *.move". That makes me
> wonder whether this would only work for replica movement on the same
> broker. In the case of reassigning partitions between brokers it's only
> really the leader for the partition that knows the max LEO of the ISR and
> the LEO of the target broker. Maybe the comment is misleading and it would
> be the leader doing the same thing in both cases. Can you clarify this?


> At a conceptual level there's a lot of similarity between KIP-113 and
> KIP-179 because they're both about moving replicas around. Concretely
> though, ChangeReplicaDirRequest assumes the movement is on the same broker,
> while the equivalent APIs in KIP-179 (whichever alternative) lack the
> notion of disks. I wonder if a unified API would be better or not.
> Operationally, the reasons for changing replicas between brokers are likely
> to be motivated by balancing load across the cluster, or adding/removing
> brokers. But the JBOD use case has different motivations. So I suspect it's
> OK that the APIs are separate, but I'd love to hear what others think.


> I think you're right about TopicPartitionReplica. At the protocol level we
> could group topics and partitions together so avoid having the same topic
> name multiple times when querying for the status of all the partitions of a
> topic.
>
> Thanks again for taking the time to respond.
>
> Tom
>
> On 3 August 2017 at 23:07, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Tom,
> >
> > Thanks for the KIP. It seems that the prior discussion in this thread has
> > focused on reassigning partitions (or AlterTopics). I haven't looked into
> > this yet. I have two comments on the replicaStatus() API and the
> > ReplicaStatusRequest:
> >
> > -  It seems that the use-case for ReplicaStatusRequest is covered by
> > the DescribeDirsRequest introduced in KIP-113
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 113%3A+Support+replicas+movement+between+log+directories>.
> > DescribeDirsResponse contains the logEndOffset for the specified
> > partitions. The the lag of the follower replica can be derived as the
> > difference between leader's LEO and follower's LEO. Do you think we can
> > simply use DescribeDirsRequest without introducing ReplicaStatusRequest?
> >
> > - According to the current KIP, replicaStatus() takes a list of
> partitions
> > as input. And its output maps the partition to a list of replica status.
> Do
> > you think it would be better to have a new class called
> > TopicPartitionReplica, which identifies the topic, partition and the
> > brokerId. replicaStatus can then take a list of TopicPartitionReplica as
> > input. And its output maps the replica to replica status. The latter API
> > seems simpler and also matches the method name better. What do you think?
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Wed, Aug 2, 2017 at 5:12 AM, Tom Bentley <t.j.bent...@gmail.com>
> wrote:
> >
> > > Hi again Ismael,
> > >
> > >
> > > 1. It's worth emphasising that reassigning partitions is a different
> > > >> process than what happens when a topic is created, so not sure
> trying
> > to
> > > >> make it symmetric is beneficial. In addition to what was already
> > > >> discussed,
> > > >> one should also enable replication throttling before moving the
> data.
> > > >
> > > >
> > > > Thanks for your responses.The only advantage I can see from having
> > > > separate APIs for partition count change vs. the other changes is
> that
> > we
> > > > expect the former to return synchronously, and the latter to
> (usually)
> > > > return asynchronously. Lumping them into the same API forces clients
> of
> > > > that API to code for the two possible cases. Is this the particular
> > > concern
> > > > you had in mind, or do you have others?
> > > >
> > >
> > > Just to help see what this looks like I've created another version of
> > > KIP-179 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/Copy+
> > > of+KIP-179+-+Change+ReassignPartitionsCommand+to+use+AdminClient
> > > ).
> > >
> > > The AdminClient APIs are more type safe (harder to misuse). The
> > > asynchronous nature of the alterReplicationFactors() and
> > > reassignPartitions() methods aren't really apparent (you'd still have
> to
> > > read the javadocs to know you had to call replicaStatus() to determine
> > > completion), but it's still better that the lumped-together API because
> > the
> > > methods are _consistently_ async. The Protocol APIs are slightly nicer
> > too,
> > > in that INVALID_REQUEST is less commonly returned. Another benefit is
> it
> > > would allow for these different alteration APIs to be modified
> > > independently in the future, should that necessary.
> > >
> > > What do others think?
> > >
> > > You're right that the proposal doesn't cover setting and clearing a
> > > > throttle, and it should. I will study the code about how this works
> > > before
> > > > posting back about that.
> > > >
> > >
> > > AFAICS from the code the throttle is simply a dynamic config of the
> > broker.
> > > We already have the AdminClient.alterConfigs API for setting this, so
> the
> > > refactoring of kafka-reassign-partitions.sh could just use that
> existing
> > > API. We could still achieve your objective of setting the throttle
> before
> > > reassignment by making the --throttle option mandatory when --execute
> was
> > > present. Or did you have something else in mind?
> > >
> > > Thanks,
> > >
> > > Tom
> > >
> >
>

Reply via email to