Joaquín Antonio De Vicente López created KUDU-3748:
------------------------------------------------------
Summary: Support Delete by Primary Key Prefix (Range Delete)
Key: KUDU-3748
URL: https://issues.apache.org/jira/browse/KUDU-3748
Project: Kudu
Issue Type: New Feature
Reporter: Joaquín Antonio De Vicente López
|Aspect|Detail|
|Status|Proposed|
|Component|Client, Tablet Server, Storage|
|Affects|Java Client, C++ Tablet Server|
*Motivation*
Real-World Use Case: Spark Structured Streaming Ingestion with Calendar Explosion
We have an Apache Spark Structured Streaming job consuming messages from Apache
Kafka. Each message contains 2 business identifier fields and 4 additional
fields, each of which is a list of calendar intervals (valid_from, valid_to).
During transformation, the calendar lists are exploded vertically, generating N
rows per message, where N = max(length(calendar_list_1), ..., length(calendar_list_4)).
Shorter lists produce NULL values in their respective columns. A positional
index column, num_cal, is generated and included as part of the Primary Key in
the Kudu table; without it, only the last calendar row would survive an upsert.
The problem: we need to perform a logical upsert (replace) per message based on
the 2 business identifiers. Since Kudu requires the full Primary Key for delete
operations, and our PK includes num_cal,
we cannot directly delete all rows of a message unless we know every num_cal
value. Attempting a delete without num_cal causes the streaming job to fail:
org.apache.kudu.client.NonRecoverableException: Primary key column num_cal is
not set
Caused by: java.lang.IllegalStateException: Primary key column num_cal is not
set
Impact: the Structured Streaming job fails and aborts, message replacement
cannot be performed, and the continuous ingestion pipeline is disrupted.
*General Problem Statement*
Kudu does not support deleting rows by a partial primary key (a leading prefix
of the PK columns). Many workloads need a "delete-and-replace" pattern where
all rows sharing a logical identifier are
removed before inserting new versions. Today this requires the caller to know
every full PK value, which is impractical when the PK includes generated or
positional columns.
*Proposed Solution:* Three-Phase Approach
The feature is designed in three phases, each building on the previous:
|Phase|Approach|Cost per Operation|
|Phase 1|Client-side scan + delete|O(N) network round-trips|
|Phase 2|Server-side RPC (scan + batched delete per tablet)|O(N) row reads + writes (server-side)|
|Phase 3|Storage-level range tombstones|O(1) per tablet|
*Phase 1:* Client-Side Scan + Delete
Approach
The Java client performs the entire operation:
1. Validate that the set columns in the supplied PartialRow are a contiguous
leading prefix of the table's primary key columns.
2. Open a scan with equality predicates on the prefix columns, projecting only
PK columns.
3. For every row returned by the scan, issue a DELETE through a KuduSession
(batched with MANUAL_FLUSH).
4. Return the total number of rows deleted.
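The prefix validation in step 1 can be sketched as follows. This is an illustrative helper (the class and method names are not part of the Kudu client API), shown only to pin down what "contiguous leading prefix" means:

```java
import java.util.List;

public class PrefixCheck {
    // Returns true iff the set columns form a non-empty contiguous leading
    // prefix of the table's primary key columns, e.g. (id1, id2) is a valid
    // prefix of PK (id1, id2, num_cal), but (id1, num_cal) is not.
    static boolean isLeadingPrefix(List<String> pkColumns, List<String> setColumns) {
        if (setColumns.isEmpty() || setColumns.size() > pkColumns.size()) {
            return false;
        }
        for (int i = 0; i < setColumns.size(); i++) {
            if (!pkColumns.get(i).equals(setColumns.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

Note that a fully specified PK counts as a degenerate prefix, in which case the operation reduces to an ordinary per-row delete.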
Limitations
- Network overhead: every matching row's PK is transferred from tablet server
to client and back as a delete operation.
- Latency: proportional to the number of matching rows.
- No server-side optimization: the tablet server is unaware of the
higher-level intent.
*Phase 2:* Server-Side RPC
Approach
A dedicated DeleteByPkPrefix RPC is added to the Tablet Server service. The
client sends one RPC per relevant tablet; the server performs the scan and
delete internally, eliminating network round-trips
for individual rows.
*RPC Definition*
{code:java}
rpc DeleteByPkPrefix(DeleteByPkPrefixRequestPB)
    returns (DeleteByPkPrefixResponsePB) {
  option (kudu.rpc.track_rpc_result) = true;
}
{code}
{code:java}
message DeleteByPkPrefixRequestPB {
  required bytes tablet_id = 1;
  // Schema of the prefix columns.
  optional SchemaPB schema = 2;
  // Encoded prefix row.
  optional RowOperationsPB pk_prefix = 3;
  optional ExternalConsistencyMode external_consistency_mode = 4
      [default = CLIENT_PROPAGATED];
  optional fixed64 propagated_timestamp = 5;
  optional security.SignedTokenPB authz_token = 6;
}
{code}
{code:java}
message DeleteByPkPrefixResponsePB {
  optional TabletServerErrorPB error = 1;
  optional int64 rows_deleted = 2;
  optional fixed64 timestamp = 3;
  optional ResourceMetricsPB resource_metrics = 4;
}
{code}
Server-Side Behavior
1. Decode the prefix row from the request.
2. Build equality predicates for each prefix column.
3. Create an MVCC snapshot iterator projecting only PK columns.
4. Scan matching rows in batches and submit DELETE_IGNORE operations through
Raft.
5. Return the total count of deleted rows.
Client-Side Behavior
- Use partition pruning to determine which tablets may contain matching rows,
avoiding RPCs to irrelevant tablets.
- Send one DeleteByPkPrefixRequest per relevant tablet.
- Sum rows_deleted from all responses.
- If any tablet returns a feature-not-supported error (mixed-version cluster),
fall back to Phase 1.
Public API
{{DeleteByPkPrefixResponse resp = client.deleteByPkPrefix(table, pkPrefix);}}
Semantics
- Per-tablet snapshot consistency: rows to delete are determined by an MVCC
snapshot taken at the start of the operation on each tablet. Rows inserted
after the snapshot are NOT deleted.
- Per-tablet non-atomicity: deletes are applied in batches through Raft. Each
batch is individually atomic; the full operation within a tablet is NOT a
single atomic transaction.
- Cross-tablet non-atomicity: one RPC per tablet; no cross-tablet atomicity.
- Partial failures: if one or more tablets fail, the client returns an error
and the operation may have been partially applied on other tablets. Retrying is
safe due to track_rpc_result=true.
- Concurrent deletes: uses DELETE_IGNORE internally so concurrent deletes of
the same rows are harmless.
- Retry safety: track_rpc_result = true enables server-side deduplication.
Limitations of Phase 2
- Still scans every matching row: the server must read each row's PK from
storage and issue individual delete mutations through Raft.
- Cost proportional to N (matching rows): for tables with millions of rows per
prefix, this is expensive in CPU, I/O, and WAL.
- WAL amplification: each deleted row generates a Raft log entry.
*Phase 3:* Storage-Level Range Tombstones
Motivation
Phases 1 and 2 solve the functional problem but remain O(N) in the number of
matching rows: every row must be scanned and individually deleted. For tables
with millions of rows per prefix, this is
expensive.
Range tombstones eliminate row enumeration at delete time, while maintaining
O(log K) lookup cost during reads. A single metadata entry (lower_key,
upper_key, mvcc_timestamp) logically deletes all rows in the key range without
touching them individually.
Phase 3 reduces write amplification by replacing N row-level delete mutations
with a single replicated range tombstone operation per tablet.
Range tombstones do not provide cross-tablet atomicity.
Atomic multi-tablet semantics require Kudu transactions.
Proposed Semantics
MVCC model:
- A range tombstone is created at a single MVCC hybrid timestamp T.
- Any row version with commit_timestamp <= T whose encoded primary key
falls within [lower_key, upper_key) is considered deleted for snapshots
at or after T.
- Snapshots taken before T are not affected and continue to observe the row
version.
- Rows inserted after timestamp T are visible (not deleted), unless covered by
a later tombstone.
- This is consistent with Kudu's existing MVCC visibility rules.
Replication:
- Range tombstone operations are Raft-replicated like regular writes.
- Idempotent under retry via {{track_rpc_result=true}} server-side request
deduplication.
Interaction with row-level operations:
- A row-level insert or upsert at timestamp T2 > T produces a new version that
is visible despite the tombstone, following standard MVCC visibility rules.
- A row-level delete at any timestamp is independent; both can coexist.
Proposed Storage Design
New data structure: a range tombstone store — an interval set associated with
each RowSet (or at the tablet level for MemRowSet).
Each entry:
{lower_key, upper_key, mvcc_timestamp}
Persistence options (to be evaluated during design):
|Option|Description|Pros|Cons|
|A|Separate tombstone file per RowSet|Clean separation|New file format|
|B|DeltaStore extension|Reuses infra|Poor key-fit|
|C|Tablet metadata|Simple|Metadata bloat|
Recommended approach: Option A — a separate tombstone store per RowSet,
persisted as a small file with a sorted list of entries. An in-memory interval
tree or sorted list enables efficient lookup.
Read Path Impact
When producing a row during a scan or point lookup:
1. Retrieve the row's encoded PK and commit timestamp.
2. Query the range tombstone store for any tombstone covering that PK.
3. If a tombstone (lower, upper, T) exists where lower <= PK < upper and
row_commit_time <= T, the row is filtered out.
4. Lookup must be O(log K) where K = number of tombstones, using binary search
or an interval tree.
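Steps 2 and 3 reduce to an unsigned lexicographic comparison on the encoded keys plus a timestamp check. A minimal sketch of the containment test (the names are illustrative, not Kudu internals):

```java
import java.util.Arrays;

public class TombstoneFilter {
    // True iff the row is hidden by the tombstone: its encoded PK lies in
    // [lower, upper) and it was committed at or before the tombstone time.
    static boolean isDeleted(byte[] pk, long rowCommitTime,
                             byte[] lower, byte[] upper, long tombstoneTime) {
        return Arrays.compareUnsigned(lower, pk) <= 0
            && Arrays.compareUnsigned(pk, upper) < 0
            && rowCommitTime <= tombstoneTime;
    }
}
```

A per-RowSet sorted list or interval tree over the tombstone entries keeps this check at O(log K) per row.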
Compaction Impact
During rowset merge/compaction:
- Rows fully covered by a tombstone (row_commit_time <= tombstone_time) are
physically dropped.
- Tombstones are retained as long as needed for MVCC snapshot reads, CDC
consumers, and replication lag (following existing history retention rules /
--tablet_history_max_age_sec). Tombstones must not be garbage-collected until
no active snapshot can observe rows covered by the tombstone.
- Overlapping tombstones with compatible timestamps can be merged into a
single wider range.
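For the simplest notion of "compatible" (identical timestamps), the merge rule can be sketched as follows; Tombstone and tryMerge are hypothetical names, not Kudu code:

```java
import java.util.Arrays;

public class TombstoneMerge {
    static final class Tombstone {
        final byte[] lower;   // inclusive
        final byte[] upper;   // exclusive
        final long time;      // MVCC timestamp
        Tombstone(byte[] lower, byte[] upper, long time) {
            this.lower = lower; this.upper = upper; this.time = time;
        }
    }

    // Merges two tombstones into one wider range when they carry the same
    // timestamp and their key ranges overlap or touch; returns null otherwise.
    static Tombstone tryMerge(Tombstone a, Tombstone b) {
        if (a.time != b.time) {
            return null;
        }
        if (Arrays.compareUnsigned(a.lower, b.lower) > 0) {
            Tombstone t = a; a = b; b = t;  // ensure a starts first
        }
        if (Arrays.compareUnsigned(b.lower, a.upper) > 0) {
            return null;  // disjoint: a gap separates the two ranges
        }
        byte[] upper = Arrays.compareUnsigned(a.upper, b.upper) >= 0 ? a.upper : b.upper;
        return new Tombstone(a.lower, upper, a.time);
    }
}
```

Merging tombstones with different timestamps is subtler, since the wider range must not hide rows committed between the two timestamps; that case is left to the detailed design.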
Tablet Split Impact
When a tablet splits at a split key S:
- A tombstone (lower, upper, T) is split into:
-- Child A: (lower, min(upper, S), T), if lower < S
-- Child B: (max(lower, S), upper, T), if upper > S
- Empty ranges after clipping are discarded.
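The clipping rule can be sketched as below, assuming encoded keys compare in unsigned lexicographic order; the helper names are illustrative, not Kudu code:

```java
import java.util.Arrays;

public class TombstoneSplit {
    // Clips [lower, upper) to the child below the split key s.
    // Returns {lower, upper} of the clipped range, or null when empty.
    static byte[][] clipBelow(byte[] lower, byte[] upper, byte[] s) {
        byte[] hi = Arrays.compareUnsigned(upper, s) < 0 ? upper : s;  // min(upper, s)
        return Arrays.compareUnsigned(lower, hi) < 0 ? new byte[][]{lower, hi} : null;
    }

    // Clips [lower, upper) to the child at or above the split key s.
    static byte[][] clipAbove(byte[] lower, byte[] upper, byte[] s) {
        byte[] lo = Arrays.compareUnsigned(lower, s) > 0 ? lower : s;  // max(lower, s)
        return Arrays.compareUnsigned(lo, upper) < 0 ? new byte[][]{lo, upper} : null;
    }
}
```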
API / RPC Changes
Phase 3 introduces a *new dedicated RPC* for storage-level range tombstone
deletes. It does not extend {{DeleteByPkPrefix}} and does not rely on feature
flags.
Phase 2 and Phase 3 are conceptually distinct:
* *Phase 2 ({{DeleteByPkPrefix}})* performs scan + batched row-level deletes.
* *Phase 3 ({{DeleteByKeyRange}})* applies a single MVCC range tombstone over an encoded key interval.
The two approaches are independent and can coexist. Phase 3 does not modify the
semantics of Phase 2.
{code:java}
rpc DeleteByKeyRange(DeleteByKeyRangeRequestPB)
    returns (DeleteByKeyRangeResponsePB) {
  option (kudu.rpc.track_rpc_result) = true;
}

message DeleteByKeyRangeRequestPB {
  // Target tablet.
  required bytes tablet_id = 1;
  // Inclusive lower bound of the encoded primary key.
  required bytes lower_encoded_key = 2;
  // Exclusive upper bound of the encoded primary key.
  required bytes upper_encoded_key = 3;
  // Optional propagated timestamp for external consistency.
  optional fixed64 propagated_timestamp = 4;
  // Authorization token.
  optional security.SignedTokenPB authz_token = 5;
}

message DeleteByKeyRangeResponsePB {
  optional TabletServerErrorPB error = 1;
  // Hybrid timestamp at which the tombstone was applied.
  optional fixed64 timestamp = 2;
  optional ResourceMetricsPB resource_metrics = 3;
}
{code}
Key Range Computation from PK Prefix
For a prefix (col1=A, col2=B) on a table with PK (col1, col2, col3):
- lower_key = encode(prefix + MIN_SUFFIX)
- upper_key = the lexicographic successor of encode(prefix), i.e. the smallest
encoded key greater than every key that starts with the prefix
This must correctly handle:
- Variable-length types (STRING, BINARY, VARCHAR) and their encoding.
- Composite PKs with mixed types.
- Per-tablet partition boundaries: the effective range is the intersection of
the prefix range and the tablet's key range.
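Computing the exclusive upper bound reduces to taking the shortest byte string strictly greater than every key that starts with the encoded prefix. A minimal sketch on raw encoded bytes (illustrative only; Kudu's actual composite key encoding is more involved):

```java
import java.util.Arrays;

public class KeyRange {
    // Returns the shortest byte string strictly greater than every byte
    // string having 'prefix' as a prefix: increment the last byte that is
    // not 0xFF and drop everything after it. Returns null when the prefix
    // is all 0xFF bytes, meaning the range is unbounded above.
    static byte[] successor(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] succ = Arrays.copyOf(prefix, i + 1);
                succ[i]++;
                return succ;
            }
        }
        return null;
    }
}
```

The null (unbounded) case maps naturally to clipping the range at the tablet's own upper partition boundary.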
Testing Plan
Unit tests:
- Range tombstone containment logic (inclusive/exclusive boundaries, edge
cases).
- Overlapping tombstones with different timestamps.
- MVCC visibility: rows inserted before vs. after tombstone timestamp.
- Tombstone splitting during tablet split.
Integration tests:
1. Create table with composite PK (id1, id2, num_cal) and hash partitions.
2. Insert many rows for a given prefix.
3. Issue delete-by-prefix with range tombstone.
4. Verify scan no longer returns deleted rows without individual row-level
deletes being issued.
5. Insert new rows after tombstone timestamp — verify they are visible.
6. Trigger compaction — verify on-disk row count decreases.
7. Verify behavior across leader changes.
Performance benchmark (optional):
- Compare Phase 2 (scan + delete) vs. Phase 3 (range tombstone) for N = 10K,
100K, 1M matching rows.
- Measure: wall time, WAL bytes written, I/O.
Limitations and Future Work
- CDC: range tombstones may need special handling in the Change Data Capture
stream. Initially, they could be expanded into individual delete events at CDC
read time.
- Backup/Restore: backup tooling must preserve range tombstones.
- Tombstone accumulation: without compaction, many small tombstones could
degrade read performance. A background task to merge adjacent/overlapping
tombstones may be needed.
- Cross-tablet atomicity: remains unsupported. Use Kudu transactions if needed.
- Delete by PK subset: a natural extension would be to support deleting by an
arbitrary subset of PK columns (e.g., col1 and col3 without specifying col2).
This is feasible at the scan + delete level (Phases 1 and 2), since equality
predicates can be applied on any column combination. However, it has
significant implications:
-- Performance: prefix deletes map to a contiguous key range (an efficient
range scan), while subset deletes produce scattered matches across the key
space, requiring broader scans.
-- Partition pruning: less effective when intermediate PK columns are
unspecified.
-- Range tombstones (Phase 3): not directly applicable, since a subset does not
map to a single [lower, upper) key range, so the O(1) tombstone approach would
not apply.
This extension is deferred as a separate feature until the prefix-based
implementation is stable.
*Summary*
|Aspect|Phase 1|Phase 2|Phase 3|Future Work|
|Scope|PK prefix|PK prefix|PK prefix|Arbitrary PK subset|
|Where|Client|Tablet Server|Storage engine|Client/Server|
|Mechanism|Scan + delete|Scan + batched delete|MVCC range tombstone (encoded PK interval)|Scan + arbitrary predicates|
|Cost|O(N) network|O(N) server|O(1) per tablet|O(N) broader scan|
|WAL writes|N|N|1 per tablet|N|
|Row enumeration|Yes|Yes|No|Yes|
|Compaction|Standard|Standard|Drops covered rows|Standard|
|Range tombstones|No|No|Yes|No|
--
This message was sent by Atlassian Jira
(v8.20.10#820010)