It is useful to be able to autoscale the number of Pulsar Function workers when the cluster is configured to run instances as threads (ThreadRuntime) in an environment like K8s. When scaling out the number of workers, the "rebalance" endpoint can be invoked after the scale-out so that the new workers get a share of the work. When scaling in, however, it is best to reassign the instances a worker is running before shutting that worker down, so that downtime is minimized. If we don't drain a worker before shutting it down, the instances it is supposed to execute will eventually be reassigned after a timeout, but they will not be running during that timeout period, so a degradation in QoS is expected. I would therefore like to propose a new endpoint called "drain" on the worker, which triggers all of that worker's assignments to be reassigned.
The API for the drain operation should be asynchronous: one endpoint triggers the drain and another reports its status.

Trigger Drain (from worker)
Invoking this API drains the worker that receives the call.
PUT "/admin/v2/worker/drain"
Response codes (code - description - response body):
  200 - Trigger drain successful - none
  409 - Drain already in progress - "Drain already in progress"
  403 - The requester doesn't have permissions - N/A
  503 - Worker service is not ready - "Function worker service is not done initializing. Please try again in a little while"

Trigger Drain (from leader)
PUT "/admin/v2/worker/leader/drain?workerId=<WORKER_ID>"
Query parameters:
  workerId - the id of the worker to drain
Response codes (code - description - response body):
  200 - Trigger drain successful - none
  409 - Drain already in progress - "Drain already in progress"
  403 - The requester doesn't have permissions - N/A
  503 - Worker service is not ready - "Function worker service is not done initializing. Please try again in a little while"

Drain Status (from worker)
GET "/admin/v2/worker/drain"
Response codes (code - description - response body):
  200 - Get drain status successful - Running, Not Running, Success, or Error (we could use the existing LongRunningProcessStatus class for the response)
  403 - The requester doesn't have permissions - N/A
  503 - Worker service is not ready - "Function worker service is not done initializing. Please try again in a little while"

Drain Status (from leader)
GET "/admin/v2/worker/leader/drain?workerId=<WORKER_ID>"
Query parameters:
  workerId - the id of the worker
Response codes (code - description - response body):
  200 - Get drain status successful - Running, Not Running, Success, or Error (we could use the existing LongRunningProcessStatus class for the response)
  403 - The requester doesn't have permissions - N/A
  503 - Worker service is not ready - "Function worker service is not done initializing. Please try again in a little while"

Workflow
There are two ways to drain a worker:
1. Call the drain endpoint on the worker itself.
2. Call the drain endpoint on the leader, passing the id of the worker to drain.

When the drain endpoint on a worker is called, the call should be forwarded to the leader, since the leader takes care of scheduling; the same applies to checking the status of a drain. When the leader receives a request to drain a worker, it must first mark the worker as being drained, i.e. blacklist the worker so that no new assignments can be given to it. We could perhaps keep this blacklist in memory. The leader should then compute a new schedule in which the assignments of the worker being drained are moved to other workers, perhaps in a round-robin distribution. Afterwards, the leader should mark the drain of the worker as complete.

There is a caveat to this approach: if the leader fails before completing the drain request, the request will not be fulfilled. However, a client that regularly polls the drain status should notice that the drain is not running and can re-submit the request (see the client sketch in the postscript below).

What do people think?

Best,
Jerry
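P.S. To make the intended trigger-and-poll workflow concrete, here is a rough sketch (not a definitive implementation) of how a deployment script might call the proposed leader endpoints before shutting a worker down. The admin service URL, the worker id, and the plain-text status matching are assumptions for illustration; a real client would parse the LongRunningProcessStatus response mentioned above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DrainWorkerExample {
    // Hypothetical admin URL and worker id, for illustration only.
    private static final String ADMIN_URL = "http://pulsar-broker:8080";
    private static final String WORKER_ID = "fw-worker-1";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String drainUri = ADMIN_URL + "/admin/v2/worker/leader/drain?workerId=" + WORKER_ID;

        // Trigger the drain through the leader, as proposed above.
        HttpRequest trigger = HttpRequest.newBuilder()
                .uri(URI.create(drainUri))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> triggerResp = client.send(trigger, HttpResponse.BodyHandlers.ofString());
        if (triggerResp.statusCode() == 409) {
            System.out.println("Drain already in progress for " + WORKER_ID);
        } else if (triggerResp.statusCode() != 200) {
            throw new IllegalStateException("Failed to trigger drain: " + triggerResp.statusCode());
        }

        // Poll the status endpoint until the drain finishes. If the leader failed over
        // and the request was lost, the status goes back to "Not Running" and the
        // client can re-submit, as described in the caveat above.
        HttpRequest status = HttpRequest.newBuilder()
                .uri(URI.create(drainUri))
                .GET()
                .build();
        for (int attempt = 0; attempt < 60; attempt++) {
            String body = client.send(status, HttpResponse.BodyHandlers.ofString()).body();
            if (body.contains("Success")) {
                System.out.println("Drain complete; safe to shut down " + WORKER_ID);
                return;
            }
            if (body.contains("Error") || body.contains("Not Running")) {
                System.out.println("Drain not running or failed; consider re-submitting");
                return;
            }
            Thread.sleep(5000); // still Running; wait and poll again
        }
        System.out.println("Gave up waiting for drain of " + WORKER_ID);
    }
}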