It is useful to be able to autoscale the number of Pulsar Function workers
when the cluster is configured to run instances as threads (ThreadRuntime) in
an environment like K8s.  When scaling out the number of workers, the
"rebalance" endpoint can be invoked after the scale-out so that the new
workers get a share of the work.  However, when scaling in, it is best to
reassign the instances a worker is running before shutting that worker down
so that downtime is minimized.  If we don't perform a drain before a
shutdown, the instances the worker is supposed to execute will eventually be
re-assigned after a timeout; however, the instances will not be running
during the timeout period and a degradation in QoS is expected.  Thus, I
would like to propose we add a new "drain" endpoint on the worker which will
trigger all of the worker's assignments to be re-assigned.

The API for the drain operation should be asynchronous.  We need an API
endpoint for triggering the drain as well as another endpoint for checking
the status.

Trigger Drain (from worker)

Invocation of this API causes the worker that receives the call to be
drained.

PUT “/admin/v2/worker/drain”

Response Codes

  Code | Description                             | Response Body
  200  | Trigger drain successful                | none
  409  | Drain already in progress               | "Drain already in progress"
  403  | The requester doesn't have permissions  | N/A
  503  | Worker service is not ready             | "Function worker service is not done initializing. Please try again in a little while"

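For illustration, here is a minimal sketch of how an autoscaler (or any
admin client) might trigger the drain once the endpoint exists.  The
host/port below are placeholders, and the endpoint itself is of course only
proposed at this point:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerDrain {
    public static void main(String[] args) throws Exception {
        // Placeholder address of the worker to drain; adjust per deployment.
        String drainUrl = "http://localhost:6750/admin/v2/worker/drain";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(drainUrl))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();

        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());

        // Per the table above: 200 = drain triggered, 409 = already in progress.
        System.out.println(response.statusCode() + " " + response.body());
    }
}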

Trigger Drain (from leader)

PUT “/admin/v2/worker/leader/drain?workerId=<WORKER_ID>”

Query parameters

workerId - the id of the worker

Response Codes

  Code | Description                             | Response Body
  200  | Trigger drain successful                | none
  409  | Drain already in progress               | "Drain already in progress"
  403  | The requester doesn't have permissions  | N/A
  503  | Worker service is not ready             | "Function worker service is not done initializing. Please try again in a little while"



Drain Status (from worker)

GET “/admin/v2/worker/drain”

Response Codes

  Code | Description                             | Response Body
  200  | Get drain status successful             | Running, Not Running, Success, or Error (we could use the existing LongRunningProcessStatus class for the response)
  403  | The requester doesn't have permissions  | N/A
  503  | Worker service is not ready             | "Function worker service is not done initializing. Please try again in a little while"

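Since the drain is asynchronous, a client would typically poll the status
endpoint until it reaches a terminal state before shutting the worker down.
A sketch (the terminal values checked below are assumptions until the
response format, e.g. LongRunningProcessStatus, is settled):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PollDrainStatus {
    public static void main(String[] args) throws Exception {
        // Placeholder address of the worker being drained.
        String statusUrl = "http://localhost:6750/admin/v2/worker/drain";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(statusUrl)).GET().build();

        while (true) {
            HttpResponse<String> resp =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("drain status: " + resp.body());

            // Assumed terminal states; adjust once the response body is finalized.
            if (resp.body().contains("Success") || resp.body().contains("Error")) {
                break;
            }
            Thread.sleep(5000); // poll every 5 seconds
        }
    }
}
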
Drain Status (from leader)

GET “/admin/v2/worker/leader/drain?workerId=<WORKER_ID>”

Query parameters

workerId - the id of the worker

Response Codes

  Code | Description                             | Response Body
  200  | Get drain status successful             | Running, Not Running, Success, or Error (we could use the existing LongRunningProcessStatus class for the response)
  403  | The requester doesn't have permissions  | N/A
  503  | Worker service is not ready             | "Function worker service is not done initializing. Please try again in a little while"

Workflow

There are two ways to drain a worker:

   1. Call the drain endpoint on the worker

   2. Call the drain endpoint on the leader and instruct the leader to drain a
      provided worker


When the drain endpoint on a worker is called, the call should be forwarded
to the leader, since the leader takes care of the scheduling.  The same
applies to checking the status of a drain.

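As a rough sketch of that forwarding (all names below are hypothetical; the
real code would reuse the worker's existing knowledge of who the current
leader is):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch: a worker either handles the drain itself (if it is
// the leader) or forwards the request to the current leader.
public class DrainForwarder {
    private final boolean isLeader;
    private final String leaderAdminUrl; // e.g. "http://leader-host:6750"

    public DrainForwarder(boolean isLeader, String leaderAdminUrl) {
        this.isLeader = isLeader;
        this.leaderAdminUrl = leaderAdminUrl;
    }

    public int drain(String workerId) throws Exception {
        if (isLeader) {
            scheduleDrain(workerId); // leader handles it directly
            return 200;
        }
        // Not the leader: forward to the leader's drain endpoint.
        HttpRequest forward = HttpRequest.newBuilder(URI.create(
                leaderAdminUrl + "/admin/v2/worker/leader/drain?workerId=" + workerId))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        return HttpClient.newHttpClient()
                .send(forward, HttpResponse.BodyHandlers.discarding())
                .statusCode();
    }

    private void scheduleDrain(String workerId) {
        // Placeholder for the leader-side logic described below.
    }
}
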
When the leader receives a request to drain a worker, it must first mark the
worker as being drained, i.e. blacklist the worker so that no new
assignments can be given to it.  We can perhaps just keep the blacklist in
memory.  The leader should then compute a new schedule in which the
assignments of the worker being drained are moved to other workers, perhaps
in a round-robin distribution.  Afterwards, the leader should mark the drain
of the worker as complete.

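To make that concrete, here is a rough sketch of the leader-side bookkeeping
(class, field, and status names are hypothetical; a real implementation
would hook into the existing scheduler rather than a plain map):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the leader-side drain logic described above.
public class DrainSketch {
    // workerId -> instance ids currently assigned to it
    private final Map<String, List<String>> assignments = new HashMap<>();
    // Workers being drained; kept in memory, so it is lost if the leader fails.
    private final Set<String> blacklist = new HashSet<>();
    // workerId -> drain status (e.g. RUNNING, SUCCESS)
    private final Map<String, String> drainStatus = new HashMap<>();

    public synchronized void drain(String workerId) {
        // 1. Blacklist the worker so the scheduler gives it no new assignments.
        blacklist.add(workerId);
        drainStatus.put(workerId, "RUNNING");

        // 2. Move its assignments to the remaining workers, round-robin.
        List<String> targets = new ArrayList<>(assignments.keySet());
        targets.removeAll(blacklist);
        List<String> toMove = assignments.remove(workerId);
        if (toMove != null && !targets.isEmpty()) {
            int i = 0;
            for (String instance : toMove) {
                assignments.get(targets.get(i++ % targets.size())).add(instance);
            }
        }

        // 3. Mark the drain of the worker as complete.
        drainStatus.put(workerId, "SUCCESS");
    }
}
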
There are some caveats to this approach.  If the leader fails before
completing the drain request, the drain will not be fulfilled.  However, if
the client periodically checks the status of the drain, it will notice that
the drain is not running and can re-submit the request.


What do people think?


Best,


Jerry
