Joaquín Antonio De Vicente López created KUDU-3748:
------------------------------------------------------

             Summary: Support Delete by Primary Key Prefix (Range Delete)
                 Key: KUDU-3748
                 URL: https://issues.apache.org/jira/browse/KUDU-3748
             Project: Kudu
          Issue Type: New Feature
            Reporter: Joaquín Antonio De Vicente López


||Aspect||Detail||
|Status|Proposed|
|Component|Client, Tablet Server, Storage|
|Affects|Java Client, C++ Tablet Server|

*Motivation*

Real-World Use Case: Spark Structured Streaming Ingestion with Calendar 
Explosion

We have an Apache Spark Structured Streaming job consuming messages from Apache 
Kafka. Each message contains 2 business identifier fields and 4 additional 
fields, each being a list of calendars
(valid_from, valid_to).

During transformation the calendar lists are vertically exploded, generating N 
rows per message where N = max(length(calendar_list_1 ... calendar_list_4)). 
Shorter lists produce NULL values in their
respective columns. A positional index column num_cal is generated and included 
as part of the Primary Key in the Kudu table — without it, only the last 
calendar row would survive an upsert.

The problem: we need to perform a logical upsert (replace) per message based on 
the 2 business identifiers. Since Kudu requires the full Primary Key for delete 
operations, and our PK includes num_cal,
we cannot directly delete all rows of a message unless we know every num_cal 
value. Attempting a delete without num_cal causes the streaming job to fail:

{code}
org.apache.kudu.client.NonRecoverableException: Primary key column num_cal is not set
Caused by: java.lang.IllegalStateException: Primary key column num_cal is not set
{code}

Impact: the Structured Streaming job fails and aborts, message replacement 
cannot be performed, and the continuous ingestion pipeline is disrupted.

General Problem Statement

Kudu does not support deleting rows by a partial primary key (a leading prefix 
of the PK columns). Many workloads need a "delete-and-replace" pattern where 
all rows sharing a logical identifier are
removed before inserting new versions. Today this requires the caller to know 
every full PK value, which is impractical when the PK includes generated or 
positional columns.

*Proposed Solution:* Three-Phase Approach

The feature is designed in three phases, each building on the previous:
||Phase||Approach||Cost per Operation||
|Phase 1|Client-side scan + delete|O(N) network round-trips|
|Phase 2|Server-side RPC (scan + batched delete per tablet)|O(N) row reads + writes (server-side)|
|Phase 3|Storage-level range tombstones|O(1) per tablet|

*Phase 1:* Client-Side Scan + Delete

Approach

The Java client performs the entire operation:

1. Validate that the set columns in the supplied PartialRow are a contiguous 
leading prefix of the table's primary key columns.
2. Open a scan with equality predicates on the prefix columns, projecting only 
PK columns.
3. For every row returned by the scan, issue a DELETE through a KuduSession 
(batched with MANUAL_FLUSH).
4. Return the total number of rows deleted.
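The validation in step 1 is pure logic, independent of the Kudu client API. A minimal sketch (the class and method names `PrefixCheck` and `isLeadingPrefix` are illustrative, not part of the proposed API):

```java
import java.util.List;

// Sketch of the Phase 1 prefix check: the set columns in the supplied
// row must form a contiguous leading prefix of the table's PK columns.
public class PrefixCheck {
  /**
   * @param pkColumns  primary key column names, in PK order
   * @param setColumns column names the caller set in the PartialRow
   * @return true iff setColumns equals pkColumns[0..setColumns.size())
   */
  public static boolean isLeadingPrefix(List<String> pkColumns, List<String> setColumns) {
    if (setColumns.isEmpty() || setColumns.size() > pkColumns.size()) {
      return false;  // an empty prefix would delete the whole table; reject it
    }
    for (int i = 0; i < setColumns.size(); i++) {
      if (!pkColumns.get(i).equals(setColumns.get(i))) {
        return false;  // a gap or out-of-order column breaks the prefix
      }
    }
    return true;
  }
}
```

For the example table with PK (id1, id2, num_cal), setting (id1, id2) passes, while setting (id1, num_cal) is rejected because id2 is skipped.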

Limitations
 - Network overhead: every matching row's PK is transferred from tablet server 
to client and back as a delete operation.
 - Latency: proportional to the number of matching rows.
 - No server-side optimization: the tablet server is unaware of the 
higher-level intent.

*Phase 2:* Server-Side RPC

Approach

A dedicated DeleteByPkPrefix RPC is added to the Tablet Server service. The 
client sends one RPC per relevant tablet; the server performs the scan and 
delete internally, eliminating network round-trips
for individual rows.

*RPC Definition*

 
{code:java}
rpc DeleteByPkPrefix(DeleteByPkPrefixRequestPB)
    returns (DeleteByPkPrefixResponsePB) {
  option (kudu.rpc.track_rpc_result) = true;
}
{code}

{code:java}
message DeleteByPkPrefixRequestPB {
  required bytes tablet_id = 1;
  // Schema of the prefix columns.
  optional SchemaPB schema = 2;
  // Encoded prefix row.
  optional RowOperationsPB pk_prefix = 3;
  optional ExternalConsistencyMode external_consistency_mode = 4
      [default = CLIENT_PROPAGATED];
  optional fixed64 propagated_timestamp = 5;
  optional security.SignedTokenPB authz_token = 6;
}
{code}

{code:java}
message DeleteByPkPrefixResponsePB {
  optional TabletServerErrorPB error = 1;
  optional int64 rows_deleted = 2;
  optional fixed64 timestamp = 3;
  optional ResourceMetricsPB resource_metrics = 4;
}
{code}

Server-Side Behavior

1. Decode the prefix row from the request.
2. Build equality predicates for each prefix column.
3. Create an MVCC snapshot iterator projecting only PK columns.
4. Scan matching rows in batches and submit DELETE_IGNORE operations through 
Raft.
5. Return the total count of deleted rows.

Client-Side Behavior
 - Use partition pruning to determine which tablets may contain matching rows, 
avoiding RPCs to irrelevant tablets.
 - Send one DeleteByPkPrefixRequest per relevant tablet.
 - Sum rows_deleted from all responses.
 - If any tablet returns a feature-not-supported error (mixed-version cluster), 
fall back to Phase 1.

Public API

{code:java}
DeleteByPkPrefixResponse resp = client.deleteByPkPrefix(table, pkPrefix);
{code}

Semantics
 - Per-tablet snapshot consistency: rows to delete are determined by an MVCC 
snapshot taken at the start of the operation on each tablet. Rows inserted 
after the snapshot are NOT deleted.
 - Per-tablet non-atomicity: deletes are applied in batches through Raft. Each 
batch is individually atomic; the full operation within a tablet is NOT a 
single atomic transaction.
 - Cross-tablet non-atomicity: one RPC per tablet; no cross-tablet atomicity.
 - Partial failures: if one or more tablets fail, the client returns an error 
and the operation may have been partially applied on other tablets. Retrying is 
safe due to track_rpc_result=true.
 - Concurrent deletes: uses DELETE_IGNORE internally so concurrent deletes of 
the same rows are harmless.
 - Retry safety: track_rpc_result = true enables server-side deduplication.

Limitations of Phase 2
 - Still scans every matching row: the server must read each row's PK from 
storage and issue individual delete mutations through Raft.
 - Cost proportional to N (matching rows): for tables with millions of rows per 
prefix, this is expensive in CPU, I/O, and WAL.
 - WAL amplification: each deleted row generates a Raft log entry.

*Phase 3:* Storage-Level Range Tombstones

Motivation

Phases 1 and 2 solve the functional problem but remain O(N) in the number of 
matching rows: every row must be scanned and individually deleted. For tables 
with millions of rows per prefix, this is
expensive.

Range tombstones eliminate row enumeration at delete time, while maintaining 
O(log K) lookup cost during reads. A single metadata entry (lower_key, 
upper_key, mvcc_timestamp) logically deletes all rows in the key range without 
touching them individually.

Phase 3 reduces write amplification by replacing N row-level delete mutations 
with a single replicated range tombstone operation per tablet.

Range tombstones do not provide cross-tablet atomicity.
Atomic multi-tablet semantics require Kudu transactions.

Proposed Semantics

MVCC model:
 - A range tombstone is created at a single MVCC hybrid timestamp T.
 - Any row version with commit_timestamp <= T whose encoded primary key
falls within [lower_key, upper_key) is considered deleted for snapshots
at or after T.
 - Snapshots taken before T are not affected and continue to observe the row 
version.
 - Rows inserted after timestamp T are visible (not deleted), unless covered by 
a later tombstone.
 - This is consistent with Kudu's existing MVCC visibility rules.

Replication:
 - Range tombstone operations are Raft-replicated like regular writes.
 - Idempotent under retry via {{track_rpc_result=true}} server-side request 
deduplication.

Interaction with row-level operations:
 - A row-level insert or upsert at timestamp T2 > T produces a new version that 
is visible despite the tombstone, following standard MVCC visibility rules.
 - A row-level delete at any timestamp is independent; both can coexist.

Proposed Storage Design

New data structure: a range tombstone store — an interval set associated with 
each RowSet (or at the tablet level for MemRowSet).

Each entry:

{lower_key, upper_key, mvcc_timestamp}

Persistence options (to be evaluated during design):
||Option||Description||Pros||Cons||
|A|Separate tombstone file per RowSet|Clean separation|New file format|
|B|DeltaStore extension|Reuses infra|Poor key-fit|
|C|Tablet metadata|Simple|Metadata bloat|

Recommended approach: Option A — a separate tombstone store per RowSet, 
persisted as a small file with a sorted list of entries. An in-memory interval 
tree or sorted list enables efficient lookup.

Read Path Impact

When producing a row during a scan or point lookup:

1. Retrieve the row's encoded PK and commit timestamp.
2. Query the range tombstone store for any tombstone covering that PK.
3. If a tombstone (lower, upper, T) exists where lower <= PK < upper and 
row_commit_time <= T, the row is filtered out.
4. Lookup must be O(log K) where K = number of tombstones, using binary search 
or an interval tree.
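The lookup in steps 2-4 can be sketched as a binary search over tombstones sorted by lower key. This is an illustrative sketch assuming non-overlapping, pre-sorted entries; the names (`TombstoneStore`, `isDeleted`) are hypothetical, not actual Kudu code:

```java
import java.util.Arrays;
import java.util.List;

public class TombstoneStore {
  // {lower_key, upper_key, mvcc_timestamp}: covers [lower, upper) at timestamp ts.
  public record Tombstone(byte[] lower, byte[] upper, long ts) {}

  private static int cmp(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);  // lexicographic, unsigned bytes
  }

  /** True iff some tombstone covers `key` and rowCommitTs <= tombstone ts. */
  public static boolean isDeleted(List<Tombstone> sorted, byte[] key, long rowCommitTs) {
    int lo = 0, hi = sorted.size() - 1, hit = -1;
    while (lo <= hi) {                     // rightmost entry with lower <= key
      int mid = (lo + hi) >>> 1;
      if (cmp(sorted.get(mid).lower(), key) <= 0) { hit = mid; lo = mid + 1; }
      else hi = mid - 1;
    }
    if (hit < 0) return false;             // no tombstone starts at or below key
    Tombstone t = sorted.get(hit);
    return cmp(key, t.upper()) < 0 && rowCommitTs <= t.ts();
  }
}
```

The binary search gives the required O(log K) cost; an interval tree would be needed instead if overlapping tombstones are kept uncollapsed.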

Compaction Impact

During rowset merge/compaction:
 - Rows fully covered by a tombstone (row_commit_time <= tombstone_time) are 
physically dropped.
 - Tombstones are retained as long as needed for MVCC snapshot reads, CDC 
consumers, and replication lag (following existing history retention rules / 
--tablet_history_max_age_sec). Tombstones must not be garbage-collected until 
no active snapshot can observe rows covered by the tombstone.
 - Overlapping tombstones with compatible timestamps can be merged into a 
single wider range.
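The merge rule for same-timestamp tombstones could look like the following sketch (names are illustrative; input is assumed sorted by lower key):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Compaction-time merge: overlapping or adjacent tombstones written at the
// same timestamp collapse into one wider range; distinct timestamps are kept
// apart, since they have different MVCC visibility.
public class TombstoneMerge {
  public record Tombstone(byte[] lower, byte[] upper, long ts) {}

  public static List<Tombstone> merge(List<Tombstone> sorted) {
    List<Tombstone> out = new ArrayList<>();
    for (Tombstone t : sorted) {
      Tombstone last = out.isEmpty() ? null : out.get(out.size() - 1);
      if (last != null && last.ts() == t.ts()
          && Arrays.compareUnsigned(t.lower(), last.upper()) <= 0) {
        // Extend the previous range instead of keeping two entries.
        byte[] upper = Arrays.compareUnsigned(t.upper(), last.upper()) > 0
            ? t.upper() : last.upper();
        out.set(out.size() - 1, new Tombstone(last.lower(), upper, t.ts()));
      } else {
        out.add(t);
      }
    }
    return out;
  }
}
```

A real implementation could also merge a tombstone into a later, fully-covering one; that refinement is omitted here for brevity.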

Tablet Split Impact

When a tablet splits at a split key S:
 - A tombstone (lower, upper, T) is split into:
 -- Child A: (lower, min(upper, S), T), kept only if lower < S
 -- Child B: (max(lower, S), upper, T), kept only if upper > S
 - Empty ranges after clipping are discarded.
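The clipping rules above reduce to two small pure functions. A sketch (the names `TombstoneSplit`, `clipBelow`, and `clipAbove` are illustrative only):

```java
import java.util.Arrays;

public class TombstoneSplit {
  public record Range(byte[] lower, byte[] upper, long ts) {}

  /** Child A keeps [lower, min(upper, S)); returns null when empty after clipping. */
  public static Range clipBelow(Range t, byte[] s) {
    if (Arrays.compareUnsigned(t.lower(), s) >= 0) return null;  // lower >= S: empty
    byte[] upper = Arrays.compareUnsigned(t.upper(), s) < 0 ? t.upper() : s;
    return new Range(t.lower(), upper, t.ts());
  }

  /** Child B keeps [max(lower, S), upper); returns null when empty after clipping. */
  public static Range clipAbove(Range t, byte[] s) {
    if (Arrays.compareUnsigned(t.upper(), s) <= 0) return null;  // upper <= S: empty
    byte[] lower = Arrays.compareUnsigned(t.lower(), s) > 0 ? t.lower() : s;
    return new Range(lower, t.upper(), t.ts());
  }
}
```

The timestamp is carried through unchanged, so MVCC visibility is identical before and after the split.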

API / RPC Changes

Phase 3 introduces a *new dedicated RPC* for storage-level range tombstone 
deletes. It does not extend {{DeleteByPkPrefix}} and does not rely on feature 
flags.

Phase 2 and Phase 3 are conceptually distinct:
 * *Phase 2 ({{DeleteByPkPrefix}})* performs scan + batched row-level deletes.

 * *Phase 3 ({{DeleteByKeyRange}})* applies a single MVCC range tombstone over an encoded key interval.

The two approaches are independent and can coexist. Phase 3 does not modify the 
semantics of Phase 2.

 
{code:java}
rpc DeleteByKeyRange(DeleteByKeyRangeRequestPB)
    returns (DeleteByKeyRangeResponsePB) {
  option (kudu.rpc.track_rpc_result) = true;
}

message DeleteByKeyRangeRequestPB {
  // Target tablet.
  required bytes tablet_id = 1;

  // Inclusive lower bound of the encoded primary key.
  required bytes lower_encoded_key = 2;

  // Exclusive upper bound of the encoded primary key.
  required bytes upper_encoded_key = 3;

  // Optional propagated timestamp for external consistency.
  optional fixed64 propagated_timestamp = 4;

  // Authorization token.
  optional security.SignedTokenPB authz_token = 5;
}

message DeleteByKeyRangeResponsePB {
  optional TabletServerErrorPB error = 1;

  // Hybrid timestamp at which the tombstone was applied.
  optional fixed64 timestamp = 2;

  optional ResourceMetricsPB resource_metrics = 3;
}
{code}

Key Range Computation from PK Prefix

For a prefix (col1=A, col2=B) on a table with PK (col1, col2, col3):
 - lower_key = encode(prefix + MIN_SUFFIX)
 - upper_key = the lexicographic successor of encode(prefix)

This must correctly handle:
 - Variable-length types (STRING, BINARY, VARCHAR) and their encoding.
 - Composite PKs with mixed types.
 - Per-tablet partition boundaries: the effective range is the intersection of 
the prefix range and the tablet's key range.
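The upper-bound computation reduces to taking the lexicographic successor of the encoded prefix bytes: increment the last non-0xFF byte and truncate everything after it. The sketch below illustrates the byte-level rule only; Kudu's actual composite-key encoding (e.g., terminators for variable-length STRING/BINARY columns) is more involved, and the class name `PrefixRange` is hypothetical:

```java
import java.util.Arrays;

public class PrefixRange {
  /**
   * Returns the shortest byte string strictly greater than every key that
   * starts with `prefix`, or null if the prefix is all 0xFF (no finite bound).
   */
  public static byte[] successor(byte[] prefix) {
    for (int i = prefix.length - 1; i >= 0; i--) {
      if ((prefix[i] & 0xFF) != 0xFF) {
        byte[] out = Arrays.copyOf(prefix, i + 1);  // drop trailing 0xFF bytes
        out[i]++;
        return out;
      }
    }
    return null;  // unbounded: scan to the end of the tablet's key range
  }
}
```

For example, the successor of [0x01, 0x02] is [0x01, 0x03], and the successor of [0x01, 0xFF] is [0x02].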

Testing Plan

Unit tests:
 - Range tombstone containment logic (inclusive/exclusive boundaries, edge 
cases).
 - Overlapping tombstones with different timestamps.
 - MVCC visibility: rows inserted before vs. after tombstone timestamp.
 - Tombstone splitting during tablet split.

Integration tests:
1. Create table with composite PK (id1, id2, num_cal) and hash partitions.
2. Insert many rows for a given prefix.
3. Issue delete-by-prefix with range tombstone.
4. Verify scan no longer returns deleted rows without individual row-level 
deletes being issued.
5. Insert new rows after tombstone timestamp — verify they are visible.
6. Trigger compaction — verify on-disk row count decreases.
7. Verify behavior across leader changes.

Performance benchmark (optional):
 - Compare Phase 2 (scan + delete) vs. Phase 3 (range tombstone) for N = 10K, 
100K, 1M matching rows.
 - Measure: wall time, WAL bytes written, I/O.

Limitations and Future Work
 - CDC: range tombstones may need special handling in the Change Data Capture 
stream. Initially, they could be expanded into individual delete events at CDC 
read time.
 - Backup/Restore: backup tooling must preserve range tombstones.
 - Tombstone accumulation: without compaction, many small tombstones could 
degrade read performance. A background task to merge adjacent/overlapping 
tombstones may be needed.
 - Cross-tablet atomicity: remains unsupported. Use Kudu transactions if needed.
 - Delete by PK subset: a natural extension would be to support deleting by an arbitrary subset of PK columns (e.g., col1 and col3 without specifying col2). This is feasible at the scan + delete level (Phases 1 and 2) since equality predicates can be applied on any column combination. However, it has significant implications:
 -- Performance: prefix deletes map to a contiguous key range (efficient range scan). Subset deletes produce scattered matches across the key space, requiring broader scans.
 -- Partition pruning: less effective when intermediate PK columns are unspecified.
 -- Range tombstones (Phase 3): not directly applicable. A subset does not map to a single [lower, upper) key range, so the O(1) tombstone approach would not apply.

Deferred as a separate feature once the prefix-based implementation is stable.

*Summary*
||Aspect||Phase 1||Phase 2||Phase 3||Future Work||
|Scope|PK prefix|PK prefix|PK prefix|Arbitrary PK subset|
|Where|Client|Tablet Server|Storage engine|Client/Server|
|Mechanism|Scan + delete|Scan + batched delete|MVCC range tombstone (encoded PK interval)|Scan + arbitrary predicates|
|Cost|O(N) network|O(N) server|O(1) per tablet|O(N) broader scan|
|WAL writes|N|N|1 per tablet|N|
|Row enumeration|Yes|Yes|No|Yes|
|Compaction|Standard|Standard|Drops covered rows|Standard|
|Range tombstones|No|No|Yes|No|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
