[
https://issues.apache.org/jira/browse/KUDU-3748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joaquín Antonio De Vicente López updated KUDU-3748:
---------------------------------------------------
Summary: Support Delete by Primary Key Prefix (Key-Range Delete) (was:
Support Delete by Primary Key Prefix (Range Delete))
> Support Delete by Primary Key Prefix (Key-Range Delete)
> -------------------------------------------------------
>
> Key: KUDU-3748
> URL: https://issues.apache.org/jira/browse/KUDU-3748
> Project: Kudu
> Issue Type: New Feature
> Reporter: Joaquín Antonio De Vicente López
> Priority: Major
>
> |Aspect|Detail|
> |Status|Proposed|
> |Component|Client, Tablet Server, Storage|
> |Affects|Java Client, C++ Tablet Server|
> *Motivation*
> Real-World Use Case: Spark Structured Streaming Ingestion with Calendar
> Explosion
> We have an Apache Spark Structured Streaming job consuming messages from
> Apache Kafka. Each message contains 2 business identifier fields and 4
> additional fields, each of which is a list of calendar intervals
> (valid_from, valid_to).
> During transformation the calendar lists are vertically exploded, generating
> N rows per message where N = max(length(calendar_list_1 ...
> calendar_list_4)). Shorter lists produce NULL values in their
> respective columns. A positional index column num_cal is generated and
> included as part of the Primary Key in the Kudu table — without it, only the
> last calendar row would survive an upsert.
> The problem: we need to perform a logical upsert (replace) per message based
> on the 2 business identifiers. Since Kudu requires the full Primary Key for
> delete operations, and our PK includes num_cal,
> we cannot directly delete all rows of a message unless we know every num_cal
> value. Attempting a delete without num_cal causes the streaming job to fail:
> org.apache.kudu.client.NonRecoverableException: Primary key column num_cal is
> not set
> Caused by: java.lang.IllegalStateException: Primary key column num_cal is not
> set
> Impact: the Structured Streaming job fails and aborts, message replacement
> cannot be performed, and the continuous ingestion pipeline is disrupted.
> General Problem Statement
> Kudu does not support deleting rows by a partial primary key (a leading
> prefix of the PK columns). Many workloads need a "delete-and-replace" pattern
> where all rows sharing a logical identifier are
> removed before inserting new versions. Today this requires the caller to know
> every full PK value, which is impractical when the PK includes generated or
> positional columns.
> *Proposed Solution:* Three-Phase Approach
> The feature is designed in three phases, each building on the previous:
> |Phase|Approach|Cost per Operation|
> |Phase 1|Client-side scan + delete|O(N) network round-trips|
> |Phase 2|Server-side RPC (scan + batched delete per tablet)|O(N) row reads + writes (server-side)|
> |Phase 3|Storage-level range tombstones|O(1) per tablet|
> *Phase 1:* Client-Side Scan + Delete
> Approach
> The Java client performs the entire operation:
> 1. Validate that the set columns in the supplied PartialRow are a contiguous
> leading prefix of the table's primary key columns.
> 2. Open a scan with equality predicates on the prefix columns, projecting
> only PK columns.
> 3. For every row returned by the scan, issue a DELETE through a KuduSession
> (batched with MANUAL_FLUSH).
> 4. Return the total number of rows deleted.
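The prefix-validation rule in step 1 can be sketched as a small standalone check (a minimal illustration; the class and method names are hypothetical, not part of the Kudu client API):

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating step 1: the set columns must form a
// contiguous leading prefix of the table's primary key columns.
public class PkPrefixCheck {
  // Returns true iff 'setColumns' is exactly the first setColumns.size()
  // columns of 'pkColumns', in order, with no gaps.
  public static boolean isLeadingPrefix(List<String> pkColumns, Set<String> setColumns) {
    int n = setColumns.size();
    if (n == 0 || n > pkColumns.size()) {
      return false; // empty prefix, or more columns set than the PK has
    }
    for (int i = 0; i < n; i++) {
      if (!setColumns.contains(pkColumns.get(i))) {
        return false; // gap: an earlier PK column is not set
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> pk = List.of("id1", "id2", "num_cal");
    System.out.println(isLeadingPrefix(pk, Set.of("id1", "id2")));     // valid leading prefix
    System.out.println(isLeadingPrefix(pk, Set.of("id1", "num_cal"))); // gap: id2 missing
  }
}
```

In the real client this check would run against the table's Schema before the scan is opened, so the caller fails fast with a clear error instead of a generic "Primary key column ... is not set" exception.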
> Limitations
> - Network overhead: every matching row's PK is transferred from tablet
> server to client and back as a delete operation.
> - Latency: proportional to the number of matching rows.
> - No server-side optimization: the tablet server is unaware of the
> higher-level intent.
> *Phase 2:* Server-Side RPC
> Approach
> A dedicated DeleteByPkPrefix RPC is added to the Tablet Server service. The
> client sends one RPC per relevant tablet; the server performs the scan and
> delete internally, eliminating network round-trips
> for individual rows.
> *RPC Definition*
> {code:java}
> rpc DeleteByPkPrefix(DeleteByPkPrefixRequestPB)
>     returns (DeleteByPkPrefixResponsePB) {
>   option (kudu.rpc.track_rpc_result) = true;
> }
> {code}
> {code:java}
> message DeleteByPkPrefixRequestPB {
>   required bytes tablet_id = 1;
>   // Schema of the prefix columns.
>   optional SchemaPB schema = 2;
>   // Encoded prefix row.
>   optional RowOperationsPB pk_prefix = 3;
>   optional ExternalConsistencyMode external_consistency_mode = 4
>       [default = CLIENT_PROPAGATED];
>   optional fixed64 propagated_timestamp = 5;
>   optional security.SignedTokenPB authz_token = 6;
> }
> {code}
> {code:java}
> message DeleteByPkPrefixResponsePB {
>   optional TabletServerErrorPB error = 1;
>   optional int64 rows_deleted = 2;
>   optional fixed64 timestamp = 3;
>   optional ResourceMetricsPB resource_metrics = 4;
> }
> {code}
> Server-Side Behavior
> 1. Decode the prefix row from the request.
> 2. Build equality predicates for each prefix column.
> 3. Create an MVCC snapshot iterator projecting only PK columns.
> 4. Scan matching rows in batches and submit DELETE_IGNORE operations through
> Raft.
> 5. Return the total count of deleted rows.
> Client-Side Behavior
> - Use partition pruning to determine which tablets may contain matching
> rows, avoiding RPCs to irrelevant tablets.
> - Send one DeleteByPkPrefixRequest per relevant tablet.
> - Sum rows_deleted from all responses.
> - If any tablet returns a feature-not-supported error (mixed-version
> cluster), fall back to Phase 1.
> Public API
> {code:java}
> DeleteByPkPrefixResponse resp = client.deleteByPkPrefix(table, pkPrefix);
> {code}
> Semantics
> - Per-tablet snapshot consistency: rows to delete are determined by an MVCC
> snapshot taken at the start of the operation on each tablet. Rows inserted
> after the snapshot are NOT deleted.
> - Per-tablet non-atomicity: deletes are applied in batches through Raft.
> Each batch is individually atomic; the full operation within a tablet is NOT
> a single atomic transaction.
> - Cross-tablet non-atomicity: one RPC per tablet; no cross-tablet atomicity.
> - Partial failures: if one or more tablets fail, the client returns an error
> and the operation may have been partially applied on other tablets. Retrying
> is safe due to track_rpc_result=true.
> - Concurrent deletes: uses DELETE_IGNORE internally so concurrent deletes of
> the same rows are harmless.
> - Retry safety: track_rpc_result = true enables server-side deduplication.
> Limitations of Phase 2
> - Still scans every matching row: the server must read each row's PK from
> storage and issue individual delete mutations through Raft.
> - Cost proportional to N (matching rows): for tables with millions of rows
> per prefix, this is expensive in CPU, I/O, and WAL.
> - WAL amplification: each deleted row generates a Raft log entry.
> *Phase 3:* Storage-Level Range Tombstones
> Motivation
> Phases 1 and 2 solve the functional problem but remain O(N) in the number of
> matching rows: every row must be scanned and individually deleted. For tables
> with millions of rows per prefix, this is
> expensive.
> Range tombstones eliminate row enumeration at delete time, while maintaining
> O(log K) lookup cost during reads. A single metadata entry (lower_key,
> upper_key, mvcc_timestamp) logically deletes all rows in the key range
> without touching them individually.
> Phase 3 reduces write amplification by replacing N row-level delete mutations
> with a single replicated range tombstone operation per tablet.
> Range tombstones do not provide cross-tablet atomicity.
> Atomic multi-tablet semantics require Kudu transactions.
> Proposed Semantics
> MVCC model:
> - A range tombstone is created at a single MVCC hybrid timestamp T.
> - Any row version with commit_timestamp <= T whose encoded primary key
> falls within [lower_key, upper_key) is considered deleted for snapshots
> at or after T.
> - Snapshots taken before T are not affected and continue to observe the row
> version.
> - Rows inserted after timestamp T are visible (not deleted), unless covered
> by a later tombstone.
> - This is consistent with Kudu's existing MVCC visibility rules.
> Replication:
> - Range tombstone operations are Raft-replicated like regular writes.
> - Idempotent under retry via {{track_rpc_result=true}} server-side request
> deduplication.
> Interaction with row-level operations:
> - A row-level insert or upsert at timestamp T2 > T produces a new version
> that is visible despite the tombstone, following standard MVCC visibility
> rules.
> - A row-level delete at any timestamp is independent; both can coexist.
> Proposed Storage Design
> New data structure: a range tombstone store — an interval set associated with
> each RowSet (or at the tablet level for MemRowSet).
> Each entry:
> {lower_key, upper_key, mvcc_timestamp}
> Persistence options (to be evaluated during design):
> |Option|Description|Pros|Cons|
> |A|Separate tombstone file per RowSet|Clean separation|New file format|
> |B|DeltaStore extension|Reuses infra|Poor key-fit|
> |C|Tablet metadata|Simple|Metadata bloat|
> Recommended approach: Option A — a separate tombstone store per RowSet,
> persisted as a small file with a sorted list of entries. An in-memory
> interval tree or sorted list enables efficient lookup.
> Read Path Impact
> When producing a row during a scan or point lookup:
> 1. Retrieve the row's encoded PK and commit timestamp.
> 2. Query the range tombstone store for any tombstone covering that PK.
> 3. If a tombstone (lower, upper, T) exists where lower <= PK < upper and
> row_commit_time <= T, the row is filtered out.
> 4. Lookup must be O(log K) where K = number of tombstones, using binary
> search or an interval tree.
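The containment check in steps 2–4 can be sketched as a binary search over tombstones kept sorted by lower bound (a simplified illustration assuming non-overlapping ranges; RangeTombstone and isDeleted are hypothetical names, not Kudu internals):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the read-path filter: O(log K) lookup over a
// sorted, non-overlapping list of range tombstones.
public class TombstoneFilter {
  public record RangeTombstone(byte[] lower, byte[] upper, long timestamp) {}

  // Unsigned lexicographic (memcmp-style) comparison of encoded keys.
  public static int compareKeys(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  // Returns true iff 'key' with 'commitTime' is covered by some tombstone:
  // lower <= key < upper and commitTime <= tombstone timestamp.
  public static boolean isDeleted(List<RangeTombstone> sorted, byte[] key, long commitTime) {
    int lo = 0, hi = sorted.size() - 1;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      RangeTombstone t = sorted.get(mid);
      if (compareKeys(key, t.lower()) < 0) {
        hi = mid - 1;                          // key sorts before this range
      } else if (compareKeys(key, t.upper()) >= 0) {
        lo = mid + 1;                          // key sorts at or after the exclusive upper bound
      } else {
        return commitTime <= t.timestamp();    // inside [lower, upper): apply MVCC rule
      }
    }
    return false; // no covering tombstone
  }

  public static void main(String[] args) {
    List<RangeTombstone> ts =
        List.of(new RangeTombstone(new byte[]{10}, new byte[]{20}, 100L));
    System.out.println(isDeleted(ts, new byte[]{15}, 50L));  // covered, committed before T
    System.out.println(isDeleted(ts, new byte[]{15}, 150L)); // inserted after T, still visible
    System.out.println(isDeleted(ts, new byte[]{25}, 50L));  // outside the range
  }
}
```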
> Compaction Impact
> During rowset merge/compaction:
> - Rows fully covered by a tombstone (row_commit_time <= tombstone_time) are
> physically dropped.
> - Tombstones are retained as long as needed for MVCC snapshot reads, CDC
> consumers, and replication lag (following existing history retention rules /
> --tablet_history_max_age_sec). Tombstones must not be garbage-collected until
> no active snapshot can observe rows covered by the tombstone.
> - Overlapping tombstones with compatible timestamps can be merged into a
> single wider range.
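The last point can be sketched as a standard interval merge restricted to tombstones carrying the same timestamp (a simplified illustration; merging across different timestamps would change MVCC visibility and is deliberately not attempted here):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical compaction-time merge of overlapping or adjacent range
// tombstones that share the same MVCC timestamp.
public class TombstoneMerge {
  public record RangeTombstone(byte[] lower, byte[] upper, long timestamp) {}

  static int cmp(byte[] a, byte[] b) { return Arrays.compareUnsigned(a, b); }

  // Input must be sorted by lower bound. Ranges are [lower, upper).
  public static List<RangeTombstone> merge(List<RangeTombstone> sorted) {
    List<RangeTombstone> out = new ArrayList<>();
    for (RangeTombstone t : sorted) {
      if (!out.isEmpty()) {
        RangeTombstone last = out.get(out.size() - 1);
        // Same timestamp and touching/overlapping: widen the previous range.
        if (last.timestamp() == t.timestamp() && cmp(t.lower(), last.upper()) <= 0) {
          byte[] upper = cmp(t.upper(), last.upper()) > 0 ? t.upper() : last.upper();
          out.set(out.size() - 1, new RangeTombstone(last.lower(), upper, last.timestamp()));
          continue;
        }
      }
      out.add(t);
    }
    return out;
  }

  public static void main(String[] args) {
    List<RangeTombstone> in = List.of(
        new RangeTombstone(new byte[]{1}, new byte[]{5}, 100L),
        new RangeTombstone(new byte[]{4}, new byte[]{9}, 100L),
        new RangeTombstone(new byte[]{9}, new byte[]{12}, 200L));
    System.out.println(merge(in).size()); // 2: first two merge; third differs in timestamp
  }
}
```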
> Tablet Split Impact
> When a tablet splits at a split key S:
> - Tombstone (lower, upper, T) is split into:
> - Child A: (lower, min(upper, S), T) — if lower < S
> - Child B: (max(lower, S), upper, T) — if upper > S
> - Empty ranges after clipping are discarded.
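The clipping rules above can be sketched as follows (illustrative names; assumes encoded keys compare as unsigned byte strings, matching Kudu's memcmp-ordered key encoding):

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical sketch of clipping one tombstone [lower, upper) at a
// tablet split key S; empty results mean the child keeps nothing.
public class TombstoneSplit {
  public record RangeTombstone(byte[] lower, byte[] upper, long timestamp) {}

  static int cmp(byte[] a, byte[] b) { return Arrays.compareUnsigned(a, b); }

  // Portion belonging to child A, which covers keys < splitKey.
  public static Optional<RangeTombstone> clipBelow(RangeTombstone t, byte[] s) {
    if (cmp(t.lower(), s) >= 0) return Optional.empty(); // entirely at or above S
    byte[] upper = cmp(t.upper(), s) < 0 ? t.upper() : s; // min(upper, S)
    return Optional.of(new RangeTombstone(t.lower(), upper, t.timestamp()));
  }

  // Portion belonging to child B, which covers keys >= splitKey.
  public static Optional<RangeTombstone> clipAbove(RangeTombstone t, byte[] s) {
    if (cmp(t.upper(), s) <= 0) return Optional.empty(); // entirely below S
    byte[] lower = cmp(t.lower(), s) > 0 ? t.lower() : s; // max(lower, S)
    return Optional.of(new RangeTombstone(lower, t.upper(), t.timestamp()));
  }

  public static void main(String[] args) {
    RangeTombstone t = new RangeTombstone(new byte[]{1}, new byte[]{9}, 100L);
    byte[] s = new byte[]{5};
    System.out.println(clipBelow(t, s).isPresent()); // child A keeps [1, 5)
    System.out.println(clipAbove(t, s).isPresent()); // child B keeps [5, 9)
  }
}
```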
> API / RPC Changes
> Phase 3 introduces a *new dedicated RPC* for storage-level range tombstone
> deletes. It does not extend {{DeleteByPkPrefix}} and does not rely on feature
> flags.
> Phase 2 and Phase 3 are conceptually distinct:
> * *Phase 2 ({{DeleteByPkPrefix}})* performs scan + batched row-level
> deletes.
> * *Phase 3 ({{DeleteByKeyRange}})* applies a single MVCC range tombstone
> over an encoded key interval.
> The two approaches are independent and can coexist. Phase 3 does not modify
> the semantics of Phase 2.
> {code:java}
> rpc DeleteByKeyRange(DeleteByKeyRangeRequestPB)
>     returns (DeleteByKeyRangeResponsePB) {
>   option (kudu.rpc.track_rpc_result) = true;
> }
>
> message DeleteByKeyRangeRequestPB {
>   // Target tablet.
>   required bytes tablet_id = 1;
>   // Inclusive lower bound of the encoded primary key.
>   required bytes lower_encoded_key = 2;
>   // Exclusive upper bound of the encoded primary key.
>   required bytes upper_encoded_key = 3;
>   // Optional propagated timestamp for external consistency.
>   optional fixed64 propagated_timestamp = 4;
>   // Authorization token.
>   optional security.SignedTokenPB authz_token = 5;
> }
>
> message DeleteByKeyRangeResponsePB {
>   optional TabletServerErrorPB error = 1;
>   // Hybrid timestamp at which the tombstone was applied.
>   optional fixed64 timestamp = 2;
>   optional ResourceMetricsPB resource_metrics = 3;
> }
> {code}
> Key Range Computation from PK Prefix
> For a prefix (col1=A, col2=B) on a table with PK (col1, col2, col3):
> - lower_key = encode(prefix + MIN_SUFFIX)
> - upper_key = the next lexicographic successor of encode(prefix)
> This must correctly handle:
> - Variable-length types (STRING, BINARY, VARCHAR) and their encoding.
> - Composite PKs with mixed types.
> - Per-tablet partition boundaries: the effective range is the intersection
> of the prefix range and the tablet's key range.
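For memcmp-ordered encoded keys, the successor computation for the exclusive upper bound can be sketched as incrementing the last non-0xFF byte and truncating (a simplified illustration; the real implementation must also intersect the result with the tablet's key range, and Kudu's per-type key encoding is glossed over here):

```java
import java.util.Arrays;

// Hypothetical sketch: shortest byte string strictly greater than every
// key that starts with 'prefix', under unsigned lexicographic order.
public class KeySuccessor {
  public static byte[] successor(byte[] prefix) {
    for (int i = prefix.length - 1; i >= 0; i--) {
      if ((prefix[i] & 0xFF) != 0xFF) {
        byte[] out = Arrays.copyOf(prefix, i + 1); // truncate after position i
        out[i]++;                                  // bump the last non-0xFF byte
        return out;
      }
    }
    return null; // all bytes are 0xFF: the prefix range is unbounded above
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(successor(new byte[]{1, 2})));           // [1, 3]
    System.out.println(Arrays.toString(successor(new byte[]{1, (byte) 0xFF}))); // [2]
  }
}
```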
> Testing Plan
> Unit tests:
> - Range tombstone containment logic (inclusive/exclusive boundaries, edge
> cases).
> - Overlapping tombstones with different timestamps.
> - MVCC visibility: rows inserted before vs. after tombstone timestamp.
> - Tombstone splitting during tablet split.
> Integration tests:
> 1. Create table with composite PK (id1, id2, num_cal) and hash partitions.
> 2. Insert many rows for a given prefix.
> 3. Issue delete-by-prefix with range tombstone.
> 4. Verify scan no longer returns deleted rows without individual row-level
> deletes being issued.
> 5. Insert new rows after tombstone timestamp — verify they are visible.
> 6. Trigger compaction — verify on-disk row count decreases.
> 7. Verify behavior across leader changes.
> Performance benchmark (optional):
> - Compare Phase 2 (scan + delete) vs. Phase 3 (range tombstone) for N = 10K,
> 100K, 1M matching rows.
> - Measure: wall time, WAL bytes written, I/O.
> Limitations and Future Work
> - CDC: range tombstones may need special handling in the Change Data Capture
> stream. Initially, they could be expanded into individual delete events at
> CDC read time.
> - Backup/Restore: backup tooling must preserve range tombstones.
> - Tombstone accumulation: without compaction, many small tombstones could
> degrade read performance. A background task to merge adjacent/overlapping
> tombstones may be needed.
> - Cross-tablet atomicity: remains unsupported. Use Kudu transactions if
> needed.
> - Delete by PK subset: a natural extension would be to support deleting by
> an arbitrary subset of PK columns (e.g., col1 and col3 without specifying
> col2). This is feasible at the scan + delete level
> (Phases 1 and 2) since equality predicates can be applied on any column
> combination. However, it has significant implications:
> - Performance: prefix deletes map to a contiguous key range (efficient range
> scan). Subset deletes produce scattered matches across the key space,
> requiring broader scans.
> - Partition pruning: less effective when intermediate PK columns are
> unspecified.
> - Range tombstones (Phase 3): not directly applicable. A subset does not map
> to a single [lower, upper) key range, so the O(1) tombstone approach would
> not apply.
> Deferred as a separate feature once the prefix-based implementation is stable.
> *Summary*
> |Aspect|Phase 1|Phase 2|Phase 3|Future Work|
> |Scope|PK prefix|PK prefix|PK prefix|Arbitrary PK subset|
> |Where|Client|Tablet Server|Storage engine|Client/Server|
> |Mechanism|Scan + delete|Scan + batched delete|MVCC range tombstone (encoded PK interval)|Scan + arbitrary predicates|
> |Cost|O(N) network|O(N) server|O(1) per tablet|O(N) broader scan|
> |WAL writes|N|N|1 per tablet|N|
> |Row enumeration|Yes|Yes|No|Yes|
> |Compaction|Standard|Standard|Drops covered rows|Standard|
> |Range tombstones|No|No|Yes|No|
--
This message was sent by Atlassian Jira
(v8.20.10#820010)