Hi Greg,

Thanks for the KIP. I have a few questions/comments:

1) You mentioned that during a rebalance if all the members of the cluster
support the static protocol, then it would use the steps outlined in the
Proposed Section to do the assignments. In those steps, the leader
identifies the static/wildcard jobs. It is not clear to me how the leader
makes that distinction? Are we going to enhance the embedded protocol to
also write the static jobs that the worker owns as part of it's assignment?
As of today, the workers just write only the owned/revoked connectors/tasks
in case of incremental and above and only owned connectors/tasks in case of
eager.

2) Could you also elaborate this statement a bit:

> A cluster with both static and wildcard workers can use wildcard workers
> as backups for disaster recovery.


I see in the Strimzi proposal you have explained this scenario (the case
where shared workers are empty as pods aren't created yet etc IIUC), but
reading the KIP doesn't make it too obvious.

3) A nit comment but we should probably call out that there is no need to
assign the connector and task to the same worker when configuring them. I
guess for new users of Connect this could be a source of confusion. WDYT?

4) Staying on the above, I also wanted to know why do we need to set
static.connectors? As such, connectors generally aren't resource intensive.
Can we not let connectors be assigned using the same algorithm as today and
let only tasks be pinned to workers?

5) In the proposed section you also mentioned that

If static assignments are not specified, or at least one worker in the
> cluster is not using the static  protocol, they are ignored and the
> worker may receive an arbitrary assignment.


The arbitrary assignment is driven by the minimum supported protocol right
(sessioned/eager)?

6) Just for my understanding, because now the onus is on the connect admin
to specify which connector/task should be running where, can there be a
situation where there could be an imbalance in terms of number of
connectors/tasks running on workers (due to some oversight on part of admin
or some bug in the tooling used to generate worker configs if any) ? There
could be an imbalance even today as well but the protocol tries to balance
as much as it can in an automated fashion.

7) Lastly, in a cluster where all workers support the static protocol,
there is still no guarantee that all connectors/tasks would be static. In
such a case, what happens when one of the workers is restarted or is being
rolled? Would the assignment eventually be sticky when the worker comes
back? Does this new protocol also abide by the scheduled rebalance delays?
Maybe this has been explained in the KIP but it wasn't clear to me when I
read it.

Thanks!
Sagar.

