kbuci commented on code in PR #12856: URL: https://github.com/apache/hudi/pull/12856#discussion_r2023879870
##########
rfc/rfc-79/rfc-90.md:
##########
@@ -0,0 +1,310 @@

<!--
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License.
-->

# Add support for cancellable clustering table service plans

## Proposers

Krishen Bhan (kbuci)

## Approvers

Sivabalan Narayanan (nsivabalan)

## Status

In Progress

JIRA: HUDI-7946

## Abstract

Clustering is a table service used to optimize the table/file layout in HUDI in order to speed up read queries. Currently,
ingestion writers abort if they attempt to write to the same data targeted by a pending clustering write.
As a result, clustering table service plans can indirectly delay ingestion writes from updating a dataset with recent data.
Furthermore, a clustering plan that isn't executed to completion for a long time (due to repeated failures, application
misconfiguration, or insufficient resources) will degrade the read/write performance of a dataset by delaying clean,
archival, and metadata table compaction. This is because currently HUDI clustering plans, once scheduled, must be
executed to completion. This RFC proposes support for "cancellable" clustering plans. Such plans give HUDI an avenue to
fully cancel a clustering plan and allow other table service and ingestion writers to proceed, avoiding starvation based
on user needs.

## Background

### Current state of execution of table service operations in Hudi

As of now, the table service operations `COMPACT` and `CLUSTER` are implicitly "immutable" plans by default, meaning
that once a plan is scheduled, it will stay as a pending instant until a caller invokes the table service execute API on
the table service instant and successfully completes it (referred to as "executing" a table service). Specifically, if an
execution attempt fails after transitioning the instant to inflight, the next execution attempt will implicitly create
and execute a rollback plan (which deletes all new instant/data files) but will keep the table service plan itself, and
the table service will then be re-attempted. This process repeats until the instant is completed. The below visualization
captures these transitions at a high level.
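In code form, the same retry loop can be summarized with a minimal sketch (the `TableServicePlan` and
`TableServiceExecutor` interfaces below are hypothetical stand-ins for illustration, not actual Hudi APIs):

```java
// Hypothetical sketch of today's "immutable plan" semantics: once a COMPACT or
// CLUSTER plan is scheduled, the only way off the timeline is successful completion.
interface TableServicePlan {
  boolean hasFailedInflightAttempt(); // a previous attempt left an .inflight instant behind
}

interface TableServiceExecutor {
  // Deletes the instant/data files written by the failed attempt, but keeps the plan itself.
  void rollbackFailedAttempt(TableServicePlan plan);
  // Transitions requested -> inflight -> completed; throws if the attempt fails.
  void execute(TableServicePlan plan) throws Exception;
}

final class ImmutablePlanRunner {
  private final TableServiceExecutor executor;

  ImmutablePlanRunner(TableServiceExecutor executor) {
    this.executor = executor;
  }

  // Re-attempts the same plan until it completes; there is no "cancel" exit.
  void runToCompletion(TableServicePlan plan) throws InterruptedException {
    while (true) {
      try {
        if (plan.hasFailedInflightAttempt()) {
          executor.rollbackFailedAttempt(plan); // clean up files, the plan stays on the timeline
        }
        executor.execute(plan);
        return; // the only way out is a completed instant
      } catch (Exception failedAttempt) {
        Thread.sleep(1000L); // back off, then retry the very same plan
      }
    }
  }
}
```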
## Goals

### (A) An ingestion job should be able to cancel and ignore any inflight cancellable clustering instants targeting the same data as the ingestion writer

The current requirement that HUDI execute a clustering plan to completion forces ingestion writers to abort a
commit if a conflicting table service plan is present. Because an ingestion writer typically determines the exact file
groups it will be updating/replacing only after building a workload profile and performing record tagging, the writer
may have already spent a lot of time and resources before realizing that it needs to abort. In the face of frequent
table service plans or an old inflight plan, this delays adding recent upstream records to the dataset and unnecessarily
takes resources away from other applications in the data lake (such as Spark executors in the case of the Spark engine).
Making the clustering plan cancellable should avoid this situation by permitting an ingestion writer to request that all
conflicting cancellable clustering plans be "cancelled" and to ignore inflight plans that have already been requested
for cancellation. The latter ensures that ingestion writers can ignore any incomplete cancellable clustering instants
that have been requested for cancellation but have not yet been aborted.

### (B) A cancellable table service plan should be eligible for cancellation at any point before committing

In conjunction with (A), any caller (an ingestion writer and potentially other users) should be able to request
cancellation of an inflight cancellable clustering plan. We should not need a synchronous mechanism wherein the
clustering plan of interest must be aborted and cleaned up completely before the ingestion writer can proceed. Instead,
we should have a lightweight mechanism with which the ingestion writer makes a cancellation request and moves on to
carry out its operation under the assumption that the respective clustering plan will be aborted. This requirement stems
from the presence of concurrent and asynchronous writers for clustering execution: another worker should not need to
wait for the respective concurrent clustering worker to proceed with execution or fail before confirming that its
cancellation request will be honored. Once the request for cancellation succeeds, all interested entities (the ingestion
writer, readers, and the asynchronous clustering execution job) should assume the clustering plan is cancelled.

## Design

### Enabling a clustering plan to be cancellable

To satisfy goal (A), a new config flag named "cancellable" can be added to a clustering plan. A writer that intends to
schedule a cancellable table service plan can enable the flag in the serialized plan metadata. Any writer executing the
plan can infer that the plan is cancellable, and should abort when trying to commit the instant if it detects that the
plan has been requested for cancellation. As a future optimization, the cancellable clustering worker can continually
poll during its execution to see if it has been requested for cancellation. On the ingestion side, the commit
finalization logic can be updated to ignore any inflight clustering plans that are cancellable. For the purpose of this
design proposal, consider the existing ingestion write flow as having three steps (a sketch of both sides of this
protocol follows the list):

1. Schedule itself on the timeline with a new instant time in a .requested file.
2. Process/record-tag incoming records, build a workload profile, and write the updating/replaced file groups to an
   "inflight" instant file on the timeline. Check for conflicts and abort if needed.
3. Perform write conflict checks and commit the instant on the timeline.
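To make the interplay concrete, below is a minimal sketch of both sides of the protocol, assuming a `cancellable` flag in
the serialized plan metadata and a per-instant cancellation request marker (all type and method names are hypothetical,
not actual Hudi APIs):

```java
import java.util.List;

// Hypothetical model of a scheduled clustering plan; the "cancellable" flag is
// carried in the serialized plan metadata as described above.
record ClusteringPlan(String instantTime, boolean cancellable) {}

// Hypothetical registry of cancellation requests (e.g. one marker per instant
// time under a dedicated metadata folder).
interface CancellationRegistry {
  void requestCancellation(String instantTime);
  boolean isCancellationRequested(String instantTime);
}

final class CancellableClusteringProtocol {

  // Clustering side: a cancellable plan re-checks for a cancellation request
  // right before committing, and aborts instead of completing if one exists.
  static boolean tryCommitClustering(ClusteringPlan plan, CancellationRegistry registry) {
    if (plan.cancellable() && registry.isCancellationRequested(plan.instantTime())) {
      return false; // abort; the plan will eventually be transitioned to an aborted state
    }
    // ... normal commit path: write the completed instant to the timeline ...
    return true;
  }

  // Ingestion side (between steps (2) and (3) above): request cancellation of every
  // conflicting cancellable plan and ignore it; only a non-cancellable conflict forces an abort.
  static void resolveConflicts(List<ClusteringPlan> conflictingInflightPlans,
                               CancellationRegistry registry) {
    for (ClusteringPlan plan : conflictingInflightPlans) {
      if (plan.cancellable()) {
        registry.requestCancellation(plan.instantTime()); // lightweight: no waiting on the clustering worker
      } else {
        throw new IllegalStateException(
            "Conflicting non-cancellable table service plan at instant " + plan.instantTime());
      }
    }
  }
}
```

The key property is that the ingestion side only requests cancellation and moves on, while the clustering side re-checks
for such a request at commit time, which is what keeps the mechanism lightweight per goal (B).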
The aforementioned changes to the ingestion and clustering flows will ensure that, in the event of conflicting ingestion
and cancellable table service writers, the ingestion job takes precedence (and causes the cancellable table service
instant to eventually be cancelled) as long as the cancellable clustering plan hasn't been completed before (2). If the
cancellable table service has already completed before (2), the ingestion job will see that a completed instant (a
cancellable table service action) conflicts with its ongoing inflight write, and it would not be legal to proceed. In
such cases, the ingestion writer will have to abort itself instead of proceeding to completion.

### Adding a cancel action and aborted state for cancellable plans

This proposed design will also involve adding a new instant state and an internal hoodie metadata directory, by making
the following changes:

#### Cancel action

* We are proposing to add a new .hoodie/.cancel folder, where each file corresponds to an instant time that a writer

Review Comment:
   Just to clarify, do you mean adding a .canceled state, so that an instant can transition from
   instant.replacecommit.requested -> instant.inflight -> instant.canceled -> instant.abort? We could implement it that
   way as well. The reason I made cancellation involve a folder instead of a new state is that:
   - Looking at how heartbeating and markers were implemented, it seemed to be a convention to add a new folder for
     activity involving tracking the state of inflight instants.
   - I thought the implementation would be simpler if we only had to add one more new state, instead of two.

   But if you think adding a new state would make the design fit better with the rest of HUDI's operations, then I'm
   fine with updating the RFC accordingly.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org