[ 
https://issues.apache.org/jira/browse/KUDU-3748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joaquín Antonio De Vicente López updated KUDU-3748:
---------------------------------------------------
    Description: 
|Aspect|Detail|
|Status|Proposed|
|Component|Client, Tablet Server, Storage|
|Affects|Java Client, C++ Tablet Server|

*Motivation*

Real-World Use Case: Spark Structured Streaming Ingestion with Calendar 
Explosion

We have an Apache Spark Structured Streaming job consuming messages from Apache 
Kafka. Each message contains 2 business identifier fields and 4 additional 
fields, each being a list of calendars
(valid_from, valid_to).

During transformation, the calendar lists are vertically exploded, generating N 
rows per message where N = max(length(calendar_list_1 ... calendar_list_4)). 
Shorter lists produce NULL values in their
respective columns. A positional index column num_cal is generated and included 
as part of the Primary Key in the Kudu table — without it, only the last 
calendar row would survive an upsert.

The problem: we need to perform a logical upsert (replace) per message based on 
the 2 business identifiers. Since Kudu requires the full Primary Key for delete 
operations, and our PK includes num_cal,
we cannot directly delete all rows of a message unless we know every num_cal 
value. Attempting a delete without num_cal causes the streaming job to fail:

{code}
org.apache.kudu.client.NonRecoverableException: Primary key column num_cal is not set
Caused by: java.lang.IllegalStateException: Primary key column num_cal is not set
{code}

Impact: the Structured Streaming job fails and aborts, message replacement 
cannot be performed, and the continuous ingestion pipeline is disrupted.

General Problem Statement

Kudu does not support deleting rows by a partial primary key (a leading prefix 
of the PK columns). Many workloads need a "delete-and-replace" pattern where 
all rows sharing a logical identifier are
removed before inserting new versions. Today this requires the caller to know 
every full PK value, which is impractical when the PK includes generated or 
positional columns.

*Proposed Solution:* Three-Phase Approach

The feature is designed in three phases, each building on the previous:
|Phase|Approach|Cost per Operation|
|Phase 1|Client-side scan + delete|O(N) network round-trips|
|Phase 2|Server-side RPC (scan + batched delete per tablet)|O(N) row reads + writes (server-side)|
|Phase 3|Storage-level range tombstones|O(1) per tablet|

*Phase 1:* Client-Side Scan + Delete

Approach

The Java client performs the entire operation:

1. Validate that the set columns in the supplied PartialRow are a contiguous 
leading prefix of the table's primary key columns.
2. Open a scan with equality predicates on the prefix columns, projecting only 
PK columns.
3. For every row returned by the scan, issue a DELETE through a KuduSession 
(batched with MANUAL_FLUSH).
4. Return the total number of rows deleted.
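Step 1 is pure client-side logic. A minimal illustrative sketch (the class and method names here are hypothetical, not part of the proposed patch, and Kudu client types are deliberately avoided so the check stands alone):
{code:java}
import java.util.List;
import java.util.Set;

// Illustration of step 1: verify that the caller-supplied columns form a
// contiguous leading prefix of the table's primary key columns.
class PkPrefixValidator {
  // Returns the prefix length k, or throws if the set columns are not
  // exactly pkColumns[0..k).
  static int validatePrefix(List<String> pkColumns, Set<String> setColumns) {
    int k = 0;
    // Count how many leading PK columns are set.
    while (k < pkColumns.size() && setColumns.contains(pkColumns.get(k))) {
      k++;
    }
    // Reject an empty prefix, gaps, and non-PK columns: every set column
    // must have been consumed as part of the leading run.
    if (k == 0 || setColumns.size() != k) {
      throw new IllegalArgumentException(
          "set columns must be a non-empty contiguous leading prefix of the primary key");
    }
    return k;
  }
}
{code}
For the example table, (id1, id2) passes with k = 2, while (id1, num_cal) is rejected because id2 is skipped.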

Limitations
 - Network overhead: every matching row's PK is transferred from tablet server 
to client and back as a delete operation.
 - Latency: proportional to the number of matching rows.
 - No server-side optimization: the tablet server is unaware of the 
higher-level intent.

*Phase 2:* Server-Side RPC

Approach

A dedicated DeleteByPkPrefix RPC is added to the Tablet Server service. The 
client sends one RPC per relevant tablet; the server performs the scan and 
delete internally, eliminating network round-trips
for individual rows.

*RPC Definition*
{code:java}
rpc DeleteByPkPrefix(DeleteByPkPrefixRequestPB)
    returns (DeleteByPkPrefixResponsePB) {
  option (kudu.rpc.track_rpc_result) = true;
}
{code}
{code:java}
message DeleteByPkPrefixRequestPB {  
  required bytes tablet_id = 1;
  // Schema of the prefix columns.
  optional SchemaPB schema = 2;
  // Encoded prefix row.
  optional RowOperationsPB pk_prefix = 3;
  optional ExternalConsistencyMode external_consistency_mode = 4
  [default = CLIENT_PROPAGATED];
  optional fixed64 propagated_timestamp = 5;
  optional security.SignedTokenPB authz_token = 6;
}
{code}
{code:java}
message DeleteByPkPrefixResponsePB {
  optional TabletServerErrorPB error = 1;
  optional int64 rows_deleted = 2;
  optional fixed64 timestamp = 3;
  optional ResourceMetricsPB resource_metrics = 4;
}
{code}
Server-Side Behavior

1. Decode the prefix row from the request.
2. Build equality predicates for each prefix column.
3. Create an MVCC snapshot iterator projecting only PK columns.
4. Scan matching rows in batches and submit DELETE_IGNORE operations through 
Raft.
5. Return the total count of deleted rows.

Client-Side Behavior
 - Use partition pruning to determine which tablets may contain matching rows, 
avoiding RPCs to irrelevant tablets.
 - Send one DeleteByPkPrefixRequest per relevant tablet.
 - Sum rows_deleted from all responses.
 - If any tablet returns a feature-not-supported error (mixed-version cluster), 
fall back to Phase 1.

Public API

{code:java}
DeleteByPkPrefixResponse resp = client.deleteByPkPrefix(table, pkPrefix);
{code}

Semantics
 - Per-tablet snapshot consistency: rows to delete are determined by an MVCC 
snapshot taken at the start of the operation on each tablet. Rows inserted 
after the snapshot are NOT deleted.
 - Per-tablet non-atomicity: deletes are applied in batches through Raft. Each 
batch is individually atomic; the full operation within a tablet is NOT a 
single atomic transaction.
 - Cross-tablet non-atomicity: one RPC per tablet; no cross-tablet atomicity.
 - Partial failures: if one or more tablets fail, the client returns an error 
and the operation may have been partially applied on other tablets. Retrying is 
safe due to track_rpc_result=true.
 - Concurrent deletes: uses DELETE_IGNORE internally so concurrent deletes of 
the same rows are harmless.
 - Retry safety: track_rpc_result = true enables server-side deduplication.

Limitations of Phase 2
 - Still scans every matching row: the server must read each row's PK from 
storage and issue individual delete mutations through Raft.
 - Cost proportional to N (matching rows): for tables with millions of rows per 
prefix, this is expensive in CPU, I/O, and WAL.
 - WAL amplification: each deleted row generates a Raft log entry.

*Phase 3:* Storage-Level Range Tombstones

Motivation

Phases 1 and 2 solve the functional problem but remain O(N) in the number of 
matching rows: every row must be scanned and individually deleted. For tables 
with millions of rows per prefix, this is
expensive.

Range tombstones eliminate row enumeration at delete time, while maintaining 
O(log K) lookup cost during reads. A single metadata entry (lower_key, 
upper_key, mvcc_timestamp) logically deletes all rows in the key range without 
touching them individually.

Phase 3 reduces write amplification by replacing N row-level delete mutations 
with a single replicated range tombstone operation per tablet.

Range tombstones do not provide cross-tablet atomicity.
Atomic multi-tablet semantics require Kudu transactions.

Proposed Semantics

MVCC model:
 - A range tombstone is created at a single MVCC hybrid timestamp T.
 - Any row version with commit_timestamp <= T whose encoded primary key
falls within [lower_key, upper_key) is considered deleted for snapshots
at or after T.
 - Snapshots taken before T are not affected and continue to observe the row 
version.
 - Rows inserted after timestamp T are visible (not deleted), unless covered by 
a later tombstone.
 - This is consistent with Kudu's existing MVCC visibility rules.
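The rules above reduce to a small predicate. An illustrative sketch (not Kudu internals; keys are treated as opaque byte strings and timestamps as plain longs):
{code:java}
import java.util.Arrays;

// Visibility check against a single range tombstone
// [lowerKey, upperKey) created at hybrid timestamp tombstoneTs.
class TombstoneVisibility {
  // True iff the row version is considered deleted at snapshot snapshotTs.
  static boolean deletedBy(byte[] key, long rowCommitTs, long snapshotTs,
                           byte[] lowerKey, byte[] upperKey, long tombstoneTs) {
    // Inclusive lower bound, exclusive upper bound.
    boolean inRange = Arrays.compareUnsigned(lowerKey, key) <= 0
        && Arrays.compareUnsigned(key, upperKey) < 0;
    // Deleted only for row versions committed at or before T, and only
    // for snapshots taken at or after T.
    return inRange && rowCommitTs <= tombstoneTs && snapshotTs >= tombstoneTs;
  }
}
{code}
A row inserted after T (rowCommitTs > tombstoneTs) stays visible, and a snapshot taken before T still observes the old version, matching the rules above.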

Replication:
 - Range tombstone operations are Raft-replicated like regular writes.
 - Idempotent under retry via {{track_rpc_result=true}} server-side request 
deduplication.

Interaction with row-level operations:
 - A row-level insert or upsert at timestamp T2 > T produces a new version that 
is visible despite the tombstone, following standard MVCC visibility rules.
 - A row-level delete at any timestamp is independent; both can coexist.

Proposed Storage Design

New data structure: a range tombstone store — an interval set associated with 
each RowSet (or at the tablet level for MemRowSet).

Each entry:

{lower_key, upper_key, mvcc_timestamp}

Persistence options (to be evaluated during design):
|Option|Description|Pros|Cons|
|A|Separate tombstone file per RowSet|Clean separation|New file format|
|B|DeltaStore extension|Reuses infra|Poor key-fit|
|C|Tablet metadata|Simple|Metadata bloat|

Recommended approach: Option A — a separate tombstone store per RowSet, 
persisted as a small file with a sorted list of entries. An in-memory interval 
tree or sorted list enables efficient lookup.

Read Path Impact

When producing a row during a scan or point lookup:

1. Retrieve the row's encoded PK and commit timestamp.
2. Query the range tombstone store for any tombstone covering that PK.
3. If a tombstone (lower, upper, T) exists where lower <= PK < upper and 
row_commit_time <= T, the row is filtered out.
4. Lookup must be O(log K) where K = number of tombstones, using binary search 
or an interval tree.
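The O(log K) lookup can be sketched as a binary search over tombstones kept sorted by lower bound (assuming non-overlapping entries, i.e. overlaps were merged when the tombstones were written or compacted; this is an illustration, not the proposed implementation):
{code:java}
import java.util.Arrays;
import java.util.List;

// Sorted, non-overlapping range tombstone store with O(log K) containment lookup.
class TombstoneStore {
  record Tombstone(byte[] lower, byte[] upper, long ts) {}

  final List<Tombstone> sorted; // sorted by lower bound, ascending

  TombstoneStore(List<Tombstone> sorted) { this.sorted = sorted; }

  // Returns the covering tombstone's timestamp, or -1 if no tombstone covers key.
  long coveringTs(byte[] key) {
    int lo = 0, hi = sorted.size() - 1, floor = -1;
    // Binary search for the last tombstone with lower <= key.
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      if (Arrays.compareUnsigned(sorted.get(mid).lower(), key) <= 0) {
        floor = mid;
        lo = mid + 1;
      } else {
        hi = mid - 1;
      }
    }
    // The candidate covers key only if key is below its exclusive upper bound.
    if (floor >= 0 && Arrays.compareUnsigned(key, sorted.get(floor).upper()) < 0) {
      return sorted.get(floor).ts();
    }
    return -1;
  }
}
{code}
The returned timestamp then feeds the row_commit_time <= T comparison from step 3.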

Compaction Impact

During rowset merge/compaction:
 - Rows fully covered by a tombstone (row_commit_time <= tombstone_time) are 
physically dropped.
 - Tombstones are retained as long as needed for MVCC snapshot reads, CDC 
consumers, and replication lag (following existing history retention rules / 
--tablet_history_max_age_sec). Tombstones must not be garbage-collected until 
no active snapshot can observe rows covered by the tombstone.
 - Overlapping tombstones with compatible timestamps can be merged into a 
single wider range.
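The merge rule for same-timestamp tombstones can be sketched as a single pass over entries sorted by lower bound (integer keys stand in for encoded byte keys; one possible compaction-time normalization, not a committed design):
{code:java}
import java.util.ArrayList;
import java.util.List;

// Merge overlapping or adjacent tombstones that share a timestamp
// into a single wider range.
class TombstoneMerge {
  record T(int lower, int upper, long ts) {} // [lower, upper), int keys for brevity

  static List<T> merge(List<T> sortedByLower) {
    List<T> out = new ArrayList<>();
    for (T t : sortedByLower) {
      T last = out.isEmpty() ? null : out.get(out.size() - 1);
      if (last != null && last.ts() == t.ts() && t.lower() <= last.upper()) {
        // Same timestamp and touching/overlapping ranges: widen in place.
        out.set(out.size() - 1,
                new T(last.lower(), Math.max(last.upper(), t.upper()), t.ts()));
      } else {
        out.add(t);
      }
    }
    return out;
  }
}
{code}
Tombstones with different timestamps are kept separate here, since collapsing them would change which snapshots observe the covered rows.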

Tablet Split Impact

When a tablet splits at a split key S:
 - Tombstone (lower, upper, T) is split into:
 -- Child A: (lower, min(upper, S), T) if lower < S
 -- Child B: (max(lower, S), upper, T) if upper > S
 - Empty ranges after clipping are discarded.
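The clipping rule is mechanical; an illustrative sketch with integer keys standing in for encoded byte keys:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Clip a range tombstone [lower, upper) at a tablet split key S.
class TombstoneSplit {
  record T(int lower, int upper, long ts) {}

  static List<T> splitAt(T t, int s) {
    List<T> out = new ArrayList<>();
    // Child A keeps the portion strictly below S.
    if (t.lower() < s) out.add(new T(t.lower(), Math.min(t.upper(), s), t.ts()));
    // Child B keeps the portion at or above S.
    if (t.upper() > s) out.add(new T(Math.max(t.lower(), s), t.upper(), t.ts()));
    // Each guard ensures lower < upper, so empty ranges never appear.
    return out;
  }
}
{code}
A tombstone entirely on one side of S is inherited unchanged by that child; one straddling S yields one clipped entry per child, both at the original timestamp T.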

API / RPC Changes

Phase 3 introduces a *new dedicated RPC* for storage-level range tombstone 
deletes. It does not extend {{DeleteByPkPrefix}} and does not rely on feature 
flags.

Phase 2 and Phase 3 are conceptually distinct:
 * *Phase 2 ({{DeleteByPkPrefix}})* performs scan + batched row-level deletes.
 * *Phase 3 ({{DeleteByKeyRange}})* applies a single MVCC range tombstone over an encoded key interval.

The two approaches are independent and can coexist. Phase 3 does not modify the 
semantics of Phase 2.
{code:java}
rpc DeleteByKeyRange(DeleteByKeyRangeRequestPB)
    returns (DeleteByKeyRangeResponsePB) {
  option (kudu.rpc.track_rpc_result) = true;
}

message DeleteByKeyRangeRequestPB {
  // Target tablet.
  required bytes tablet_id = 1;

  // Inclusive lower bound of the encoded primary key.
  required bytes lower_encoded_key = 2;

  // Exclusive upper bound of the encoded primary key.
  required bytes upper_encoded_key = 3;

  // Optional propagated timestamp for external consistency.
  optional fixed64 propagated_timestamp = 4;

  // Authorization token.
  optional security.SignedTokenPB authz_token = 5;
}

message DeleteByKeyRangeResponsePB {
  optional TabletServerErrorPB error = 1;

  // Hybrid timestamp at which the tombstone was applied.
  optional fixed64 timestamp = 2;

  optional ResourceMetricsPB resource_metrics = 3;
}
{code}
Key Range Computation from PK Prefix

For a prefix (col1=A, col2=B) on a table with PK (col1, col2, col3):
 - lower_key = encode(prefix + MIN_SUFFIX)
 - upper_key = the immediate lexicographic successor of encode(prefix): the smallest encoded key greater than every key that starts with the prefix

This must correctly handle:
 - Variable-length types (STRING, BINARY, VARCHAR) and their encoding.
 - Composite PKs with mixed types.
 - Per-tablet partition boundaries: the effective range is the intersection of 
the prefix range and the tablet's key range.
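For an opaque encoded prefix, the successor can be computed by incrementing the last non-0xFF byte and truncating. A sketch under that simplification (Kudu's actual key encoding of variable-length columns is more involved, as noted above, so this is illustrative only):
{code:java}
import java.util.Arrays;

// Compute the exclusive upper bound of the key range covered by a prefix.
class PrefixRange {
  // Returns null when no finite upper bound exists (prefix is all 0xFF bytes),
  // in which case the range extends to the end of the tablet's key space.
  static byte[] upperBound(byte[] prefix) {
    for (int i = prefix.length - 1; i >= 0; i--) {
      if ((prefix[i] & 0xff) != 0xff) {
        // Increment the last incrementable byte and drop everything after it.
        byte[] upper = Arrays.copyOf(prefix, i + 1);
        upper[i]++;
        return upper;
      }
    }
    return null;
  }
}
{code}
For example, prefix 0x6162 yields upper bound 0x6163, and 0x61FF yields 0x62, so every key beginning with the prefix sorts strictly below the bound.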

Testing Plan

Unit tests:
 - Range tombstone containment logic (inclusive/exclusive boundaries, edge 
cases).
 - Overlapping tombstones with different timestamps.
 - MVCC visibility: rows inserted before vs. after tombstone timestamp.
 - Tombstone splitting during tablet split.

Integration tests:
1. Create table with composite PK (id1, id2, num_cal) and hash partitions.
2. Insert many rows for a given prefix.
3. Issue delete-by-prefix with range tombstone.
4. Verify scan no longer returns deleted rows without individual row-level 
deletes being issued.
5. Insert new rows after tombstone timestamp — verify they are visible.
6. Trigger compaction — verify on-disk row count decreases.
7. Verify behavior across leader changes.

Performance benchmark (optional):
 - Compare Phase 2 (scan + delete) vs. Phase 3 (range tombstone) for N = 10K, 
100K, 1M matching rows.
 - Measure: wall time, WAL bytes written, I/O.

Limitations and Future Work
 - CDC: range tombstones may need special handling in the Change Data Capture 
stream. Initially, they could be expanded into individual delete events at CDC 
read time.
 - Backup/Restore: backup tooling must preserve range tombstones.
 - Tombstone accumulation: without compaction, many small tombstones could 
degrade read performance. A background task to merge adjacent/overlapping 
tombstones may be needed.
 - Cross-tablet atomicity: remains unsupported. Use Kudu transactions if needed.
 - Delete by PK subset: a natural extension would be to support deleting by an 
arbitrary subset of PK columns (e.g., col1 and col3 without specifying col2). 
This is feasible at the scan + delete level
(Phases 1 and 2) since equality predicates can be applied on any column 
combination. However, it has significant implications:
 - Performance: prefix deletes map to a contiguous key range (efficient range 
scan). Subset deletes produce scattered matches across the key space, requiring 
broader scans.
 - Partition pruning: less effective when intermediate PK columns are 
unspecified.
 - Range tombstones (Phase 3): not directly applicable. A subset does not map 
to a single [lower, upper) key range, so the O(1) tombstone approach would not 
apply.

Deferred as a separate feature once the prefix-based implementation is stable.

*Summary*
|Aspect|Phase 1|Phase 2|Phase 3|Future Work|
|Scope|PK prefix|PK prefix|PK prefix|Arbitrary PK subset|
|Where|Client|Tablet Server|Storage engine|Client/Server|
|Mechanism|Scan + delete|Scan + batched delete|MVCC range tombstone (encoded PK interval)|Scan + arbitrary predicates|
|Cost|O(N) network|O(N) server|O(1) per tablet|O(N) broader scan|
|WAL writes|N|N|1 per tablet|N|
|Row enumeration|Yes|Yes|No|Yes|
|Compaction|Standard|Standard|Drops covered rows|Standard|
|Range tombstones|No|No|Yes|No|


> Support Delete by Primary Key Prefix (Range Delete)
> ---------------------------------------------------
>
>                 Key: KUDU-3748
>                 URL: https://issues.apache.org/jira/browse/KUDU-3748
>             Project: Kudu
>          Issue Type: New Feature
>            Reporter: Joaquín Antonio De Vicente López
>            Priority: Major
>
> |Aspect|Detail|
> |Status|Proposed|
> |Component|Client, Tablet Server, Storage|
> |Affects|Java Client, C++ Tablet Server|
> *Motivation*
> Real-World Use Case: Spark Structured Streaming Ingestion with Calendar 
> Explosion
> We have an Apache Spark Structured Streaming job consuming messages from 
> Apache Kafka. Each message contains 2 business identifier fields and 4 
> additional fields, each being a list of calendars
> (valid_from, valid_to).
> During transformation the calendar lists are vertically exploded, generating 
> N rows per message where N = max(length(calendar_list_1 ... 
> calendar_list_4)). Shorter lists produce NULL values in their
> respective columns. A positional index column num_cal is generated and 
> included as part of the Primary Key in the Kudu table — without it, only the 
> last calendar row would survive an upsert.
> The problem: we need to perform a logical upsert (replace) per message based 
> on the 2 business identifiers. Since Kudu requires the full Primary Key for 
> delete operations, and our PK includes num_cal,
> we cannot directly delete all rows of a message unless we know every num_cal 
> value. Attempting a delete without num_cal causes the streaming job to fail:
> org.apache.kudu.client.NonRecoverableException: Primary key column num_cal is 
> not set
> Caused by: java.lang.IllegalStateException: Primary key column num_cal is not 
> set
> Impact: the Structured Streaming job fails and aborts, message replacement 
> cannot be performed, and the continuous ingestion pipeline is disrupted.
> General Problem Statement
> Kudu does not support deleting rows by a partial primary key (a leading 
> prefix of the PK columns). Many workloads need a "delete-and-replace" pattern 
> where all rows sharing a logical identifier are
> removed before inserting new versions. Today this requires the caller to know 
> every full PK value, which is impractical when the PK includes generated or 
> positional columns.
> *Proposed Solution:* Three-Phase Approach
> The feature is designed in three phases, each building on the previous:
> |Phase|Approach|Cost per Operation|
> |Phase 1|Client-side scan + delete|O(N) network round-trips|
> |Phase 2|Server-side RPC (scan + batched delete per tablet)|O(N) row reads + 
> writes (server-side)|
> |Phase 3|Storage-level range tombstones|O(1) per tablet|
> *Phase 1:* Client-Side Scan + Delete
> Approach
> The Java client performs the entire operation:
> 1. Validate that the set columns in the supplied PartialRow are a contiguous 
> leading prefix of the table's primary key columns.
> 2. Open a scan with equality predicates on the prefix columns, projecting 
> only PK columns.
> 3. For every row returned by the scan, issue a DELETE through a KuduSession 
> (batched with MANUAL_FLUSH).
> 4. Return the total number of rows deleted.
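> The prefix validation in step 1 can be sketched in plain Java (the class and 
> method names here are illustrative only, not part of any proposed API). The 
> key invariant: once an unset PK column is seen, no later PK column may be set.
> {code:java}
> import java.util.List;
> import java.util.Set;
> 
> // Illustrative helper for step 1: the set columns must form a
> // contiguous leading prefix of the table's primary key columns.
> public class PkPrefixValidator {
>   // Returns the number of prefix columns set; throws if the set
>   // columns are not a contiguous leading prefix of pkColumns.
>   public static int validatePrefix(List<String> pkColumns,
>                                    Set<String> setColumns) {
>     int prefixLen = 0;
>     boolean gapSeen = false;
>     for (String col : pkColumns) {
>       if (setColumns.contains(col)) {
>         if (gapSeen) {
>           throw new IllegalArgumentException(
>               "PK column " + col + " is set but an earlier PK column is not");
>         }
>         prefixLen++;
>       } else {
>         gapSeen = true;
>       }
>     }
>     if (prefixLen == 0) {
>       throw new IllegalArgumentException("no PK prefix column is set");
>     }
>     return prefixLen;
>   }
> }
> {code}
> For the motivating table with PK (id1, id2, num_cal), setting id1 and id2 is 
> a valid prefix of length 2; setting id1 and num_cal (skipping id2) is rejected.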
> Limitations
>  - Network overhead: every matching row's PK is transferred from tablet 
> server to client and back as a delete operation.
>  - Latency: proportional to the number of matching rows.
>  - No server-side optimization: the tablet server is unaware of the 
> higher-level intent.
> *Phase 2:* Server-Side RPC
> Approach
> A dedicated DeleteByPkPrefix RPC is added to the Tablet Server service. The 
> client sends one RPC per relevant tablet; the server performs the scan and 
> delete internally, eliminating network round-trips
> for individual rows.
> *RPC Definition*
> {code:java}
> rpc DeleteByPkPrefix(DeleteByPkPrefixRequestPB)
>     returns (DeleteByPkPrefixResponsePB) {
>   option (kudu.rpc.track_rpc_result) = true;
> }
> {code}
> {code:java}
> message DeleteByPkPrefixRequestPB {  
>   required bytes tablet_id = 1;
>   // Schema of the prefix columns.
>   optional SchemaPB schema = 2;
>   // Encoded prefix row.
>   optional RowOperationsPB pk_prefix = 3;
>   optional ExternalConsistencyMode external_consistency_mode = 4
>   [default = CLIENT_PROPAGATED];
>   optional fixed64 propagated_timestamp = 5;
>   optional security.SignedTokenPB authz_token = 6;
> }
> {code}
> {code:java}
> message DeleteByPkPrefixResponsePB {
>   optional TabletServerErrorPB error = 1;
>   optional int64 rows_deleted = 2;
>   optional fixed64 timestamp = 3;
>   optional ResourceMetricsPB resource_metrics = 4;
> }
> {code}
> Server-Side Behavior
> 1. Decode the prefix row from the request.
> 2. Build equality predicates for each prefix column.
> 3. Create an MVCC snapshot iterator projecting only PK columns.
> 4. Scan matching rows in batches and submit DELETE_IGNORE operations through 
> Raft.
> 5. Return the total count of deleted rows.
> Client-Side Behavior
>  - Use partition pruning to determine which tablets may contain matching 
> rows, avoiding RPCs to irrelevant tablets.
>  - Send one DeleteByPkPrefixRequest per relevant tablet.
>  - Sum rows_deleted from all responses.
>  - If any tablet returns a feature-not-supported error (mixed-version 
> cluster), fall back to Phase 1.
> Public API
> {code:java}
> DeleteByPkPrefixResponse resp = client.deleteByPkPrefix(table, pkPrefix);
> {code}
> Semantics
>  - Per-tablet snapshot consistency: rows to delete are determined by an MVCC 
> snapshot taken at the start of the operation on each tablet. Rows inserted 
> after the snapshot are NOT deleted.
>  - Per-tablet non-atomicity: deletes are applied in batches through Raft. 
> Each batch is individually atomic; the full operation within a tablet is NOT 
> a single atomic transaction.
>  - Cross-tablet non-atomicity: one RPC per tablet; no cross-tablet atomicity.
>  - Partial failures: if one or more tablets fail, the client returns an error 
> and the operation may have been partially applied on other tablets. Retrying 
> is safe due to track_rpc_result=true.
>  - Concurrent deletes: uses DELETE_IGNORE internally so concurrent deletes of 
> the same rows are harmless.
>  - Retry safety: track_rpc_result = true enables server-side deduplication.
> Limitations of Phase 2
>  - Still scans every matching row: the server must read each row's PK from 
> storage and issue individual delete mutations through Raft.
>  - Cost proportional to N (matching rows): for tables with millions of rows 
> per prefix, this is expensive in CPU, I/O, and WAL.
>  - WAL amplification: each deleted row generates a Raft log entry.
> *Phase 3:* Storage-Level Range Tombstones
> Motivation
> Phases 1 and 2 solve the functional problem but remain O(N) in the number of 
> matching rows: every row must be scanned and individually deleted. For tables 
> with millions of rows per prefix, this is
> expensive.
> Range tombstones eliminate row enumeration at delete time, while maintaining 
> O(log K) lookup cost during reads. A single metadata entry (lower_key, 
> upper_key, mvcc_timestamp) logically deletes all rows in the key range 
> without touching them individually.
> Phase 3 reduces write amplification by replacing N row-level delete mutations 
> with a single replicated range tombstone operation per tablet.
> Range tombstones do not provide cross-tablet atomicity.
> Atomic multi-tablet semantics require Kudu transactions.
> Proposed Semantics
> MVCC model:
>  - A range tombstone is created at a single MVCC hybrid timestamp T.
>  - Any row version with commit_timestamp <= T whose encoded primary key
> falls within [lower_key, upper_key) is considered deleted for snapshots
> at or after T.
>  - Snapshots taken before T are not affected and continue to observe the row 
> version.
>  - Rows inserted after timestamp T are visible (not deleted), unless covered 
> by a later tombstone.
>  - This is consistent with Kudu's existing MVCC visibility rules.
> Replication:
>  - Range tombstone operations are Raft-replicated like regular writes.
>  - Idempotent under retry via {{track_rpc_result=true}} server-side request 
> deduplication.
> Interaction with row-level operations:
>  - A row-level insert or upsert at timestamp T2 > T produces a new version 
> that is visible despite the tombstone, following standard MVCC visibility 
> rules.
>  - A row-level delete at any timestamp is independent; both can coexist.
> Proposed Storage Design
> New data structure: a range tombstone store — an interval set associated with 
> each RowSet (or at the tablet level for MemRowSet).
> Each entry:
> {lower_key, upper_key, mvcc_timestamp}
> Persistence options (to be evaluated during design):
> |Option|Description|Pros|Cons|
> |A|Separate tombstone file per RowSet|Clean separation|New file format|
> |B|DeltaStore extension|Reuses infra|Poor key-fit|
> |C|Tablet metadata|Simple|Metadata bloat|
> Recommended approach: Option A — a separate tombstone store per RowSet, 
> persisted as a small file with a sorted list of entries. An in-memory 
> interval tree or sorted list enables efficient lookup.
> Read Path Impact
> When producing a row during a scan or point lookup:
> 1. Retrieve the row's encoded PK and commit timestamp.
> 2. Query the range tombstone store for any tombstone covering that PK.
> 3. If a tombstone (lower, upper, T) exists where lower <= PK < upper and 
> row_commit_time <= T, the row is filtered out.
> 4. Lookup must be O(log K) where K = number of tombstones, using binary 
> search or an interval tree.
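> Assuming tombstones are kept non-overlapping and sorted by lower key (the 
> compaction rules merge overlaps), steps 2-4 can be sketched as a binary 
> search in plain Java. This is an illustrative sketch, not Kudu code; the 
> TombstoneStore name is hypothetical.
> {code:java}
> import java.util.Arrays;
> import java.util.List;
> 
> // Illustrative read-path filter: find the rightmost tombstone whose
> // lower bound is <= the row's encoded PK, then apply the [lower, upper)
> // containment check and the MVCC visibility rule.
> public class TombstoneStore {
>   public record Tombstone(byte[] lower, byte[] upper, long timestamp) {}
> 
>   private final List<Tombstone> sorted;  // non-overlapping, sorted by lower
> 
>   public TombstoneStore(List<Tombstone> sortedByLower) {
>     this.sorted = sortedByLower;
>   }
> 
>   // True if the row at (encodedPk, commitTime) is logically deleted.
>   public boolean isDeleted(byte[] encodedPk, long commitTime) {
>     int lo = 0, hi = sorted.size() - 1, candidate = -1;
>     while (lo <= hi) {
>       int mid = (lo + hi) >>> 1;
>       if (Arrays.compareUnsigned(sorted.get(mid).lower(), encodedPk) <= 0) {
>         candidate = mid;       // lower <= PK: remember and search right
>         lo = mid + 1;
>       } else {
>         hi = mid - 1;
>       }
>     }
>     if (candidate < 0) return false;
>     Tombstone t = sorted.get(candidate);
>     // Inclusive lower, exclusive upper, and row committed at or before T.
>     return Arrays.compareUnsigned(encodedPk, t.upper()) < 0
>         && commitTime <= t.timestamp();
>   }
> }
> {code}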
> Compaction Impact
> During rowset merge/compaction:
>  - Rows fully covered by a tombstone (row_commit_time <= tombstone_time) are 
> physically dropped.
>  - Tombstones are retained as long as needed for MVCC snapshot reads, CDC 
> consumers, and replication lag (following existing history retention rules / 
> --tablet_history_max_age_sec). Tombstones must not be garbage-collected until 
> no active snapshot can observe rows covered by the tombstone.
>  - Overlapping tombstones with compatible timestamps can be merged into a 
> single wider range.
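> The merge step can be sketched as a single pass over tombstones sorted by 
> lower key. Here "compatible" is interpreted narrowly as equal timestamps, 
> since widening a range under a newer timestamp would change visibility for 
> rows committed between the two timestamps; this is an illustrative sketch, 
> not Kudu code.
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> 
> // Illustrative compaction helper: merge touching or overlapping
> // tombstones that carry the same timestamp.
> public class TombstoneMerge {
>   public record Tombstone(byte[] lower, byte[] upper, long timestamp) {}
> 
>   public static List<Tombstone> merge(List<Tombstone> sortedByLower) {
>     List<Tombstone> out = new ArrayList<>();
>     for (Tombstone t : sortedByLower) {
>       Tombstone last = out.isEmpty() ? null : out.get(out.size() - 1);
>       // Merge when the new range touches/overlaps the previous one and
>       // both tombstones were created at the same MVCC timestamp.
>       if (last != null
>           && last.timestamp() == t.timestamp()
>           && Arrays.compareUnsigned(t.lower(), last.upper()) <= 0) {
>         byte[] upper = Arrays.compareUnsigned(last.upper(), t.upper()) >= 0
>             ? last.upper() : t.upper();
>         out.set(out.size() - 1,
>                 new Tombstone(last.lower(), upper, t.timestamp()));
>       } else {
>         out.add(t);
>       }
>     }
>     return out;
>   }
> }
> {code}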
> Tablet Split Impact
> When a tablet splits at a split key S:
>  - Tombstone (lower, upper, T) is split into:
>  - Child A: (lower, min(upper, S), T) — if lower < S
>  - Child B: (max(lower, S), upper, T) — if upper > S
>  - Empty ranges after clipping are discarded.
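> The clipping rules above can be sketched in plain Java, operating directly 
> on encoded key bytes (illustrative only; TombstoneSplit is a hypothetical 
> name):
> {code:java}
> import java.util.Arrays;
> import java.util.Optional;
> 
> // Illustrative split helper: clip one tombstone at split key S.
> // An empty clipped range is discarded (Optional.empty()).
> public class TombstoneSplit {
>   public record Tombstone(byte[] lower, byte[] upper, long timestamp) {}
> 
>   // Child A: keys strictly below the split key S.
>   public static Optional<Tombstone> clipBelow(Tombstone t, byte[] s) {
>     byte[] upper = min(t.upper(), s);
>     return Arrays.compareUnsigned(t.lower(), upper) < 0
>         ? Optional.of(new Tombstone(t.lower(), upper, t.timestamp()))
>         : Optional.empty();
>   }
> 
>   // Child B: keys at or above the split key S.
>   public static Optional<Tombstone> clipAbove(Tombstone t, byte[] s) {
>     byte[] lower = max(t.lower(), s);
>     return Arrays.compareUnsigned(lower, t.upper()) < 0
>         ? Optional.of(new Tombstone(lower, t.upper(), t.timestamp()))
>         : Optional.empty();
>   }
> 
>   private static byte[] min(byte[] a, byte[] b) {
>     return Arrays.compareUnsigned(a, b) <= 0 ? a : b;
>   }
> 
>   private static byte[] max(byte[] a, byte[] b) {
>     return Arrays.compareUnsigned(a, b) >= 0 ? a : b;
>   }
> }
> {code}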
> API / RPC Changes
> Phase 3 introduces a *new dedicated RPC* for storage-level range tombstone 
> deletes. It does not extend {{DeleteByPkPrefix}} and does not rely on feature 
> flags.
> Phase 2 and Phase 3 are conceptually distinct:
>  * *Phase 2 ({{{}DeleteByPkPrefix{}}})* performs scan + batched row-level 
> deletes.
>  * *Phase 3 ({{{}DeleteByKeyRange{}}})* applies a single MVCC range tombstone 
> over an encoded key interval.
> The two approaches are independent and can coexist. Phase 3 does not modify 
> the semantics of Phase 2.
> {code:java}
> rpc DeleteByKeyRange(DeleteByKeyRangeRequestPB)
>     returns (DeleteByKeyRangeResponsePB) {
>   option (kudu.rpc.track_rpc_result) = true;
> }
> message DeleteByKeyRangeRequestPB {
>   // Target tablet.
>   required bytes tablet_id = 1;
>   // Inclusive lower bound of the encoded primary key.
>   required bytes lower_encoded_key = 2;
>   // Exclusive upper bound of the encoded primary key.
>   required bytes upper_encoded_key = 3;
>   // Optional propagated timestamp for external consistency.
>   optional fixed64 propagated_timestamp = 4;
>   // Authorization token.
>   optional security.SignedTokenPB authz_token = 5;
> }
> message DeleteByKeyRangeResponsePB {
>   optional TabletServerErrorPB error = 1;
>   // Hybrid timestamp at which the tombstone was applied.
>   optional fixed64 timestamp = 2;
>   optional ResourceMetricsPB resource_metrics = 3;
> }
> {code}
> Key Range Computation from PK Prefix
> For a prefix (col1=A, col2=B) on a table with PK (col1, col2, col3):
>  - lower_key = encode(prefix + MIN_SUFFIX)
>  - upper_key = the next lexicographic successor of encode(prefix)
> This must correctly handle:
>  - Variable-length types (STRING, BINARY, VARCHAR) and their encoding.
>  - Composite PKs with mixed types.
>  - Per-tablet partition boundaries: the effective range is the intersection 
> of the prefix range and the tablet's key range.
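> The exclusive upper bound is the shortest byte string strictly greater than 
> every key that begins with the encoded prefix: increment the last non-0xFF 
> byte and truncate after it; an all-0xFF prefix has no finite successor and 
> the range must instead be clipped to the tablet's upper bound. A generic 
> sketch (Kudu's actual key encoding has more structure than shown here):
> {code:java}
> import java.util.Arrays;
> 
> // Illustrative successor computation over raw encoded key bytes.
> public class KeySuccessor {
>   // Returns the shortest byte string > every key prefixed by
>   // encodedPrefix, or null if the prefix is all 0xFF bytes.
>   public static byte[] successor(byte[] encodedPrefix) {
>     for (int i = encodedPrefix.length - 1; i >= 0; i--) {
>       if (encodedPrefix[i] != (byte) 0xFF) {
>         // Truncate after position i, then increment that byte.
>         byte[] result = Arrays.copyOf(encodedPrefix, i + 1);
>         result[i]++;
>         return result;
>       }
>     }
>     return null;  // no finite successor
>   }
> }
> {code}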
> Testing Plan
> Unit tests:
>  - Range tombstone containment logic (inclusive/exclusive boundaries, edge 
> cases).
>  - Overlapping tombstones with different timestamps.
>  - MVCC visibility: rows inserted before vs. after tombstone timestamp.
>  - Tombstone splitting during tablet split.
> Integration tests:
> 1. Create table with composite PK (id1, id2, num_cal) and hash partitions.
> 2. Insert many rows for a given prefix.
> 3. Issue delete-by-prefix with range tombstone.
> 4. Verify scan no longer returns deleted rows without individual row-level 
> deletes being issued.
> 5. Insert new rows after tombstone timestamp — verify they are visible.
> 6. Trigger compaction — verify on-disk row count decreases.
> 7. Verify behavior across leader changes.
> Performance benchmark (optional):
>  - Compare Phase 2 (scan + delete) vs. Phase 3 (range tombstone) for N = 10K, 
> 100K, 1M matching rows.
>  - Measure: wall time, WAL bytes written, I/O.
> Limitations and Future Work
>  - CDC: range tombstones may need special handling in the Change Data Capture 
> stream. Initially, they could be expanded into individual delete events at 
> CDC read time.
>  - Backup/Restore: backup tooling must preserve range tombstones.
>  - Tombstone accumulation: without compaction, many small tombstones could 
> degrade read performance. A background task to merge adjacent/overlapping 
> tombstones may be needed.
>  - Cross-tablet atomicity: remains unsupported. Use Kudu transactions if 
> needed.
>  - Delete by PK subset: a natural extension would be to support deleting by 
> an arbitrary subset of PK columns (e.g., col1 and col3 without specifying 
> col2). This is feasible at the scan + delete level (Phases 1 and 2), since 
> equality predicates can be applied on any column combination. However, it 
> has significant implications:
>  ** Performance: prefix deletes map to a contiguous key range (an efficient 
> range scan), whereas subset deletes produce scattered matches across the key 
> space, requiring broader scans.
>  ** Partition pruning: less effective when intermediate PK columns are 
> unspecified.
>  ** Range tombstones (Phase 3): not directly applicable. A subset does not 
> map to a single [lower, upper) key range, so the O(1) tombstone approach 
> would not apply.
> Subset deletes are therefore deferred as a separate feature once the 
> prefix-based implementation is stable.
> *Summary*
> |Aspect|Phase 1|Phase 2|Phase 3|Future Work|
> |Scope|PK prefix|PK prefix|PK prefix|Arbitrary PK subset|
> |Where|Client|Tablet Server|Storage engine|Client/Server|
> |Mechanism|Scan + delete|Scan + batched delete|MVCC range tombstone (encoded PK interval)|Scan + arbitrary predicates|
> |Cost|O(N) network|O(N) server|O(1) per tablet|O(N) broader scan|
> |WAL writes|N|N|1 per tablet|N|
> |Row enumeration|Yes|Yes|No|Yes|
> |Compaction|Standard|Standard|Drops covered rows|Standard|
> |Range tombstones|No|No|Yes|No|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