On Tue, Oct 10, 2023 at 11:22 PM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hi Mickael,
>
> I'm not Chris but I hope I can still respond to your questions :)
>
> 1a. This system behaves best when connectors and tasks are known in
> advance, but I don't think that is a requirement. I clarified that the
> worker configurations are permissive of non-existent jobs, which
> allows for bootstrapping an empty cluster and concurrent worker & job
> provisioning.
> 1b. I think the wildcard suggestion is similar to the rejected
> alternative "implement a more complex worker selector...". While we
> could add that sort of functionality, I think it is more difficult to
> express and reason about. For example, with the current configuration,
> users can ensure that at most one job is assigned to a JVM. If a user
> wanted to use wildcards with the same constraint, those wildcards
> would need to express "at most one of <matching job>".
> 1c. The javadoc doesn't say what happens to the additional tasks, but
> I think we could start enforcing it if it makes the
> fixed-static-assignment use-case more reliable. I have only ever seen
> additional tasks emitted due to rather serious bugs in connector
> implementations.
>
> 2. Yes, I've added that information to the proposal. I used the `GET
> /connectors/{connector}` API as it doesn't list the task configs, but
> it performs the same function.
>
> 3. Do you mean the resource limits and requests? This is the rejected
> alternative "Model per-job resource constraints and measure resource
> utilization within a single JVM", let me know if the reasoning there
> is not convincing.
>
> 4. I have not explored rack awareness for Connect in detail. I expect
> that rack awareness would require some compromise on the "least
> disruption" property of incremental cooperative rebalancing for shared
> JVMs. We could discuss that in a future KIP.
> As for this proposal, because the management layer is responsible for
> placing workers and placing tasks on those workers, it would also be
> responsible for implementing rack-awareness in isolated JVMs.
>
> Thanks a lot!
> Greg
>
> On Tue, Oct 10, 2023 at 10:08 AM Greg Harris <greg.har...@aiven.io> wrote:
> >
> > Hey Tom,
> >
> > Thanks for your comments! I appreciate your insight here and on the
> > Strimzi proposal.
> >
> > 1. Fixed, thanks.
> >
> > 2. We haven't removed a protocol yet, so workers still announce that
> > they can use the old protocol versions, and the group coordinator will
> > choose the highest mutually supported protocol version. This would be
> > true even if static was not a superset of sessioned.
> > For the static feature, having workers in the cluster that do not
> > support static assignments will cause the common version to not
> > support static assignments, and will mean that workers which announce
> > static.connectors and static.tasks may be assigned jobs outside of
> > their static assignments, which is called out in the KIP.
> > If the version gap is wider, then the other protocol-controlled
> > features will also change behavior (i.e. no longer using
> > incremental-cooperative-assignments, or not including REST
> > authentication headers). There would be no difference to the behavior
> > of this feature.
> >
> > 3. Arbitrary here is specifically meant to say that one of the workers
> > will be chosen, but outside implementations should not rely on the
> > method of choosing. This allows us to change/improve the assignment
> > algorithm in the future without a KIP. The reason I've specified it
> > this way is because I'd like to choose the "least loaded" worker, but
> > I don't know if that is well-defined or tractable to compute. I expect
> > that in practice we will use some heuristics to tie-break, but won't
> > be able to ensure that the algorithm will be deterministic and/or
> > optimal. If we find an improvement to the heuristics, I think that we
> > should be able to implement them without a KIP.
> >
> > 4. I added a section for "Choosing static assignments" that details
> > both the "dynamic" and "fixed" use-cases. With regards to tasks.max:
> > The javadoc for the `taskConfigs` method does state that it
> > "produc[es] at most {@code maxTasks} configurations," so static
> > inference of the task IDs is valid. I think that statically inferring
> > the number of tasks will always come with the trade-off that you can
> > over-provision task workers, it seems fundamental. I think it's a
> > trade-off that users of this feature will make for themselves, and
> > both methods will be supported.
> >
> > 5. No I don't think that it is a safe assumption, as there are
> > certainly connectors out there which violate it. For example, sink
> > connectors with a heterogeneous set of topics, and source connectors
> > which distribute a dynamic heterogeneous workload internally. I
> > believe there are a few connectors with a distinct "task-0" which
> > consumes more resources than the other tasks which would have some
> > index-stability. Index is chosen because it is the only distinguishing
> > feature we have without inspecting task configurations, and have
> > control over via coordination and assignments.
> >
> > Thanks again!
> > Greg
> >
> >
> > On Tue, Oct 10, 2023 at 8:05 AM Mickael Maison <mickael.mai...@gmail.com>
> wrote:
> > >
> > > Hi Chris,
> > >
> > > Thanks for the KIP!
> > >
> > > 1) With this mechanism it seems Connect administrators have to know in
> > > advanced the names of connectors that will be running. As far as I can
> > > tell, static.connectors and static.tasks will require restarting the
> > > worker to change. Also you don't specify if these configurations can
> > > accept wildcards or not. This could be useful for tasks as it's hard
> > > to know in advance how many tasks will exist, especially as connectors
> > > can in theory ignore tasks.max (I wonder if we should make the runtime
> > > enforce that config).
> > >
> > > 2) The KIP does not explicitly mention a mechanism for management
> > > systems to figure out what to do. As far as I can tell (and looking at
> > > the proposal in Strimzi), these systems will have to poll the REST API
> > > to do so. If I understand correctly, management systems would first
> > > have to poll the GET /connectors and then GET /connectors/{name}/tasks
> > > for each connector, is that right?
> > >
> > > 3) In the Strimzi proposal you mention adding custom fields to the
> > > Connector resource to instruct it what to do. I wonder if these should
> > > be added directly in the connector configuration, rather than just
> > > exist in the management system. That way all the data necessary to
> > > make decisions would be available in the same place, the REST API.
> > >
> > > 4) This is not something you mention in the KIP and I don't think it's
> > > a blocker for this KIP but I wonder if you thought about rack
> > > awareness too. I'm bringing it as I think it's a similar concern when
> > > it comes to deciding where to assign tasks.
> > >
> > > Thanks,
> > > Mickael
> > >
> > >
> > > On Tue, Oct 10, 2023 at 8:39 AM Tom Bentley <tbent...@redhat.com>
> wrote:
> > > >
> > > > Hi Greg,
> > > >
> > > > Many thanks for the KIP. Here are a few initial questions
> > > >
> > > > 1. Incomplete sentence: "But Connect is not situated to be able to
> manage
> > > > resources directly, as workers are given a fixed "
> > > > 2. You explain how sessioned is now a subset of static, but what
> happens in
> > > > a cluster where some workers are using static and some are using
> either
> > > > eager or compatible?
> > > > 3. "Assign each unassigned static job to a static worker which
> specifies
> > > > that job, choosing arbitrarily if there are multiple valid workers."
> I
> > > > think there might be less ambiguous words than "arbitrary" to
> specify this
> > > > behaviour. Hashing the task name would _appear_ pretty arbitrary to
> the
> > > > user, but would be deterministic. Picking at random would be
> > > > non-deterministic. Even better if you have a rationale.
> > > > 4. You don't describe how a user, or automated system, starting with
> a set
> > > > of connectors, should find out the tasks that they want to run. This
> > > > relates to the contract of
> > > > org.apache.kafka.connect.connector.Connector#taskConfigs(int
> maxTasks).
> > > > AFAIK (please correct me if I'm wrong, because it's a long time
> since I
> > > > looked at this code) there's nothing that validates that the
> returned list
> > > > has at most the `maxTasks` and connectors can, of course, return
> fewer than
> > > > that many tasks. So without deeper knowledge of a particular
> connector it's
> > > > not clear to the user/operator how to configure their static workers
> and
> > > > static assignments.
> > > > 5. Is there a lurking assumption that task indices are stable? E.g.
> that
> > > > the task with index 3 will always be the resource-intensive one. I
> can see
> > > > that that would often be a reliable assumption, but it's not clear
> to me
> > > > that it is always the case.
> > > >
> > > > Thanks again,
> > > >
> > > > Tom
> > > >
> > > > On Fri, 6 Oct 2023 at 12:36, Greg Harris
> <greg.har...@aiven.io.invalid>
> > > > wrote:
> > > >
> > > > > Hey everyone!
> > > > >
> > > > > I'd like to propose an improvement to Kafka Connect's scheduling
> > > > > algorithm, with the goal of improving the operability of connect
> > > > > clusters through resource isolation.
> > > > >
> > > > > Find the KIP here:
> > > > >
> > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments
> > > > >
> > > > > This feature is primarily intended to be consumed by cluster
> > > > > management systems, so I've opened a sister proposal in the Strimzi
> > > > > project that uses this feature to provide user-facing resource
> > > > > isolation. The base feature is generic, so it is usable manually
> and
> > > > > with cluster management systems other than Strimzi.
> > > > >
> > > > > Find the proposal here:
> https://github.com/strimzi/proposals/pull/96
> > > > >
> > > > > Thanks!
> > > > > Greg Harris
> > > > >
> > > > >
>

Reply via email to