[ 
https://issues.apache.org/jira/browse/KUDU-3748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18061980#comment-18061980
 ] 

Todd Lipcon commented on KUDU-3748:
-----------------------------------

Take it with a grain of salt since I'm no longer actively working on Kudu, but: 
the system I work on now (Spanner) has a "range deletion" feature and it's a 
source of extreme complexity that has gotten in the way of a ton of other 
proposed optimizations over the last decade. I'd be very cautious in adding 
this feature to Kudu.

> Support Delete by Primary Key Prefix (Key-Range Delete)
> -------------------------------------------------------
>
>                 Key: KUDU-3748
>                 URL: https://issues.apache.org/jira/browse/KUDU-3748
>             Project: Kudu
>          Issue Type: New Feature
>            Reporter: Joaquín Antonio De Vicente López
>            Priority: Major
>
> |Aspect|Detail|
> |Status|Proposed|
> |Component|Client, Tablet Server, Storage|
> |Affects|Java Client, C++ Tablet Server|
> *Motivation*
> *Real-World Use Case:* Spark Structured Streaming Ingestion with Calendar 
> Explosion
> We have an Apache Spark Structured Streaming job consuming messages from 
> Apache Kafka. Each message contains 2 business identifier fields and 4 
> additional fields, each being a list of calendars
> (valid_from, valid_to).
> During transformation the calendar lists are vertically exploded, generating 
> N rows per message where N = max(length(calendar_list_1 ... 
> calendar_list_4)). Shorter lists produce NULL values in their
> respective columns. A positional index column num_cal is generated and 
> included as part of the Primary Key in the Kudu table — without it, only the 
> last calendar row would survive an upsert.
> *The problem:* we need to perform a logical upsert (replace) per message 
> based on the 2 business identifiers. Since Kudu requires the full Primary Key 
> for delete operations, and our PK includes num_cal,
> we cannot directly delete all rows of a message unless we know every num_cal 
> value. Attempting a delete without num_cal causes the streaming job to fail:
> org.apache.kudu.client.NonRecoverableException: Primary key column num_cal is 
> not set
> Caused by: java.lang.IllegalStateException: Primary key column num_cal is not 
> set
> Impact: the Structured Streaming job fails and aborts, message replacement 
> cannot be performed, and the continuous ingestion pipeline is disrupted.
> Today, we work around this limitation by joining each incoming micro-batch 
> with the target Kudu table in order to retrieve the existing full primary 
> keys (including {{{}num_cal{}}}) and issue row-level deletes before inserting 
> updated rows. This join is inherently inefficient: it requires scanning and 
> reading existing data solely to discover generated key components, increases 
> read I/O and latency, and couples ingestion performance to the size and 
> distribution of the target table. As the table grows, this approach scales 
> poorly and adds unnecessary overhead to what is logically a simple “replace 
> by business key” operation.
> *General Problem Statement*
> Kudu does not support deleting rows by a partial primary key (a leading 
> prefix of the PK columns). Many workloads need a "delete-and-replace" pattern 
> where all rows sharing a logical identifier are
> removed before inserting new versions. Today this requires the caller to know 
> every full PK value, which is impractical when the PK includes generated or 
> positional columns.
> *Proposed Solution:* Three-Phase Approach
> The feature is designed in three phases, each building on the previous:
> |Phase|Approach|Cost per Operation|
> |Phase 1|Client-side scan + delete|O(N) network round-trips|
> |Phase 2|Server-side RPC (scan + batched delete per tablet)|O(N) row reads + 
> writes (server-side)|
> |Phase 3|Storage-level range tombstones|O(1) per tablet|
> *Phase 1:* Client-Side Scan + Delete
> *Approach*
> The Java client performs the entire operation:
> 1. Validate that the set columns in the supplied PartialRow are a contiguous 
> leading prefix of the table's primary key columns.
> 2. Open a scan with equality predicates on the prefix columns, projecting 
> only PK columns.
> 3. For every row returned by the scan, issue a DELETE through a KuduSession 
> (batched with MANUAL_FLUSH).
> 4. Return the total number of rows deleted.
> *Limitations*
>  - Network overhead: every matching row's PK is transferred from tablet 
> server to client and back as a delete operation.
>  - Latency: proportional to the number of matching rows.
>  - No server-side optimization: the tablet server is unaware of the 
> higher-level intent.
> *Phase 2:* Server-Side RPC
> Approach
> A dedicated DeleteByPkPrefix RPC is added to the Tablet Server service. The 
> client sends one RPC per relevant tablet; the server performs the scan and 
> delete internally, eliminating network round-trips
> for individual rows.
> *RPC Definition*
> {code:java}
> rpc DeleteByPkPrefix(DeleteByPkPrefixRequestPB)
> returns (DeleteByPkPrefixResponsePB){
>   option (kudu.rpc.track_rpc_result) = true;
> }
> {code}
> {code:java}
> message DeleteByPkPrefixRequestPB {  
>   required bytes tablet_id = 1;
>   // Schema of the prefix columns.
>   optional SchemaPB schema = 2;
>   // Encoded prefix row.
>   optional RowOperationsPB pk_prefix = 3;
>   optional ExternalConsistencyMode external_consistency_mode = 4
>   [default = CLIENT_PROPAGATED];
>   optional fixed64 propagated_timestamp = 5;
>   optional security.SignedTokenPB authz_token = 6;
> }
> {code}
> {code:java}
> message DeleteByPkPrefixResponsePB {
>   optional TabletServerErrorPB error = 1;
>   optional int64 rows_deleted = 2;
>   optional fixed64 timestamp = 3;
>   optional ResourceMetricsPB resource_metrics = 4;
> }{code}
> *Server-Side Behavior*
> 1. Decode the prefix row from the request.
> 2. Build equality predicates for each prefix column.
> 3. Create an MVCC snapshot iterator projecting only PK columns.
> 4. Scan matching rows in batches and submit DELETE_IGNORE operations through 
> Raft.
> 5. Return the total count of deleted rows.
> *Client-Side Behavior*
>  - Use partition pruning to determine which tablets may contain matching 
> rows, avoiding RPCs to irrelevant tablets.
>  - Send one DeleteByPkPrefixRequest per relevant tablet.
>  - Sum rows_deleted from all responses.
>  - If any tablet returns a feature-not-supported error (mixed-version 
> cluster), fall back to Phase 1.
> *Public API*
> DeleteByPkPrefixResponse resp = client.deleteByPkPrefix(table, pkPrefix);
> Semantics
>  - Per-tablet snapshot consistency: rows to delete are determined by an MVCC 
> snapshot taken at the start of the operation on each tablet. Rows inserted 
> after the snapshot are NOT deleted.
>  - Per-tablet non-atomicity: deletes are applied in batches through Raft. 
> Each batch is individually atomic; the full operation within a tablet is NOT 
> a single atomic transaction.
>  - Cross-tablet non-atomicity: one RPC per tablet; no cross-tablet atomicity.
>  - Partial failures: if one or more tablets fail, the client returns an error 
> and the operation may have been partially applied on other tablets. Retrying 
> is safe due to track_rpc_result=true.
>  - Concurrent deletes: uses DELETE_IGNORE internally so concurrent deletes of 
> the same rows are harmless.
>  - Retry safety: track_rpc_result = true enables server-side deduplication.
> *Limitations of Phase 2*
>  - Still scans every matching row: the server must read each row's PK from 
> storage and issue individual delete mutations through Raft.
>  - Cost proportional to N (matching rows): for tables with millions of rows 
> per prefix, this is expensive in CPU, I/O, and WAL.
>  - WAL amplification: each deleted row generates a Raft log entry.
> *Phase 3:* Storage-Level Range Tombstones
> *Motivation*
> Phases 1 and 2 solve the functional problem but remain O(N) in the number of 
> matching rows: every row must be scanned and individually deleted. For tables 
> with millions of rows per prefix, this is
> expensive.
> Range tombstones eliminate row enumeration at delete time, while maintaining 
> O(log K) lookup cost during reads. A single metadata entry (lower_key, 
> upper_key, mvcc_timestamp) logically deletes all rows in the key range 
> without touching them individually.
> Phase 3 reduces write amplification by replacing N row-level delete mutations 
> with a single replicated range tombstone operation per tablet.
> Range tombstones do not provide cross-tablet atomicity.
> Atomic multi-tablet semantics require Kudu transactions.
> *Proposed Semantics*
> MVCC model:
>  - A range tombstone is created at a single MVCC hybrid timestamp T.
>  - Any row version with commit_timestamp <= T whose encoded primary key
> falls within [lower_key, upper_key) is considered deleted for snapshots
> at or after T.
>  - Snapshots taken before T are not affected and continue to observe the row 
> version.
>  - Rows inserted after timestamp T are visible (not deleted), unless covered 
> by a later tombstone.
>  - This is consistent with Kudu's existing MVCC visibility rules.
> Replication:
>  - Range tombstone operations are Raft-replicated like regular writes.
>  - Idempotent under retry via {{track_rpc_result=true}} server-side request 
> deduplication.
> Interaction with row-level operations:
>  - A row-level insert or upsert at timestamp T2 > T produces a new version 
> that is visible despite the tombstone, following standard MVCC visibility 
> rules.
>  - A row-level delete at any timestamp is independent; both can coexist.
> *Proposed Storage Design*
> New data structure: a range tombstone store — an interval set associated with 
> each RowSet (or at the tablet level for MemRowSet).
> Each entry:
> {lower_key, upper_key, mvcc_timestamp}
> Persistence options (to be evaluated during design):
> |Option|Description|Pros|Cons|
> |A|Separate tombstone file per RowSet|Clean separation|New file format|
> |B|DeltaStore extension|Reuses infra|Poor key-fit|
> |C|Tablet metadata|Simple|Metadata bloat|
> Recommended approach: Option A — a separate tombstone store per RowSet, 
> persisted as a small file with a sorted list of entries. An in-memory 
> interval tree or sorted list enables efficient lookup.
> *Read Path Impact*
> When producing a row during a scan or point lookup:
> 1. Retrieve the row's encoded PK and commit timestamp.
> 2. Query the range tombstone store for any tombstone covering that PK.
> 3. If a tombstone (lower, upper, T) exists where lower <= PK < upper and 
> row_commit_time <= T, the row is filtered out.
> 4. Lookup must be O(log K) where K = number of tombstones, using binary 
> search or an interval tree.
> Compaction Impact
> During rowset merge/compaction:
>  - Rows fully covered by a tombstone (row_commit_time <= tombstone_time) are 
> physically dropped.
>  - Tombstones are retained as long as needed for MVCC snapshot reads, CDC 
> consumers, and replication lag (following existing history retention rules / 
> --tablet_history_max_age_sec). Tombstones must not be garbage-collected until 
> no active snapshot can observe rows covered by the tombstone.
>  - Overlapping tombstones with compatible timestamps can be merged into a 
> single wider range.
> Tablet Split Impact
> When a tablet splits at a split key S:
>  - Tombstone (lower, upper, T) is split into:
>  - Child A: (lower, min(upper, S), T) — if lower < S
>  - Child B: (max(lower, S), upper, T) — if upper > S
>  - Empty ranges after clipping are discarded.
> API / RPC Changes
> Phase 3 introduces a *new dedicated RPC* for storage-level range tombstone 
> deletes. It does not extend {{DeleteByPkPrefix}} and does not rely on feature 
> flags.
> Phase 2 and Phase 3 are conceptually distinct:
>  * *Phase 2 ({{{}DeleteByPkPrefix{}}})* performs scan + batched row-level 
> deletes.
>  * *Phase 3 ({{{}DeleteByKeyRange{}}})* applies a single MVCC range tombstone 
> over an encoded key interval.
> The two approaches are independent and can coexist. Phase 3 does not modify 
> the semantics of Phase 2.
> {code:java}
> rpc DeleteByKeyRange(DeleteByKeyRangeRequestPB)
>     returns (DeleteByKeyRangeResponsePB) {
>   option (kudu.rpc.track_rpc_result) = true;
> }
> message DeleteByKeyRangeRequestPB {
>   // Target tablet.
>   required bytes tablet_id = 1;
>   // Inclusive lower bound of the encoded primary key.
>   required bytes lower_encoded_key = 2;
>   // Exclusive upper bound of the encoded primary key.
>   required bytes upper_encoded_key = 3;
>   // Optional propagated timestamp for external consistency.
>   optional fixed64 propagated_timestamp = 4;
>   // Authorization token.
>   optional security.SignedTokenPB authz_token = 5;
> }
> message DeleteByKeyRangeResponsePB {   
>    optional TabletServerErrorPB error = 1;
>    // Hybrid timestamp at which the tombstone was applied.
>    optional fixed64 timestamp = 2;
>    optional ResourceMetricsPB resource_metrics = 3; } {code}
> Key Range Computation from PK Prefix
> For a prefix (col1=A, col2=B) on a table with PK (col1, col2, col3):
>  - lower_key = encode(prefix + MIN_SUFFIX)
>  - upper_key = encode(prefix) with next lexicographical successor
> This must correctly handle:
>  - Variable-length types (STRING, BINARY, VARCHAR) and their encoding.
>  - Composite PKs with mixed types.
>  - Per-tablet partition boundaries: the effective range is the intersection 
> of the prefix range and the tablet's key range.
> *Testing Plan*
> Unit tests:
>  - Range tombstone containment logic (inclusive/exclusive boundaries, edge 
> cases).
>  - Overlapping tombstones with different timestamps.
>  - MVCC visibility: rows inserted before vs. after tombstone timestamp.
>  - Tombstone splitting during tablet split.
> Integration tests:
> 1. Create table with composite PK (id1, id2, num_cal) and hash partitions.
> 2. Insert many rows for a given prefix.
> 3. Issue delete-by-prefix with range tombstone.
> 4. Verify scan no longer returns deleted rows without individual row-level 
> deletes being issued.
> 5. Insert new rows after tombstone timestamp — verify they are visible.
> 6. Trigger compaction — verify on-disk row count decreases.
> 7. Verify behavior across leader changes.
> Performance benchmark (optional):
>  - Compare Phase 2 (scan + delete) vs. Phase 3 (range tombstone) for N = 10K, 
> 100K, 1M matching rows.
>  - Measure: wall time, WAL bytes written, I/O.
> *Limitations and Future Work*
>  - CDC: range tombstones may need special handling in the Change Data Capture 
> stream. Initially, they could be expanded into individual delete events at 
> CDC read time.
>  - Backup/Restore: backup tooling must preserve range tombstones.
>  - Tombstone accumulation: without compaction, many small tombstones could 
> degrade read performance. A background task to merge adjacent/overlapping 
> tombstones may be needed.
>  - Cross-tablet atomicity: remains unsupported. Use Kudu transactions if 
> needed.
>  - Delete by PK subset: a natural extension would be to support deleting by 
> an arbitrary subset of PK columns (e.g., col1 and col3 without specifying 
> col2). This is feasible at the scan + delete level
> (Phases 1 and 2) since equality predicates can be applied on any column 
> combination. However, it has significant implications:
>  -- Performance: prefix deletes map to a contiguous key range (efficient 
> range scan). Subset deletes produce scattered matches across the key space, 
> requiring broader scans.
>  -- Partition pruning: less effective when intermediate PK columns are 
> unspecified.
>  -- Range tombstones (Phase 3): not directly applicable. A subset does not 
> map to a single [lower, upper) key range, so the O(1) tombstone approach 
> would not apply.
> Deferred as a separate feature once the prefix-based implementation is stable.
> *Summary*
> |Aspect|Phase 1|Phase 2|Phase 3|Future Work|
> |Scope|PK prefix|PK prefix|PK prefix|Arbitrary PK subset|
> |Where|Client|Tablet Server|Storage engine|Client/Server|
> |Mechanism|Scan + delete|Scan + batched delete|MVCC range tombstone (encoded 
> PK interval)|Scan + arbitrary predicates|
> |Cost|O(N) network|O(N) server|O(1) per tablet|O(N) broader scan|
> |WAL writes|N|N|1 per tablet|N|
> |Row enumeration|Yes|Yes|No|Yes|
> |Compaction|Standard|Standard|Drops covered rows|Standard|
> |Range tombstones|No|No|Yes|No|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to