This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 097a89f0400 Add support for Kafka subset-partition realtime ingestion
(#17587)
097a89f0400 is described below
commit 097a89f0400189987ddf35da790558b09e5e3f91
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Mar 12 11:48:52 2026 -0700
Add support for Kafka subset-partition realtime ingestion (#17587)
Allow a realtime table to consume only a subset of Kafka partitions by
configuring `stream.kafka.partition.ids` (e.g. "0,2,5"). This enables
splitting a single Kafka topic across multiple Pinot tables for
independent scaling and isolation.
Key changes:
- Add KafkaPartitionSubsetUtils to parse and validate partition ID
configuration from StreamConfig
- Update KafkaStreamMetadataProvider (kafka30/kafka40) to filter
partitions and offsets based on the configured subset while still
returning the total Kafka partition count for instance assignment
- Update PinotLLCRealtimeSegmentManager to use total partition count
from instance partitions for segment ZK metadata, ensuring correct
broker query routing across subset tables
- Add QuickStart example with two subset tables splitting a 2-partition
topic (fineFoodReviews_part_0 and fineFoodReviews_part_1)
- Add unit tests, integration tests, and a chaos integration test
validating partition assignment, segment creation, and query routing
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 27 +-
.../instance/InstanceAssignmentTest.java | 170 ++++++
.../RealtimeReplicaGroupSegmentAssignmentTest.java | 174 +++++-
.../KafkaPartitionSubsetChaosIntegrationTest.java | 607 +++++++++++++++++++++
...alTableWithTwoRealtimeTableIntegrationTest.java | 251 +++++++++
.../kafka30/KafkaStreamMetadataProvider.java | 85 ++-
.../kafka30/KafkaPartitionLevelConsumerTest.java | 76 +++
.../kafka40/KafkaStreamMetadataProvider.java | 85 ++-
.../kafka40/KafkaPartitionLevelConsumerTest.java | 109 +++-
.../kafka/KafkaPartitionLevelStreamConfig.java | 9 +
.../stream/kafka/KafkaPartitionSubsetUtils.java | 85 +++
.../stream/kafka/KafkaStreamConfigProperties.java | 6 +
.../kafka/KafkaPartitionSubsetUtilsTest.java | 201 +++++++
.../org/apache/pinot/tools/QuickStartBase.java | 104 +++-
.../org/apache/pinot/tools/RealtimeQuickStart.java | 25 +
...neFoodReviews_part_0_realtime_table_config.json | 79 +++
.../fineFoodReviews_part_0_schema.json | 48 ++
...neFoodReviews_part_1_realtime_table_config.json | 79 +++
.../fineFoodReviews_part_1_schema.json | 48 ++
19 files changed, 2210 insertions(+), 58 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6720161c620..113cdac8e5c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -397,12 +397,11 @@ public class PinotLLCRealtimeSegmentManager {
String realtimeTableName = tableConfig.getTableName();
LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
- int numPartitionGroups = 0;
for (StreamMetadata streamMetadata : streamMetadataList) {
_flushThresholdUpdateManager.clearFlushThresholdUpdater(streamMetadata.getStreamConfig());
- numPartitionGroups += streamMetadata.getNumPartitions();
}
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
+ int numPartitionGroups = getPartitionCountForRouting(streamMetadataList);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
SegmentAssignment segmentAssignment =
@@ -1727,10 +1726,7 @@ public class PinotLLCRealtimeSegmentManager {
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
- int numPartitions = 0;
- for (StreamMetadata streamMetadata : streamMetadataList) {
- numPartitions += streamMetadata.getNumPartitions();
- }
+ int numPartitions = getPartitionCountForRouting(streamMetadataList);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager,
tableConfig, _controllerMetrics);
@@ -2123,6 +2119,25 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+ /**
+ * Gets the total partition count for routing segment assignment.
+ *
+ * <p>For subset-partition tables, this returns the total Kafka topic
partition count (not the
+ * subset size) so that RealtimeSegmentAssignment correctly routes
non-contiguous partition IDs via
+ * {@code partitionId % totalPartitions}.
+ *
+ * <p>For standard tables, this equals the number of actively consumed
partitions.
+ *
+ * <p>For tables with multiple streams, this sums the partition counts
across all streams.
+ */
+ private int getPartitionCountForRouting(List<StreamMetadata>
streamMetadataList) {
+ int totalPartitionCount = 0;
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+ totalPartitionCount += streamMetadata.getNumPartitions();
+ }
+ return totalPartitionCount;
+ }
+
private int getMaxNumPartitionsPerInstance(InstancePartitions
instancePartitions, int numPartitions,
int numReplicas) {
if (instancePartitions.getNumReplicaGroups() == 1) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 62a1af26bd6..e1d76e916a3 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -3391,4 +3391,174 @@ public class InstanceAssignmentTest {
assertEquals(steadyStatePartitions.getInstances(0, rg),
initialPartitions.getInstances(0, rg));
}
}
+ /**
+ * Verifies that subset-partition tables use the total Kafka partition count
(not the subset size)
+ * for instance assignment, producing the same server spread as a normal
full-partition table.
+ *
+ * <p><b>Topology:</b> 2 Kafka topic partitions, 4 servers, 2 replica groups,
+ * 1 instance per partition per replica group.
+ * <ul>
+ * <li>Table A has {@code stream.kafka.partition.ids = "0"} (consumes only
Kafka partition 0)</li>
+ * <li>Table B has {@code stream.kafka.partition.ids = "1"} (consumes only
Kafka partition 1)</li>
+ * </ul>
+ *
+ * <p>The {@link ImplicitRealtimeTablePartitionSelector} always fetches the
<em>total</em>
+ * Kafka partition count from {@link
StreamMetadataProvider#fetchPartitionCount} (= 2). This
+ * produces an instance map with <b>two distinct slots</b> so that {@link
+ *
org.apache.pinot.controller.helix.core.assignment.segment.RealtimeSegmentAssignment}
routes
+ * Kafka partition 0 → slot 0 and Kafka partition 1 → slot 1, each backed by
different servers.
+ *
+ * <p>Without this behaviour ({@code numPartitions = subsetSize = 1}), only
slot 0 exists, and
+ * the assignment computes {@code kafkaPartitionId % 1 = 0} for
<em>every</em> Kafka partition,
+ * routing all consuming segments to the same slot-0 servers — a hotspot on
lower-indexed servers.
+ *
+ * <p><b>Pre-computed hash rotations</b> (used for exact expected server
values):
+ * <pre>
+ * Math.abs("subsetTablePartition0_REALTIME".hashCode()) % 4 = 0 → no
rotation
+ * Pool after rotation: [s0, s1, s2, s3]
+ * Round-robin to 2 RGs: RG0=[s0,s2], RG1=[s1,s3]
+ * slot 0: RG0=s0, RG1=s1 | slot 1: RG0=s2, RG1=s3
+ *
+ * Math.abs("subsetTablePartition1_REALTIME".hashCode()) % 4 = 1 →
rotate by 1
+ * Pool after rotation: [s1, s2, s3, s0]
+ * Round-robin to 2 RGs: RG0=[s1,s3], RG1=[s2,s0]
+ * slot 0: RG0=s1, RG1=s2 | slot 1: RG0=s3, RG1=s0
+ * </pre>
+ */
+ @Test
+ public void testSubsetPartitionInstanceAssignmentNoHotspot() {
+ final int numReplicas = 2;
+ final int numKafkaPartitions = 2; // total Kafka topic partition count
+ final int numServers = 4;
+ final int numInstancesPerReplicaGroup = numServers / numReplicas; // = 2
+
+ // 4 servers, single pool (non-pool-based), sorted lexicographically:
+ // [Server_localhost_0, Server_localhost_1, Server_localhost_2,
Server_localhost_3]
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numServers);
+ for (int i = 0; i < numServers; i++) {
+ InstanceConfig cfg = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ cfg.addTag(REALTIME_TAG);
+ instanceConfigs.add(cfg);
+ }
+
+ // The mock always returns 2 (total topic partition count) regardless of
the configured subset.
+ StreamMetadataProvider streamMetadataProvider =
mock(StreamMetadataProvider.class);
+
when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numKafkaPartitions);
+
+ InstanceReplicaGroupPartitionConfig rgConfig = new
InstanceReplicaGroupPartitionConfig(
+ true, 0, numReplicas, numInstancesPerReplicaGroup, 0, 0, false, null);
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(REALTIME_TAG, false, 0, null), null,
rgConfig,
+
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
false);
+
+ // ── Table A: assigned subset {partition 0}
───────────────────────────────────────────────
+ // Hash rotation = 0 → pool [s0,s1,s2,s3] unchanged.
+ // Round-robin → RG0=[s0,s2], RG1=[s1,s3]; 1 instance/partition
(ImplicitSelector enforces):
+ // slot 0: RG0=s0, RG1=s1
+ // slot 1: RG0=s2, RG1=s3
+ String tableAName = "subsetTablePartition0";
+ TableConfig tableAConfig = new TableConfigBuilder(TableType.REALTIME)
+
.setTableName(tableAName).setServerTenant(TENANT_NAME).setNumReplicas(numReplicas)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+ InstancePartitions tableAPartitions = new
InstanceAssignmentDriver(tableAConfig)
+ .getInstancePartitions(
+
InstancePartitionsType.CONSUMING.getInstancePartitionsName(tableAName),
+ instanceAssignmentConfig, instanceConfigs, null, false,
+ new ImplicitRealtimeTablePartitionSelector(rgConfig,
tableAConfig.getTableName(), null, false,
+ streamMetadataProvider));
+
+ // Key correctness check: total Kafka partition count (2) must be used,
not subset size (1).
+ assertEquals(tableAPartitions.getNumPartitions(), numKafkaPartitions,
+ "Table A must use total Kafka partition count, not subset size");
+ assertEquals(tableAPartitions.getNumReplicaGroups(), numReplicas);
+ // slot 0 (Kafka partition 0 → 0 % 2 = 0)
+ assertEquals(tableAPartitions.getInstances(0, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(tableAPartitions.getInstances(0, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ // slot 1 (Kafka partition 1 → 1 % 2 = 1, if it were consumed here)
+ assertEquals(tableAPartitions.getInstances(1, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(tableAPartitions.getInstances(1, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+
+ // ── Table B: assigned subset {partition 1}
───────────────────────────────────────────────
+ // Hash rotation = 1 → rotated pool [s1,s2,s3,s0].
+ // Round-robin → RG0=[s1,s3], RG1=[s2,s0]; 1 instance/partition:
+ // slot 0: RG0=s1, RG1=s2
+ // slot 1: RG0=s3, RG1=s0
+ String tableBName = "subsetTablePartition1";
+ TableConfig tableBConfig = new TableConfigBuilder(TableType.REALTIME)
+
.setTableName(tableBName).setServerTenant(TENANT_NAME).setNumReplicas(numReplicas)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+ InstancePartitions tableBPartitions = new
InstanceAssignmentDriver(tableBConfig)
+ .getInstancePartitions(
+
InstancePartitionsType.CONSUMING.getInstancePartitionsName(tableBName),
+ instanceAssignmentConfig, instanceConfigs, null, false,
+ new ImplicitRealtimeTablePartitionSelector(rgConfig,
tableBConfig.getTableName(), null, false,
+ streamMetadataProvider));
+
+ assertEquals(tableBPartitions.getNumPartitions(), numKafkaPartitions,
+ "Table B must use total Kafka partition count, not subset size");
+ assertEquals(tableBPartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(tableBPartitions.getInstances(0, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(tableBPartitions.getInstances(0, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ // slot 1 (Kafka partition 1 → 1 % 2 = 1)
+ assertEquals(tableBPartitions.getInstances(1, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(tableBPartitions.getInstances(1, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+
+ // ── Anti-hotspot: within each table, slot 0 and slot 1 use disjoint
servers ───────────────
+ // RealtimeSegmentAssignment routes: Kafka partition X → slot = X %
numPartitions.
+ // With numPartitions=2, slot 0 and slot 1 are guaranteed to be on
different servers,
+ // so different Kafka partitions do NOT share consuming instances within
the same table.
+ Set<String> tableASlot0 = new HashSet<>(tableAPartitions.getInstances(0,
0));
+ tableASlot0.addAll(tableAPartitions.getInstances(0, 1));
+ Set<String> tableASlot1 = new HashSet<>(tableAPartitions.getInstances(1,
0));
+ tableASlot1.addAll(tableAPartitions.getInstances(1, 1));
+ assertTrue(Collections.disjoint(tableASlot0, tableASlot1),
+ "Table A: slot 0 and slot 1 must be on disjoint servers (no
intra-table hotspot)");
+
+ Set<String> tableBSlot0 = new HashSet<>(tableBPartitions.getInstances(0,
0));
+ tableBSlot0.addAll(tableBPartitions.getInstances(0, 1));
+ Set<String> tableBSlot1 = new HashSet<>(tableBPartitions.getInstances(1,
0));
+ tableBSlot1.addAll(tableBPartitions.getInstances(1, 1));
+ assertTrue(Collections.disjoint(tableBSlot0, tableBSlot1),
+ "Table B: slot 0 and slot 1 must be on disjoint servers (no
intra-table hotspot)");
+
+ // Each table spreads load evenly: 2 slots × 2 replica groups = all 4
servers.
+ Set<String> tableAAll = new HashSet<>(tableASlot0);
+ tableAAll.addAll(tableASlot1);
+ assertEquals(tableAAll.size(), numServers,
+ "Table A must use all " + numServers + " servers");
+ Set<String> tableBAll = new HashSet<>(tableBSlot0);
+ tableBAll.addAll(tableBSlot1);
+ assertEquals(tableBAll.size(), numServers,
+ "Table B must use all " + numServers + " servers");
+
+ // ── Negative case: numPartitions = 1 (wrong: uses subset size instead of
total count) ─────
+ // With InstanceReplicaGroupPartitionSelector (bypasses stream-count
lookup), numPartitions=1.
+ // Only slot 0 exists in the instance map. RealtimeSegmentAssignment then
computes:
+ // Kafka partition 1 → 1 % 1 = 0 → slot 0 (same as partition 0 →
HOTSPOT)
+ InstanceReplicaGroupPartitionConfig wrongRgConfig = new
InstanceReplicaGroupPartitionConfig(
+ true, 0, numReplicas, numInstancesPerReplicaGroup, 1 /* wrong: subset
size */, 1, false, null);
+
+ // Table B wrong assignment (rotation=1): only slot 0 → RG0=s1, RG1=s2.
+ InstancePartitions wrongTableBPartitions = new
InstanceAssignmentDriver(tableBConfig)
+ .getInstancePartitions(
+
InstancePartitionsType.CONSUMING.getInstancePartitionsName(tableBName),
+ instanceAssignmentConfig, instanceConfigs, null, false,
+ new InstanceReplicaGroupPartitionSelector(wrongRgConfig,
tableBConfig.getTableName(), null, false));
+
+ // Wrong approach: only slot 0 exists; slot 1 is missing entirely.
+ assertEquals(wrongTableBPartitions.getNumPartitions(), 1,
+ "Wrong approach produces only 1 partition slot");
+ assertNull(wrongTableBPartitions.getInstances(1, 0),
+ "Slot 1 must not exist when numPartitions=1; Kafka partition 1 falls
back to slot 0 via 1 % 1 = 0");
+
+ // Under the wrong approach, Kafka partition 1 would be routed to slot 0
(RG0 → s1).
+ // Under the correct approach, it routes to slot 1 (RG0 → s3). These are
different servers.
+ String wrongServerForP1 = wrongTableBPartitions.getInstances(0, 0).get(0);
// s1 (slot-0 hotspot)
+ String correctServerForP1 = tableBPartitions.getInstances(1, 0).get(0);
// s3 (slot 1)
+ assertNotEquals(wrongServerForP1, correctServerForP1,
+ "Wrong approach routes Kafka partition 1 to '" + wrongServerForP1
+ + "' (slot-0 hotspot) instead of the correct '" +
correctServerForP1 + "' (slot 1)");
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 9b00d996676..4b00f8a2608 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -335,19 +335,20 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
public void testExplicitPartition() {
// CONSUMING instances:
// {
- // 0_0=[instance_0], 1_0=[instance_1], 2_0=[instance_2],
- // 0_1=[instance_3], 1_1=[instance_4], 2_1=[instance_5],
- // 0_2=[instance_6], 1_2=[instance_7], 2_2=[instance_8]
+ // 0_0=[instance_0], 1_0=[instance_1], 2_0=[instance_2],
3_0=[instance_0],
+ // 0_1=[instance_3], 1_1=[instance_4], 2_1=[instance_5],
3_1=[instance_3],
+ // 0_2=[instance_6], 1_2=[instance_7], 2_2=[instance_8], 3_2=[instance_6]
// }
// p0 p1 p2
// p3
InstancePartitions consumingInstancePartitions = new
InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
int numConsumingInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES /
NUM_REPLICAS;
- int consumingInstanceIdToAdd = 0;
for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS;
replicaGroupId++) {
- for (int partitionId = 0; partitionId <
numConsumingInstancesPerReplicaGroup; partitionId++) {
+ for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
+ int instanceIndex = (partitionId %
numConsumingInstancesPerReplicaGroup)
+ + replicaGroupId * numConsumingInstancesPerReplicaGroup;
consumingInstancePartitions.setInstances(partitionId, replicaGroupId,
-
Collections.singletonList(CONSUMING_INSTANCES.get(consumingInstanceIdToAdd++)));
+ Collections.singletonList(CONSUMING_INSTANCES.get(instanceIndex)));
}
}
@@ -458,6 +459,167 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
+ /**
+ * Tests segment assignment for tables consuming subset partitions with
non-contiguous partition IDs.
+ *
+ * <p><b>Key Invariant:</b> A table consuming subset partitions {0, 2, 5, 7}
from an 8-partition topic
+ * should assign those partitions to the SAME instances as a full table
consuming all 8 partitions.
+ *
+ * <p>This is achieved by using the TOTAL partition count (8) in the
instance partitions map, so that
+ * RealtimeSegmentAssignment routes both subset and full tables identically
via {@code partitionId % 8}.
+ *
+ * <p>This test verifies:
+ * <ul>
+ * <li>Subset table assignment matches full table assignment for the same
partition IDs</li>
+ * <li>Non-contiguous partitions don't create hotspots</li>
+ * <li>All consuming instances are utilized (no underutilization)</li>
+ * </ul>
+ */
+ @Test
+ public void testSubsetPartitionAssignment() {
+ // Subset partition IDs: non-contiguous selection from 8 total partitions
+ int[] subsetPartitionIds = {0, 2, 5, 7};
+ int totalKafkaPartitions = 8;
+ int numSegmentsPerPartition = 3;
+
+ // Create segments only for the subset partitions
+ List<String> subsetSegments = new ArrayList<>();
+ for (int partitionId : subsetPartitionIds) {
+ for (int seqNum = 0; seqNum < numSegmentsPerPartition; seqNum++) {
+ subsetSegments.add(new LLCSegmentName(RAW_TABLE_NAME, partitionId,
seqNum,
+ System.currentTimeMillis()).getSegmentName());
+ }
+ }
+
+ // CONSUMING instances with explicit partitions based on TOTAL partition
count (8):
+ // This is critical: instance map has 8 slots, not 4, even though only 4
partitions are consumed.
+ // {
+ // 0_0=[instance_0], 1_0=[instance_1], 2_0=[instance_2],
3_0=[instance_0],
+ // 4_0=[instance_1], 5_0=[instance_2], 6_0=[instance_0],
7_0=[instance_1],
+ // 0_1=[instance_3], 1_1=[instance_4], 2_1=[instance_5],
3_1=[instance_3],
+ // 4_1=[instance_4], 5_1=[instance_5], 6_1=[instance_3],
7_1=[instance_4],
+ // 0_2=[instance_6], 1_2=[instance_7], 2_2=[instance_8],
3_2=[instance_6],
+ // 4_2=[instance_7], 5_2=[instance_8], 6_2=[instance_6], 7_2=[instance_7]
+ // }
+ InstancePartitions consumingInstancePartitions = new
InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
+ int numConsumingInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES /
NUM_REPLICAS;
+ for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS;
replicaGroupId++) {
+ for (int partitionId = 0; partitionId < totalKafkaPartitions;
partitionId++) {
+ int instanceIndex = (partitionId %
numConsumingInstancesPerReplicaGroup)
+ + replicaGroupId * numConsumingInstancesPerReplicaGroup;
+ consumingInstancePartitions.setInstances(partitionId, replicaGroupId,
+ Collections.singletonList(CONSUMING_INSTANCES.get(instanceIndex)));
+ }
+ }
+
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ Map.of(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
+
+ // First, assign segments for a FULL table (all 8 partitions) to establish
baseline
+ Map<String, Map<String, String>> fullTableAssignment = new TreeMap<>();
+ List<String> fullTableSegments = new ArrayList<>();
+ for (int partitionId = 0; partitionId < totalKafkaPartitions;
partitionId++) {
+ for (int seqNum = 0; seqNum < numSegmentsPerPartition; seqNum++) {
+ fullTableSegments.add(new LLCSegmentName(RAW_TABLE_NAME, partitionId,
seqNum,
+ System.currentTimeMillis()).getSegmentName());
+ }
+ }
+ for (String segmentName : fullTableSegments) {
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, fullTableAssignment,
instancePartitionsMap);
+ fullTableAssignment.put(segmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
+ }
+
+ // Now assign segments for SUBSET table (only partitions {0, 2, 5, 7})
+ Map<String, Map<String, String>> subsetTableAssignment = new TreeMap<>();
+ for (String segmentName : subsetSegments) {
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(segmentName, subsetTableAssignment,
instancePartitionsMap);
+ assertEquals(instancesAssigned.size(), NUM_REPLICAS);
+
+ // Extract partition ID from segment name
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ int partitionId = llcSegmentName.getPartitionGroupId();
+
+ // Verify the partition ID is one of our subset partitions
+ boolean isSubsetPartition = false;
+ for (int subsetId : subsetPartitionIds) {
+ if (partitionId == subsetId) {
+ isSubsetPartition = true;
+ break;
+ }
+ }
+ assertTrue(isSubsetPartition, "Segment partition " + partitionId + "
should be in subset");
+
+ // KEY VALIDATION: Subset table assignment should MATCH full table
assignment for same partition
+ // This proves that subset tables use the same routing logic as full
tables
+ Map<String, String> fullTableInstanceStateMap =
fullTableAssignment.get(segmentName);
+ Map<String, String> subsetTableInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING);
+
+ assertEquals(subsetTableInstanceStateMap.keySet(),
fullTableInstanceStateMap.keySet(),
+ "Subset table partition " + partitionId + " should be assigned to
the SAME instances as full table");
+
+ // Verify assignment uses partitionId % totalPartitions for routing
+ // Partition 0 → slot 0 → instance 0, 3, 6
+ // Partition 2 → slot 2 → instance 2, 5, 8
+ // Partition 5 → slot 5 → instance 2, 5, 8 (5 % 3 = 2)
+ // Partition 7 → slot 7 → instance 1, 4, 7 (7 % 3 = 1)
+ for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS;
replicaGroupId++) {
+ int expectedInstanceIndex = (partitionId %
numConsumingInstancesPerReplicaGroup)
+ + replicaGroupId * numConsumingInstancesPerReplicaGroup;
+ String expectedInstance =
CONSUMING_INSTANCES.get(expectedInstanceIndex);
+ assertEquals(instancesAssigned.get(replicaGroupId), expectedInstance,
+ "Partition " + partitionId + " in replica group " + replicaGroupId
+ + " should be assigned to instance " + expectedInstance);
+ }
+
+ subsetTableAssignment.put(segmentName, subsetTableInstanceStateMap);
+ }
+
+ // Verify that subset table uses the same instances as full table for each
partition
+ for (String segmentName : subsetSegments) {
+ Map<String, String> subsetInstances =
subsetTableAssignment.get(segmentName);
+ Map<String, String> fullInstances = fullTableAssignment.get(segmentName);
+ assertEquals(subsetInstances.keySet(), fullInstances.keySet(),
+ "Segment " + segmentName + " should have identical instance
assignment in subset and full tables");
+ }
+
+ // Verify all subset partitions are distributed across different instances
(no hotspot)
+ // With modulo-8 routing:
+ // Partition 0 → slot 0 → instances {0, 3, 6}
+ // Partition 2 → slot 2 → instances {2, 5, 8}
+ // Partition 5 → slot 5 → instances {2, 5, 8} (same as slot 2)
+ // Partition 7 → slot 7 → instances {1, 4, 7}
+ // Total unique instances: {0, 1, 2, 3, 4, 5, 6, 7, 8} = 9 instances
+ HashSet<String> usedInstances = new HashSet<>();
+ for (Map<String, String> instanceStateMap :
subsetTableAssignment.values()) {
+ usedInstances.addAll(instanceStateMap.keySet());
+ }
+ // With correct modulo-8 routing, all 9 consuming instances should be used
+ assertEquals(usedInstances.size(), NUM_CONSUMING_INSTANCES,
+ "Subset partition assignment with modulo-8 routing should use all
instances, found: " + usedInstances.size());
+
+ // Verify each subset partition uses distinct instance sets (within each
replica group)
+ Map<Integer, HashSet<String>> partitionToInstancesRG0 = new TreeMap<>();
+ for (int partitionId : subsetPartitionIds) {
+ int instanceIndex = partitionId % numConsumingInstancesPerReplicaGroup;
+ HashSet<String> instances = new HashSet<>();
+ instances.add(CONSUMING_INSTANCES.get(instanceIndex)); // RG0
+ instances.add(CONSUMING_INSTANCES.get(instanceIndex +
numConsumingInstancesPerReplicaGroup)); // RG1
+ instances.add(CONSUMING_INSTANCES.get(instanceIndex + 2 *
numConsumingInstancesPerReplicaGroup)); // RG2
+ partitionToInstancesRG0.put(partitionId, instances);
+ }
+
+ // Partitions 0 and 7 should NOT share the same instance set
+ // (This would happen if we incorrectly used subset size = 4 for routing)
+ HashSet<String> partition0Instances = partitionToInstancesRG0.get(0);
+ HashSet<String> partition7Instances = partitionToInstancesRG0.get(7);
+ assertTrue(!partition0Instances.equals(partition7Instances),
+ "Partitions 0 and 7 should map to different instance sets to avoid
hotspots");
+ }
+
private HelixManager createHelixManager() {
HelixManager helixManager = mock(HelixManager.class);
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/KafkaPartitionSubsetChaosIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/KafkaPartitionSubsetChaosIntegrationTest.java
new file mode 100644
index 00000000000..928bc44537f
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/KafkaPartitionSubsetChaosIntegrationTest.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.primitives.Longs;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterTest;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Chaos integration test for the Kafka partition subset feature.
+ *
+ * <p><b>Topology:</b>
+ * <ul>
+ * <li>1 Kafka topic with {@value NUM_KAFKA_PARTITIONS} partitions</li>
+ * <li>1 "control" realtime table consuming <em>all</em> partitions (no
subset filter), serving as ground truth</li>
+ * <li>3 "subset" realtime tables with <em>non-contiguous</em> partition
assignments:
+ * <ul>
+ * <li>subset_0: partitions {0, 3}</li>
+ * <li>subset_1: partitions {1, 4}</li>
+ * <li>subset_2: partitions {2, 5}</li>
+ * </ul>
+ * </li>
+ * <li>1 logical table aggregating all three subset tables</li>
+ * </ul>
+ *
+ * <p><b>Key invariant:</b> {@code COUNT(*) on logical table == COUNT(*) on
control table}
+ *
+ * <p><b>Chaos scenarios tested:</b>
+ * <ol>
+ * <li>Force-commit consuming segments on all subset tables</li>
+ * <li>Pause consumption, push more data, then resume – data must not be
lost or duplicated</li>
+ * <li>Server restart – tables must recover and resume from their last
committed offset</li>
+ * <li>Ingest a second batch after restart – counts must continue to
match</li>
+ * </ol>
+ *
+ * <p>The non-contiguous partition IDs (e.g., {0,3}) are intentional: they
expose regressions
+ * where code uses {@code getNumPartitions()} (returns a sequential 0..N-1
count) instead of
+ * {@code getPartitionIds()} (returns the actual assigned partition ID set).
+ */
+public class KafkaPartitionSubsetChaosIntegrationTest extends
BaseClusterIntegrationTest {
+
+ // Kafka topic: 6 partitions to allow non-contiguous splits
+ private static final String KAFKA_TOPIC = "partitionSubsetChaosTopic";
+ private static final int NUM_KAFKA_PARTITIONS = 6;
+
+ // Control table: ground truth – reads all 6 partitions
+ private static final String CONTROL_TABLE = "controlAllPartitions";
+
+ // Logical table: aggregates the three subset tables
+ private static final String LOGICAL_TABLE = "logicalSubset";
+
+ // Subset tables with non-contiguous partition assignments.
+ // Using {0,3}, {1,4}, {2,5} deliberately skips sequential ordering to expose
+ // getNumPartitions() vs getPartitionIds() bugs.
+ private static final String[] SUBSET_TABLE_NAMES = {"subset0", "subset1",
"subset2"};
+ private static final int[][] SUBSET_PARTITION_ASSIGNMENTS = {{0, 3}, {1, 4},
{2, 5}};
+
+ private static final long VERIFICATION_TIMEOUT_MS = 600_000L;
+ private static final String DEFAULT_TENANT = "DefaultTenant";
+
+ // Per-partition record counts – updated whenever we push data
+ private final long[] _partitionRecordCounts = new long[NUM_KAFKA_PARTITIONS];
+
+ //
---------------------------------------------------------------------------
+ // ClusterTest overrides
+ //
---------------------------------------------------------------------------
+
+ @Override
+ protected String getKafkaTopic() {
+ return KAFKA_TOPIC;
+ }
+
+ @Override
+ protected int getNumKafkaPartitions() {
+ return NUM_KAFKA_PARTITIONS;
+ }
+
+ /** The "default" table name used by the base-class helper methods (schema
creation, etc.). */
+ @Override
+ public String getTableName() {
+ return CONTROL_TABLE;
+ }
+
+ @Override
+ public String getHelixClusterName() {
+ return "KafkaPartitionSubsetChaosIntegrationTest";
+ }
+
+ //
---------------------------------------------------------------------------
+ // Suite lifecycle
+ //
---------------------------------------------------------------------------
+
  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);

    // Cluster components come up before any tables or data: ZK first (coordination),
    // then controller/broker/server, then Kafka (the ingestion source).
    startZk();
    startController();
    startBroker();
    startServer();
    startKafka();

    List<File> avroFiles = getAllAvroFiles();

    // Set the sample file used by AvroFileSchemaKafkaAvroMessageDecoder (static, schema is shared)
    ClusterTest.AvroFileSchemaKafkaAvroMessageDecoder._avroFile = avroFiles.get(0);

    // ---- Control table: consumes all partitions (null => no partition subset filter) ----
    Schema controlSchema = createSchema();
    addSchema(controlSchema);
    addTableConfig(buildRealtimeTableConfig(CONTROL_TABLE, null));

    // ---- Subset tables: each consumes a non-contiguous partition subset ----
    for (int i = 0; i < SUBSET_TABLE_NAMES.length; i++) {
      Schema schema = createSchema();
      schema.setSchemaName(SUBSET_TABLE_NAMES[i]);
      addSchema(schema);
      addTableConfig(buildRealtimeTableConfig(SUBSET_TABLE_NAMES[i], SUBSET_PARTITION_ASSIGNMENTS[i]));
    }

    // ---- Logical table aggregating the three subset tables ----
    Schema logicalSchema = createSchema();
    logicalSchema.setSchemaName(LOGICAL_TABLE);
    addSchema(logicalSchema);
    addLogicalTableConfig(LOGICAL_TABLE, buildSubsetPhysicalTableNames());

    // ---- Push initial data round-robin across all 6 partitions ----
    pushAvroRoundRobin(avroFiles, 0);

    // ---- Wait for all tables to catch up before any test method runs ----
    waitForCountInvariant(VERIFICATION_TIMEOUT_MS);
  }
+
  @AfterClass
  public void tearDown()
      throws Exception {
    // Stop Pinot components before Kafka and ZK so they can shut down while their
    // dependencies are still reachable; finally remove the temp working directory.
    stopServer();
    stopBroker();
    stopController();
    stopKafka();
    stopZk();
    FileUtils.deleteDirectory(_tempDir);
  }
+
+ //
---------------------------------------------------------------------------
+ // Tests (run in declared order via priority)
+ //
---------------------------------------------------------------------------
+
  /**
   * Sanity check: after initial ingestion the logical table and the control table must agree.
   */
  @Test(priority = 0)
  public void testCountInvariantAfterInitialIngestion() {
    // setUp() already waited for convergence, so a hard (non-waiting) assertion is safe here.
    assertCountInvariant();
  }
+
+ /**
+ * Force-commit consuming segments on each subset table and verify no data
is lost or duplicated.
+ *
+ * <p>This exercises the path where the controller picks up consuming
segments by partition ID.
+ * A regression using {@code getNumPartitions()} would cause it to iterate
partitions 0..1 for a
+ * table assigned to e.g. {0,3} and miss partition 3 entirely.
+ */
+ @Test(priority = 1, dependsOnMethods =
"testCountInvariantAfterInitialIngestion")
+ public void testCountInvariantAfterForceCommit()
+ throws Exception {
+ for (String tableName : SUBSET_TABLE_NAMES) {
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ Set<String> consumingBefore = getConsumingSegments(realtimeTableName);
+ if (consumingBefore.isEmpty()) {
+ continue;
+ }
+ String jobId = forceCommit(realtimeTableName);
+ waitForForceCommitCompletion(realtimeTableName, jobId, consumingBefore,
120_000L);
+ }
+
+ // Counts must be unchanged after force-commit
+ assertCountInvariant();
+ }
+
+ /**
+ * Pause consumption on all subset tables, push a second batch of records,
then resume.
+ *
+ * <p>While consumption is paused the control table keeps ingesting, so the
logical table will
+ * temporarily fall behind. After resumption it must catch back up so that
the invariant is
+ * restored.
+ */
+ @Test(priority = 2, dependsOnMethods = "testCountInvariantAfterForceCommit")
+ public void testCountInvariantAfterPauseResume()
+ throws Exception {
+ // Pause all subset tables
+ for (String tableName : SUBSET_TABLE_NAMES) {
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ pauseConsumptionAndWait(realtimeTableName);
+ }
+
+ // Push a second batch – control table will consume it, subset tables will
not (yet)
+ List<File> avroFiles = getAllAvroFiles();
+ long countBeforePush = totalExpectedCount();
+ pushAvroRoundRobin(avroFiles, /* keyOffset= */ (int) countBeforePush);
+
+ long expectedControlCount = totalExpectedCount();
+
+ // Wait for control table to ingest the new records
+ TestUtils.waitForCondition(
+ aVoid -> getCurrentCountStarResult(CONTROL_TABLE) ==
expectedControlCount,
+ 1_000L, VERIFICATION_TIMEOUT_MS,
+ "Control table did not reach expected count " + expectedControlCount +
" after second push");
+
+ // Resume all subset tables
+ for (String tableName : SUBSET_TABLE_NAMES) {
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ resumeConsumptionAndWait(realtimeTableName);
+ }
+
+ // After resumption the invariant must be restored
+ waitForCountInvariant(VERIFICATION_TIMEOUT_MS);
+ }
+
+ /**
+ * Restart all servers and verify all tables recover and resume from their
last committed offset.
+ */
+ @Test(priority = 3, dependsOnMethods = "testCountInvariantAfterPauseResume")
+ public void testCountInvariantAfterServerRestart()
+ throws Exception {
+ long expectedTotal = totalExpectedCount();
+
+ restartServers();
+
+ // After restart, wait for all tables to reach their expected counts again
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return getCurrentCountStarResult(CONTROL_TABLE) == expectedTotal
+ && getCurrentCountStarResult(LOGICAL_TABLE) == expectedTotal;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 1_000L, VERIFICATION_TIMEOUT_MS, "Tables did not recover expected
counts after server restart");
+
+ assertCountInvariant();
+ }
+
+ /**
+ * Push a third batch after server restart to confirm ongoing ingestion
works correctly.
+ *
+ * <p>This catches regressions where partition state is re-initialised
incorrectly after restart
+ * (e.g., resetting offsets to 0 based on a sequential 0..N-1 scan instead
of the configured
+ * partition IDs).
+ */
+ @Test(priority = 4, dependsOnMethods =
"testCountInvariantAfterServerRestart")
+ public void testCountInvariantAfterIngestPostRestart()
+ throws Exception {
+ List<File> avroFiles = getAllAvroFiles();
+ long countBeforePush = totalExpectedCount();
+ pushAvroRoundRobin(avroFiles, /* keyOffset= */ (int) countBeforePush);
+
+ waitForCountInvariant(VERIFICATION_TIMEOUT_MS);
+ }
+
+ //
---------------------------------------------------------------------------
+ // Table-config builders
+ //
---------------------------------------------------------------------------
+
+ /**
+ * Builds a realtime {@link TableConfig} for the given table name.
+ *
+ * @param tableName the raw (un-typed) table name
+ * @param partitionIds partition IDs to assign, or {@code null} to consume
all partitions
+ */
+ private TableConfig buildRealtimeTableConfig(String tableName, int[]
partitionIds) {
+ Map<String, String> streamConfigMap = new HashMap<>(getStreamConfigMap());
+
+ if (partitionIds != null) {
+ // Convert int[] to comma-separated string, e.g. "0,3"
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < partitionIds.length; i++) {
+ if (i > 0) {
+ sb.append(',');
+ }
+ sb.append(partitionIds[i]);
+ }
+ streamConfigMap.put("stream.kafka.partition.ids", sb.toString());
+ }
+
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setStreamIngestionConfig(new
StreamIngestionConfig(Collections.singletonList(streamConfigMap)));
+
+ return new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(tableName)
+ .setTimeColumnName(getTimeColumnName())
+ .setNumReplicas(getNumReplicas())
+ .setBrokerTenant(DEFAULT_TENANT)
+ .setServerTenant(DEFAULT_TENANT)
+ .setIngestionConfig(ingestionConfig)
+ .setNullHandlingEnabled(getNullHandlingEnabled())
+ .build();
+ }
+
+ //
---------------------------------------------------------------------------
+ // Logical table helpers
+ //
---------------------------------------------------------------------------
+
+ private List<String> buildSubsetPhysicalTableNames() {
+ List<String> names = new ArrayList<>();
+ for (String tableName : SUBSET_TABLE_NAMES) {
+ names.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+ }
+ return names;
+ }
+
+ private void addLogicalTableConfig(String logicalTableName, List<String>
physicalTableNames)
+ throws IOException {
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ for (String physicalTableName : physicalTableNames) {
+ physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
+ }
+
+ String refRealtimeTableName = physicalTableNames.stream()
+ .filter(TableNameBuilder::isRealtimeTableResource)
+ .findFirst()
+ .orElse(null);
+
+ LogicalTableConfig logicalTableConfig = new LogicalTableConfigBuilder()
+ .setTableName(logicalTableName)
+ .setBrokerTenant(DEFAULT_TENANT)
+ .setRefRealtimeTableName(refRealtimeTableName)
+ .setPhysicalTableConfigMap(physicalTableConfigMap)
+ .build();
+
+ String addUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
+ String resp = sendPostRequest(addUrl,
logicalTableConfig.toSingleLineJsonString());
+ assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" +
logicalTableName
+ + " logical table successfully added.\"}");
+ }
+
+ //
---------------------------------------------------------------------------
+ // Data push helpers
+ //
---------------------------------------------------------------------------
+
+ /**
+ * Pushes all records from the given avro files to the Kafka topic in strict
round-robin order
+ * across all {@value NUM_KAFKA_PARTITIONS} partitions, starting the message
key sequence at
+ * {@code keyOffset}.
+ *
+ * <p>Each call atomically updates {@link #_partitionRecordCounts} so that
+ * {@link #totalExpectedCount()} always reflects the total number of records
in Kafka.
+ */
+ private void pushAvroRoundRobin(List<File> avroFiles, long keyOffset)
+ throws Exception {
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" +
getKafkaPort());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+ long keySeq = keyOffset;
+ try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536))
{
+
+ for (File avroFile : avroFiles) {
+ try (DataFileStream<GenericRecord> stream =
AvroUtils.getAvroReader(avroFile)) {
+ GenericDatumWriter<GenericRecord> writer = new
GenericDatumWriter<>(stream.getSchema());
+ for (GenericRecord record : stream) {
+ int partition = (int) (keySeq % NUM_KAFKA_PARTITIONS);
+ outputStream.reset();
+
+ BinaryEncoder encoder = new
EncoderFactory().directBinaryEncoder(outputStream, null);
+ writer.write(record, encoder);
+ encoder.flush();
+
+ producer.send(new ProducerRecord<>(
+ KAFKA_TOPIC, partition, Longs.toByteArray(keySeq),
outputStream.toByteArray())).get();
+
+ _partitionRecordCounts[partition]++;
+ keySeq++;
+ }
+ }
+ }
+ }
+ }
+
+ //
---------------------------------------------------------------------------
+ // Verification helpers
+ //
---------------------------------------------------------------------------
+
+ /** Returns the total number of records pushed across all partitions. */
+ private long totalExpectedCount() {
+ long total = 0;
+ for (long count : _partitionRecordCounts) {
+ total += count;
+ }
+ return total;
+ }
+
+ /**
+ * Returns the expected record count for subset table {@code subsetIndex}
based on its
+ * partition assignment and the number of records pushed to each partition
so far.
+ */
+ private long expectedSubsetCount(int subsetIndex) {
+ long count = 0;
+ for (int partition : SUBSET_PARTITION_ASSIGNMENTS[subsetIndex]) {
+ count += _partitionRecordCounts[partition];
+ }
+ return count;
+ }
+
+ /**
+ * Waits until the logical table and the control table both reflect all
pushed data, and each
+ * subset table reflects exactly the records pushed to its assigned
partitions.
+ */
+ private void waitForCountInvariant(long timeoutMs) {
+ long expectedTotal = totalExpectedCount();
+
+ // Wait for control table
+ TestUtils.waitForCondition(
+ aVoid -> getCurrentCountStarResult(CONTROL_TABLE) == expectedTotal,
+ 1_000L, timeoutMs,
+ "Control table did not reach expected count " + expectedTotal);
+
+ // Wait for logical table
+ TestUtils.waitForCondition(
+ aVoid -> getCurrentCountStarResult(LOGICAL_TABLE) == expectedTotal,
+ 1_000L, timeoutMs,
+ "Logical table did not reach expected count " + expectedTotal);
+
+ // Wait for each individual subset table
+ for (int i = 0; i < SUBSET_TABLE_NAMES.length; i++) {
+ final long expectedSubset = expectedSubsetCount(i);
+ final String subsetTable = SUBSET_TABLE_NAMES[i];
+ TestUtils.waitForCondition(
+ aVoid -> getCurrentCountStarResult(subsetTable) == expectedSubset,
+ 1_000L, timeoutMs,
+ "Subset table " + subsetTable + " did not reach expected count " +
expectedSubset);
+ }
+ }
+
+ /**
+ * Hard assertion (no waiting) that all count invariants currently hold.
+ */
+ private void assertCountInvariant() {
+ long expectedTotal = totalExpectedCount();
+ long controlCount = getCurrentCountStarResult(CONTROL_TABLE);
+ long logicalCount = getCurrentCountStarResult(LOGICAL_TABLE);
+
+ assertEquals(controlCount, expectedTotal,
+ "Control table count mismatch. Expected: " + expectedTotal + ",
actual: " + controlCount);
+ assertEquals(logicalCount, expectedTotal,
+ "Logical table count mismatch. Expected: " + expectedTotal + ",
actual: " + logicalCount);
+
+ for (int i = 0; i < SUBSET_TABLE_NAMES.length; i++) {
+ long expectedSubset = expectedSubsetCount(i);
+ long actualSubset = getCurrentCountStarResult(SUBSET_TABLE_NAMES[i]);
+ assertEquals(actualSubset, expectedSubset,
+ "Subset table " + SUBSET_TABLE_NAMES[i] + " count mismatch."
+ + " Expected: " + expectedSubset + ", actual: " + actualSubset);
+ }
+ }
+
+ //
---------------------------------------------------------------------------
+ // Force-commit helpers
+ //
---------------------------------------------------------------------------
+
+ private String forceCommit(String realtimeTableName)
+ throws Exception {
+ String response =
sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(realtimeTableName),
null);
+ return
JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
+ }
+
+ private void waitForForceCommitCompletion(String realtimeTableName, String
jobId,
+ Set<String> consumingSegmentsBefore, long timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ if (!isForceCommitJobCompleted(jobId)) {
+ return false;
+ }
+ // Verify that the previously-consuming segments are now DONE
+ for (String seg : consumingSegmentsBefore) {
+ var meta =
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, seg);
+ assertNotNull(meta);
+ assertEquals(meta.getStatus(),
CommonConstants.Segment.Realtime.Status.DONE);
+ }
+ return true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, timeoutMs, "Force commit did not complete for " + realtimeTableName);
+ }
+
+ private boolean isForceCommitJobCompleted(String jobId)
+ throws Exception {
+ String resp =
sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(jobId));
+ JsonNode status = JsonUtils.stringToJsonNode(resp);
+ assertEquals(status.get("jobType").asText(), "FORCE_COMMIT");
+
+ Set<String> pending = new HashSet<>();
+ for (JsonNode elem :
status.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST))
{
+ pending.add(elem.asText());
+ }
+ return pending.isEmpty();
+ }
+
+ private Set<String> getConsumingSegments(String realtimeTableName) {
+ IdealState idealState =
_helixResourceManager.getTableIdealState(realtimeTableName);
+ assertNotNull(idealState);
+ Set<String> consuming = new HashSet<>();
+ for (Map.Entry<String, Map<String, String>> entry :
idealState.getRecord().getMapFields().entrySet()) {
+ if
(entry.getValue().containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING))
{
+ consuming.add(entry.getKey());
+ }
+ }
+ return consuming;
+ }
+
+ //
---------------------------------------------------------------------------
+ // Pause / resume helpers
+ //
---------------------------------------------------------------------------
+
+ private void pauseConsumptionAndWait(String realtimeTableName)
+ throws Exception {
+ getControllerRequestClient().pauseConsumption(realtimeTableName);
+ // After a successful pause no segments should remain in CONSUMING state
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ PauseStatusDetails details =
getControllerRequestClient().getPauseStatusDetails(realtimeTableName);
+ return details != null && details.getPauseFlag()
+ && (details.getConsumingSegments() == null ||
details.getConsumingSegments().isEmpty());
+ } catch (Exception e) {
+ return false;
+ }
+ }, 500L, 30_000L, "Consumption did not pause for " + realtimeTableName);
+ }
+
+ private void resumeConsumptionAndWait(String realtimeTableName)
+ throws Exception {
+ getControllerRequestClient().resumeConsumption(realtimeTableName);
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ PauseStatusDetails details =
getControllerRequestClient().getPauseStatusDetails(realtimeTableName);
+ return details != null && !details.getPauseFlag();
+ } catch (Exception e) {
+ return false;
+ }
+ }, 500L, 30_000L, "Consumption did not resume for " + realtimeTableName);
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoRealtimeTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoRealtimeTableIntegrationTest.java
new file mode 100644
index 00000000000..f09ebdd35e4
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoRealtimeTableIntegrationTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class LogicalTableWithTwoRealtimeTableIntegrationTest extends
BaseLogicalTableIntegrationTest {
+ private static final String KAFKA_TOPIC = "logicalTableWithTwoRealtimeTopic";
+ private static final String LOGICAL_TABLE_NAME = "mytable";
+ private static final String TABLE_NAME_0 = "rt_1";
+ private static final String TABLE_NAME_1 = "rt_2";
+ private static final List<String> REALTIME_TABLE_NAMES =
List.of(TABLE_NAME_0, TABLE_NAME_1);
+ private static final Map<String, Integer> REALTIME_TABLE_PARTITIONS =
+ Map.of(TABLE_NAME_0, 0, TABLE_NAME_1, 1);
+ private static final int NUM_PARTITIONS = 2;
+ private static final int DOCS_LOADED_TIMEOUT_MS = 600_000;
+
+ private long _table0RecordCount;
+ private long _table1RecordCount;
+ private int _realtimeTableConfigIndex;
+ private int _kafkaPushIndex;
+
+ @Override
+ protected String getKafkaTopic() {
+ return KAFKA_TOPIC;
+ }
+
+ @Override
+ protected String getLogicalTableName() {
+ return LOGICAL_TABLE_NAME;
+ }
+
+ @Override
+ protected String getTableName() {
+ return LOGICAL_TABLE_NAME;
+ }
+
+ @Override
+ protected int getNumKafkaPartitions() {
+ return NUM_PARTITIONS;
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ return super.getCountStarResult();
+ }
+
+ @Override
+ protected List<String> getRealtimeTableNames() {
+ return REALTIME_TABLE_NAMES;
+ }
+
+ @Override
+ protected Map<String, List<File>> getRealtimeTableDataFiles() {
+ Map<String, List<File>> tableNameToFilesMap = new LinkedHashMap<>();
+ for (String tableName : REALTIME_TABLE_NAMES) {
+ tableNameToFilesMap.put(tableName, new ArrayList<>());
+ }
+
+ for (int i = 0; i < _avroFiles.size(); i++) {
+ tableNameToFilesMap.get(REALTIME_TABLE_NAMES.get(i %
REALTIME_TABLE_NAMES.size())).add(_avroFiles.get(i));
+ }
+ return tableNameToFilesMap;
+ }
+
  @BeforeClass
  public void setUp()
      throws Exception {
    // Base class starts the cluster, creates the physical realtime tables and the logical
    // table, and pushes the avro data (via the pushAvroIntoKafka override below).
    super.setUp();
    // Block until both physical tables and the logical table agree on the record counts.
    waitForRecordCounts();
  }
+
+ @Test
+ public void testFederatedCountStar()
+ throws Exception {
+ assertEquals(_table0RecordCount,
+
getCurrentCountStarResult(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_0)));
+ assertEquals(_table1RecordCount,
+
getCurrentCountStarResult(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_1)));
+ assertEquals(_table0RecordCount + _table1RecordCount,
getCurrentCountStarResult(LOGICAL_TABLE_NAME));
+ }
+
  /**
   * Overrides the base-class timeout test for the two-realtime-table logical setup.
   *
   * <p>Sets a 1ms broker timeout on the logical table and runs a SELECT *; if the query fails,
   * the error code must be one of the timeout-related codes. The check is deliberately
   * tolerant: on a fast enough machine the query may finish within 1ms and return no
   * exceptions, which also passes. The config is then raised and finally reset to null, with
   * the query expected to succeed in both cases.
   */
  @Override
  @Test
  public void testQueryTimeOut()
      throws Exception {
    String starQuery = "SELECT * from " + getLogicalTableName();
    // 1ms timeout: small enough that a full-table scan is expected to exceed it.
    QueryConfig queryConfig = new QueryConfig(1L, null, null, null, null, null);
    var logicalTableConfig = getLogicalTableConfig(getLogicalTableName());
    logicalTableConfig.setQueryConfig(queryConfig);
    updateLogicalTableConfig(logicalTableConfig);
    JsonNode response = postQuery(starQuery);
    JsonNode exceptions = response.get("exceptions");
    if (!exceptions.isEmpty()) {
      // Any of the timeout-family error codes is acceptable here.
      int errorCode = exceptions.get(0).get("errorCode").asInt();
      assertTrue(errorCode == QueryErrorCode.BROKER_TIMEOUT.getId()
          || errorCode == QueryErrorCode.SERVER_NOT_RESPONDING.getId()
          || errorCode == QueryErrorCode.QUERY_SCHEDULING_TIMEOUT.getId(),
          "Unexpected error code: " + errorCode);
    }

    // Query succeeds with a high limit.
    queryConfig = new QueryConfig(1000000L, null, null, null, null, null);
    logicalTableConfig.setQueryConfig(queryConfig);
    updateLogicalTableConfig(logicalTableConfig);
    response = postQuery(starQuery);
    exceptions = response.get("exceptions");
    assertTrue(exceptions.isEmpty(), "Query should not throw exception");

    // Reset to null.
    queryConfig = new QueryConfig(null, null, null, null, null, null);
    logicalTableConfig.setQueryConfig(queryConfig);
    updateLogicalTableConfig(logicalTableConfig);
    response = postQuery(starQuery);
    exceptions = response.get("exceptions");
    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
  }
+
  /**
   * Creates the realtime table config for the next table in {@link #REALTIME_TABLE_NAMES}.
   *
   * <p>NOTE(review): this relies on the base class invoking the method exactly once per
   * realtime table, in declaration order ({@code _realtimeTableConfigIndex} tracks the call
   * count) — confirm against the base class if its table-creation flow changes.
   */
  @Override
  protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
    TableConfig tableConfig = super.createRealtimeTableConfig(sampleAvroFile);
    // Each call configures the next table in declaration order.
    int tableIndex = _realtimeTableConfigIndex++;
    String tableName = REALTIME_TABLE_NAMES.get(tableIndex);
    Integer partitionId = REALTIME_TABLE_PARTITIONS.get(tableName);
    tableConfig.setTableName(tableName);
    Map<String, String> streamConfigs = new HashMap<>(tableConfig.getIndexingConfig().getStreamConfigs());
    // Restrict this table to its single dedicated Kafka partition.
    streamConfigs.put("stream.kafka.partition.ids", String.valueOf(partitionId));

    // Clear the legacy IndexingConfig stream configs so the stream is defined only once,
    // via StreamIngestionConfig below.
    tableConfig.getIndexingConfig().setStreamConfigs(null);

    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
    tableConfig.setIngestionConfig(ingestionConfig);
    return tableConfig;
  }
+
+ @Override
+ protected void pushAvroIntoKafka(List<File> avroFiles)
+ throws Exception {
+ Properties producerProperties = new Properties();
+ producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:" + getKafkaPort());
+ producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+ String tableName = REALTIME_TABLE_NAMES.get(_kafkaPushIndex);
+ int partition = REALTIME_TABLE_PARTITIONS.get(tableName);
+ _kafkaPushIndex++;
+
+ long recordCount = 0;
+ long keySequence = 0;
+ byte[] kafkaMessageHeader = getKafkaMessageHeader();
+
+ try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProperties);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536))
{
+
+ for (File avroFile : avroFiles) {
+ try (DataFileStream<GenericRecord> dataFileStream =
AvroUtils.getAvroReader(avroFile)) {
+ GenericDatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(dataFileStream.getSchema());
+ for (GenericRecord genericRecord : dataFileStream) {
+ outputStream.reset();
+ if (kafkaMessageHeader != null && kafkaMessageHeader.length > 0) {
+ outputStream.write(kafkaMessageHeader);
+ }
+
+ BinaryEncoder binaryEncoder = new
EncoderFactory().directBinaryEncoder(outputStream, null);
+ datumWriter.write(genericRecord, binaryEncoder);
+ binaryEncoder.flush();
+
+ producer.send(new ProducerRecord<>(
+ getKafkaTopic(), partition, Longs.toByteArray(keySequence++),
outputStream.toByteArray()))
+ .get();
+ recordCount++;
+ }
+ }
+ }
+ }
+
+ if (partition == 0) {
+ _table0RecordCount = recordCount;
+ } else if (partition == 1) {
+ _table1RecordCount = recordCount;
+ } else {
+ throw new IllegalStateException("Unexpected partition: " + partition);
+ }
+ }
+
+ private void waitForRecordCounts() {
+ String realtimeTableName0 =
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_0);
+ String realtimeTableName1 =
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_1);
+ TestUtils.waitForCondition(ignored -> {
+ _table0RecordCount = getCurrentCountStarResult(realtimeTableName0);
+ _table1RecordCount = getCurrentCountStarResult(realtimeTableName1);
+ return getCurrentCountStarResult(LOGICAL_TABLE_NAME) ==
_table0RecordCount + _table1RecordCount;
+ }, 100L, DOCS_LOADED_TIMEOUT_MS,
+ "Failed to load the expected record counts for realtime logical
tables");
+
+ Assert.assertEquals(getCurrentCountStarResult(realtimeTableName0),
_table0RecordCount);
+ Assert.assertEquals(getCurrentCountStarResult(realtimeTableName1),
_table1RecordCount);
+ Assert.assertEquals(getCurrentCountStarResult(LOGICAL_TABLE_NAME),
_table0RecordCount + _table1RecordCount);
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 4abc9f21919..18dcd125eab 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -26,6 +26,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,11 +41,16 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionSubsetUtils;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -58,6 +64,11 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
implements StreamMetadataProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
+ /**
+ * Immutable partition ID subset from table config. Read once at
construction; does not change during the
+ * provider's lifetime. Empty when no subset is configured (consume all
partitions).
+ */
+ private final List<Integer> _partitionIdSubset;
public KafkaStreamMetadataProvider(String clientId, StreamConfig
streamConfig) {
this(clientId, streamConfig, Integer.MIN_VALUE);
@@ -65,6 +76,14 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
  /**
   * Creates a metadata provider for the given stream config, optionally scoped to a single
   * partition, and resolves the optional partition ID subset ("stream.kafka.partition.ids").
   *
   * @param clientId client id used for the underlying Kafka connection
   * @param streamConfig table stream config
   * @param partition partition to scope to, or Integer.MIN_VALUE for topic-level access
   * @throws IllegalArgumentException if a configured subset contains IDs not present on the topic
   */
  public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
    super(clientId, streamConfig, partition);
    // Parse the optional comma-separated partition ID subset once at construction time.
    List<Integer> subset =
        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
    if (subset != null) {
      _partitionIdSubset = Collections.unmodifiableList(subset);
      // Fail fast on unknown partition IDs; performs a blocking broker metadata fetch.
      validatePartitionIds(_partitionIdSubset);
    } else {
      // Empty subset means "no subset configured": consume all partitions.
      _partitionIdSubset = List.of();
    }
  }
@Override
@@ -75,7 +94,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
return partitionInfos.size();
}
throw new TransientConsumerException(new RuntimeException(
- String.format("Failed to fetch partition information for topic: %s",
_topic)));
+ "Failed to fetch partition information for topic: " + _topic));
} catch (TimeoutException e) {
throw new TransientConsumerException(e);
}
@@ -87,7 +106,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
List<PartitionInfo> partitionInfos = fetchPartitionInfos(timeoutMillis);
if (CollectionUtils.isEmpty(partitionInfos)) {
throw new TransientConsumerException(new RuntimeException(
- String.format("Failed to fetch partition information for topic:
%s", _topic)));
+ "Failed to fetch partition information for topic: " + _topic));
}
Set<Integer> partitionIds =
Sets.newHashSetWithExpectedSize(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
@@ -99,6 +118,39 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses,
int timeoutMillis)
+ throws IOException, java.util.concurrent.TimeoutException {
+ if (_partitionIdSubset.isEmpty()) {
+ return
StreamMetadataProvider.super.computePartitionGroupMetadata(clientId,
streamConfig,
+ partitionGroupConsumptionStatuses, timeoutMillis);
+ }
+ Map<Integer, StreamPartitionMsgOffset> partitionIdToEndOffset =
+ new HashMap<>(partitionGroupConsumptionStatuses.size());
+ for (PartitionGroupConsumptionStatus s :
partitionGroupConsumptionStatuses) {
+ partitionIdToEndOffset.put(s.getStreamPartitionGroupId(),
s.getEndOffset());
+ }
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ List<PartitionGroupMetadata> result = new
ArrayList<>(_partitionIdSubset.size());
+ for (Integer partitionId : _partitionIdSubset) {
+ StreamPartitionMsgOffset endOffset =
partitionIdToEndOffset.get(partitionId);
+ StreamPartitionMsgOffset startOffset;
+ if (endOffset == null) {
+ try (StreamMetadataProvider partitionMetadataProvider =
+ streamConsumerFactory.createPartitionMetadataProvider(
+ StreamConsumerFactory.getUniqueClientId(clientId),
partitionId)) {
+ startOffset = partitionMetadataProvider.fetchStreamPartitionOffset(
+ streamConfig.getOffsetCriteria(), timeoutMillis);
+ }
+ } else {
+ startOffset = endOffset;
+ }
+ result.add(new PartitionGroupMetadata(partitionId, startOffset));
+ }
+ return result;
+ }
+
@Override
public Map<Integer, StreamPartitionMsgOffset>
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
List<TopicPartition> topicPartitions = new
ArrayList<>(partitionIds.size());
@@ -309,7 +361,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
try {
if (!topicExists(requestTimeoutMs)) {
topicMissing = true;
- lastError = new RuntimeException(String.format("Topic does not
exist: %s", _topic));
+ lastError = new RuntimeException("Topic does not exist: " + _topic);
} else {
topicMissing = false;
}
@@ -332,7 +384,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
if (lastError != null) {
if (topicMissing) {
- throw new RuntimeException(String.format("Topic does not exist: %s",
_topic));
+ throw new RuntimeException("Topic does not exist: " + _topic);
}
if (lastError instanceof TransientConsumerException) {
throw (TransientConsumerException) lastError;
@@ -343,7 +395,30 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
}
throw new TransientConsumerException(
- new RuntimeException(String.format("Failed to fetch partition
information for topic: %s", _topic)));
+ new RuntimeException("Failed to fetch partition information for topic:
" + _topic));
+ }
+
+ private void validatePartitionIds(List<Integer> subset) {
+ Set<Integer> topicPartitionIds = new HashSet<>();
+ List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+ if (partitionInfos == null || partitionInfos.isEmpty()) {
+ throw new IllegalStateException(
+ "Cannot validate partition IDs: topic " + _topic + " metadata not
available. "
+ + "Ensure the topic exists and is accessible.");
+ }
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ topicPartitionIds.add(partitionInfo.partition());
+ }
+ List<Integer> missingPartitionIds = new ArrayList<>();
+ for (Integer partitionId : subset) {
+ if (!topicPartitionIds.contains(partitionId)) {
+ missingPartitionIds.add(partitionId);
+ }
+ }
+ Preconditions.checkArgument(
+ missingPartitionIds.isEmpty(),
+ "Invalid partition ids %s for table stream config. Available
partitions on topic %s are: %s",
+ missingPartitionIds, _topic, topicPartitionIds);
}
private boolean topicExists(long timeoutMillis) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
index 3f554b91f07..38423b0de4a 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
@@ -79,7 +79,9 @@ public class KafkaPartitionLevelConsumerTest {
private static final String TEST_TOPIC_1 = "foo";
private static final String TEST_TOPIC_2 = "bar";
private static final String TEST_TOPIC_3 = "expired";
+ private static final String TEST_TOPIC_SUBSET_PARTITION = "subsetPartition";
private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
+ private static final int NUM_MSG_PRODUCED_PER_SUBSET_PARTITION = 100;
private static final long TIMESTAMP = Instant.now().toEpochMilli();
private static final Random RANDOM = new Random();
@@ -99,6 +101,7 @@ public class KafkaPartitionLevelConsumerTest {
_kafkaCluster.createTopic(TEST_TOPIC_1, 1);
_kafkaCluster.createTopic(TEST_TOPIC_2, 2);
_kafkaCluster.createTopic(TEST_TOPIC_3, 1);
+ _kafkaCluster.createTopic(TEST_TOPIC_SUBSET_PARTITION, 8);
Thread.sleep(STABILIZE_SLEEP_DELAYS);
produceMsgToKafka();
Thread.sleep(STABILIZE_SLEEP_DELAYS);
@@ -119,6 +122,13 @@ public class KafkaPartitionLevelConsumerTest {
producer.send(new ProducerRecord<>(TEST_TOPIC_2, 1, TIMESTAMP + i,
null, "sample_msg_" + i));
producer.send(new ProducerRecord<>(TEST_TOPIC_3, "sample_msg_" + i));
}
+ for (int partitionId = 0; partitionId < 8; partitionId++) {
+ for (int msgIdx = 0; msgIdx < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION;
msgIdx++) {
+ String payload = "mod_" + partitionId + "_msg_" + msgIdx;
+ producer.send(new ProducerRecord<>(TEST_TOPIC_SUBSET_PARTITION,
partitionId,
+ TIMESTAMP + (partitionId * 1000L) + msgIdx,
Integer.toString(partitionId), payload));
+ }
+ }
producer.flush();
}
}
@@ -130,6 +140,7 @@ public class KafkaPartitionLevelConsumerTest {
_kafkaCluster.deleteTopic(TEST_TOPIC_1);
_kafkaCluster.deleteTopic(TEST_TOPIC_2);
_kafkaCluster.deleteTopic(TEST_TOPIC_3);
+ _kafkaCluster.deleteTopic(TEST_TOPIC_SUBSET_PARTITION);
} finally {
_kafkaCluster.stop();
}
@@ -206,6 +217,71 @@ public class KafkaPartitionLevelConsumerTest {
assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
}
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testSubsetPartitionInvalidPartition() {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "invalid-subset-client";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+ KafkaStreamConfigProperties.PARTITION_IDS), "99");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ new KafkaStreamMetadataProvider(clientId, streamConfig);
+ }
+
+ @Test
+ public void testSubsetPartitionConsumption()
+ throws TimeoutException {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String tableNameWithType = "tableName_REALTIME";
+
+ for (int partitionId = 0; partitionId < 8; partitionId++) {
+ String clientId = "subset-client-" + partitionId;
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name",
TEST_TOPIC_SUBSET_PARTITION);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+ KafkaStreamConfigProperties.PARTITION_IDS),
Integer.toString(partitionId));
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ KafkaStreamMetadataProvider streamMetadataProvider = new
KafkaStreamMetadataProvider(clientId, streamConfig);
+ // With subset partition config, fetchPartitionCount/fetchPartitionIds
still return the total Kafka
+ // partition count and all partition IDs; subset filtering only applies
to segment creation.
+ assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 8);
+ assertEquals(streamMetadataProvider.fetchPartitionIds(10000L),
+ new HashSet<>(List.of(0, 1, 2, 3, 4, 5, 6, 7)));
+
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ PartitionGroupConsumer consumer =
streamConsumerFactory.createPartitionGroupConsumer(
+ clientId,
+ new PartitionGroupConsumptionStatus(partitionId, 0, new
LongMsgOffset(0),
+ new LongMsgOffset(NUM_MSG_PRODUCED_PER_SUBSET_PARTITION),
"CONSUMING"));
+
+ MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0),
10000);
+ assertEquals(messageBatch.getMessageCount(),
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+ assertEquals(messageBatch.getUnfilteredMessageCount(),
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+ for (int i = 0; i < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION; i++) {
+ StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+ assertEquals(new String((byte[]) streamMessage.getValue()), "mod_" +
partitionId + "_msg_" + i);
+ StreamMessageMetadata metadata = streamMessage.getMetadata();
+ assertEquals(((LongMsgOffset) metadata.getOffset()).getOffset(), i);
+ assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP +
(partitionId * 1000L) + i);
+ }
+ }
+ }
+
@Test
public void testFetchMessages() {
String streamType = "kafka";
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
index 9eb73deda63..bd712ac6fb6 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
@@ -26,6 +26,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,11 +41,16 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionSubsetUtils;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -58,6 +64,11 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
implements StreamMetadataProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
+ /**
+ * Immutable partition ID subset from table config. Read once at
construction; does not change during the
+ * provider's lifetime. Empty when no subset is configured (consume all
partitions).
+ */
+ private final List<Integer> _partitionIdSubset;
public KafkaStreamMetadataProvider(String clientId, StreamConfig
streamConfig) {
this(clientId, streamConfig, Integer.MIN_VALUE);
@@ -65,6 +76,14 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
  /**
   * Creates a metadata provider for the given stream config, optionally scoped to a single
   * partition, and resolves the optional partition ID subset ("stream.kafka.partition.ids").
   *
   * @param clientId client id used for the underlying Kafka connection
   * @param streamConfig table stream config
   * @param partition partition to scope to, or Integer.MIN_VALUE for topic-level access
   * @throws IllegalArgumentException if a configured subset contains IDs not present on the topic
   */
  public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
    super(clientId, streamConfig, partition);
    // Parse the optional comma-separated partition ID subset once at construction time.
    List<Integer> subset =
        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
    if (subset != null) {
      _partitionIdSubset = Collections.unmodifiableList(subset);
      // Fail fast on unknown partition IDs; performs a blocking broker metadata fetch.
      validatePartitionIds(_partitionIdSubset);
    } else {
      // Empty subset means "no subset configured": consume all partitions.
      _partitionIdSubset = List.of();
    }
  }
@Override
@@ -75,7 +94,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
return partitionInfos.size();
}
throw new TransientConsumerException(new RuntimeException(
- String.format("Failed to fetch partition information for topic: %s",
_topic)));
+ "Failed to fetch partition information for topic: " + _topic));
} catch (TimeoutException e) {
throw new TransientConsumerException(e);
}
@@ -87,7 +106,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
List<PartitionInfo> partitionInfos = fetchPartitionInfos(timeoutMillis);
if (CollectionUtils.isEmpty(partitionInfos)) {
throw new TransientConsumerException(new RuntimeException(
- String.format("Failed to fetch partition information for topic:
%s", _topic)));
+ "Failed to fetch partition information for topic: " + _topic));
}
Set<Integer> partitionIds =
Sets.newHashSetWithExpectedSize(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
@@ -99,6 +118,39 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses,
int timeoutMillis)
+ throws IOException, java.util.concurrent.TimeoutException {
+ if (_partitionIdSubset.isEmpty()) {
+ return
StreamMetadataProvider.super.computePartitionGroupMetadata(clientId,
streamConfig,
+ partitionGroupConsumptionStatuses, timeoutMillis);
+ }
+ Map<Integer, StreamPartitionMsgOffset> partitionIdToEndOffset =
+ new HashMap<>(partitionGroupConsumptionStatuses.size());
+ for (PartitionGroupConsumptionStatus s :
partitionGroupConsumptionStatuses) {
+ partitionIdToEndOffset.put(s.getStreamPartitionGroupId(),
s.getEndOffset());
+ }
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ List<PartitionGroupMetadata> result = new
ArrayList<>(_partitionIdSubset.size());
+ for (Integer partitionId : _partitionIdSubset) {
+ StreamPartitionMsgOffset endOffset =
partitionIdToEndOffset.get(partitionId);
+ StreamPartitionMsgOffset startOffset;
+ if (endOffset == null) {
+ try (StreamMetadataProvider partitionMetadataProvider =
+ streamConsumerFactory.createPartitionMetadataProvider(
+ StreamConsumerFactory.getUniqueClientId(clientId),
partitionId)) {
+ startOffset = partitionMetadataProvider.fetchStreamPartitionOffset(
+ streamConfig.getOffsetCriteria(), timeoutMillis);
+ }
+ } else {
+ startOffset = endOffset;
+ }
+ result.add(new PartitionGroupMetadata(partitionId, startOffset));
+ }
+ return result;
+ }
+
@Override
public Map<Integer, StreamPartitionMsgOffset>
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
List<TopicPartition> topicPartitions = new
ArrayList<>(partitionIds.size());
@@ -307,7 +359,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
try {
if (!topicExists(requestTimeoutMs)) {
topicMissing = true;
- lastError = new RuntimeException(String.format("Topic does not
exist: %s", _topic));
+ lastError = new RuntimeException("Topic does not exist: " + _topic);
} else {
topicMissing = false;
}
@@ -330,7 +382,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
if (lastError != null) {
if (topicMissing) {
- throw new RuntimeException(String.format("Topic does not exist: %s",
_topic));
+ throw new RuntimeException("Topic does not exist: " + _topic);
}
if (lastError instanceof TransientConsumerException) {
throw (TransientConsumerException) lastError;
@@ -341,7 +393,30 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
}
throw new TransientConsumerException(
- new RuntimeException(String.format("Failed to fetch partition
information for topic: %s", _topic)));
+ new RuntimeException("Failed to fetch partition information for topic:
" + _topic));
+ }
+
+ private void validatePartitionIds(List<Integer> subset) {
+ Set<Integer> topicPartitionIds = new HashSet<>();
+ List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+ if (partitionInfos == null || partitionInfos.isEmpty()) {
+ throw new IllegalStateException(
+ "Cannot validate partition IDs: topic " + _topic + " metadata not
available. "
+ + "Ensure the topic exists and is accessible.");
+ }
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ topicPartitionIds.add(partitionInfo.partition());
+ }
+ List<Integer> missingPartitionIds = new ArrayList<>();
+ for (Integer partitionId : subset) {
+ if (!topicPartitionIds.contains(partitionId)) {
+ missingPartitionIds.add(partitionId);
+ }
+ }
+ Preconditions.checkArgument(
+ missingPartitionIds.isEmpty(),
+ "Invalid partition ids %s for table stream config. Available
partitions on topic %s are: %s",
+ missingPartitionIds, _topic, topicPartitionIds);
}
private boolean topicExists(long timeoutMillis) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
index f59c7a635a7..34852964e6e 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
@@ -58,11 +58,7 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.retry.ExponentialBackoffRetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.DockerClientFactory;
import org.testng.Assert;
-import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -77,15 +73,15 @@ import static org.testng.Assert.assertTrue;
/**
* Tests for the KafkaPartitionLevelConsumer.
- * Note: These tests require Docker to be running as they use Testcontainers.
*/
public class KafkaPartitionLevelConsumerTest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
private static final long STABILIZE_SLEEP_DELAYS = 3000;
private static final String TEST_TOPIC_1 = "foo";
private static final String TEST_TOPIC_2 = "bar";
private static final String TEST_TOPIC_3 = "expired";
+ private static final String TEST_TOPIC_SUBSET_PARTITION = "subsetPartition";
private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
+ private static final int NUM_MSG_PRODUCED_PER_SUBSET_PARTITION = 100;
private static final long TIMESTAMP = Instant.now().toEpochMilli();
private static final Random RANDOM = new Random();
@@ -95,37 +91,19 @@ public class KafkaPartitionLevelConsumerTest {
@BeforeClass
public void setUp()
throws Exception {
- // Check if Docker is available, skip tests if not
- if (!isDockerAvailable()) {
- throw new SkipException("Docker is not available. Skipping Kafka 4.0
consumer tests. "
- + "These tests require Docker for Testcontainers.");
- }
_kafkaCluster = new MiniKafkaCluster("0");
_kafkaCluster.start();
_kafkaBrokerAddress = _kafkaCluster.getKafkaServerAddress();
_kafkaCluster.createTopic(TEST_TOPIC_1, 1);
_kafkaCluster.createTopic(TEST_TOPIC_2, 2);
_kafkaCluster.createTopic(TEST_TOPIC_3, 1);
+ _kafkaCluster.createTopic(TEST_TOPIC_SUBSET_PARTITION, 8);
Thread.sleep(STABILIZE_SLEEP_DELAYS);
produceMsgToKafka();
Thread.sleep(STABILIZE_SLEEP_DELAYS);
_kafkaCluster.deleteRecordsBeforeOffset(TEST_TOPIC_3, 0, 200);
}
- /**
- * Checks if Docker is available for running Testcontainers.
- * @return true if Docker is available, false otherwise
- */
- private static boolean isDockerAvailable() {
- try {
- DockerClientFactory.instance().client();
- return true;
- } catch (Throwable ex) {
- LOGGER.warn("Docker is not available: {}", ex.getMessage());
- return false;
- }
- }
-
private void produceMsgToKafka() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _kafkaBrokerAddress);
@@ -140,6 +118,13 @@ public class KafkaPartitionLevelConsumerTest {
producer.send(new ProducerRecord<>(TEST_TOPIC_2, 1, TIMESTAMP + i,
null, "sample_msg_" + i));
producer.send(new ProducerRecord<>(TEST_TOPIC_3, "sample_msg_" + i));
}
+ for (int partitionId = 0; partitionId < 8; partitionId++) {
+ for (int msgIdx = 0; msgIdx < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION;
msgIdx++) {
+ String payload = "mod_" + partitionId + "_msg_" + msgIdx;
+ producer.send(new ProducerRecord<>(TEST_TOPIC_SUBSET_PARTITION,
partitionId,
+ TIMESTAMP + (partitionId * 1000L) + msgIdx,
Integer.toString(partitionId), payload));
+ }
+ }
producer.flush();
}
}
@@ -151,6 +136,7 @@ public class KafkaPartitionLevelConsumerTest {
_kafkaCluster.deleteTopic(TEST_TOPIC_1);
_kafkaCluster.deleteTopic(TEST_TOPIC_2);
_kafkaCluster.deleteTopic(TEST_TOPIC_3);
+ _kafkaCluster.deleteTopic(TEST_TOPIC_SUBSET_PARTITION);
} finally {
_kafkaCluster.stop();
}
@@ -225,6 +211,79 @@ public class KafkaPartitionLevelConsumerTest {
streamMetadataProvider = new KafkaStreamMetadataProvider(clientId,
streamConfig);
assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+
+ streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+ KafkaStreamConfigProperties.PARTITION_IDS), "1");
+ streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+ streamMetadataProvider = new KafkaStreamMetadataProvider(clientId,
streamConfig);
+ // With subset partition config, fetchPartitionCount/fetchPartitionIds
still return the total Kafka
+ // partition count and all partition IDs; subset filtering only applies to
segment creation.
+ assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+ assertEquals(streamMetadataProvider.fetchPartitionIds(10000L), new
HashSet<>(List.of(0, 1)));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testSubsetPartitionInvalidPartition() {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String clientId = "invalid-subset-client";
+ String tableNameWithType = "tableName_REALTIME";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+ KafkaStreamConfigProperties.PARTITION_IDS), "99");
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ new KafkaStreamMetadataProvider(clientId, streamConfig);
+ }
+
+ @Test
+ public void testSubsetPartitionConsumption() throws TimeoutException {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
+ String tableNameWithType = "tableName_REALTIME";
+
+ for (int partitionId = 0; partitionId < 8; partitionId++) {
+ String clientId = "subset-client-" + partitionId;
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name",
TEST_TOPIC_SUBSET_PARTITION);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+ KafkaStreamConfigProperties.PARTITION_IDS),
Integer.toString(partitionId));
+ StreamConfig streamConfig = new StreamConfig(tableNameWithType,
streamConfigMap);
+
+ KafkaStreamMetadataProvider streamMetadataProvider = new
KafkaStreamMetadataProvider(clientId, streamConfig);
+ // With subset partition config, fetchPartitionCount/fetchPartitionIds
still return the total Kafka
+ // partition count and all partition IDs; subset filtering only applies
to segment creation.
+ assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 8);
+ assertEquals(streamMetadataProvider.fetchPartitionIds(10000L),
+ new HashSet<>(List.of(0, 1, 2, 3, 4, 5, 6, 7)));
+
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ PartitionGroupConsumer consumer =
streamConsumerFactory.createPartitionGroupConsumer(
+ clientId,
+ new PartitionGroupConsumptionStatus(partitionId, 0, new
LongMsgOffset(0),
+ new LongMsgOffset(NUM_MSG_PRODUCED_PER_SUBSET_PARTITION),
"CONSUMING"));
+
+ MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0),
10000);
+ assertEquals(messageBatch.getMessageCount(),
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+ assertEquals(messageBatch.getUnfilteredMessageCount(),
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+ for (int i = 0; i < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION; i++) {
+ StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+ assertEquals(new String((byte[]) streamMessage.getValue()), "mod_" +
partitionId + "_msg_" + i);
+ StreamMessageMetadata metadata = streamMessage.getMetadata();
+ assertEquals(((LongMsgOffset) metadata.getOffset()).getOffset(), i);
+ assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP +
(partitionId * 1000L) + i);
+ }
+ }
}
@Test
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
index 7202e10c472..18e171b974a 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.stream.kafka;
import com.google.common.base.Preconditions;
+import java.util.Collections;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -120,6 +121,14 @@ public class KafkaPartitionLevelStreamConfig {
return _populateMetadata;
}
  /**
   * Returns an unmodifiable view of the stream config map so callers cannot mutate it
   * and change consumer behavior after construction.
   *
   * @return read-only view of the backing stream config map (a view, not a snapshot; it
   *         reflects any internal changes to the underlying map)
   */
  public Map<String, String> getStreamConfigMap() {
    return Collections.unmodifiableMap(_streamConfigMap);
  }
+
private int getIntConfigWithDefault(Map<String, String> configMap, String
key, int defaultValue) {
String stringValue = configMap.get(key);
try {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
new file mode 100644
index 00000000000..e9fc7ad69e1
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Utilities for parsing and validating Kafka partition subset configuration
+ * (stream.kafka.partition.ids) from stream config.
+ */
+public final class KafkaPartitionSubsetUtils {
+
+ private KafkaPartitionSubsetUtils() {
+ }
+
+ /**
+ * Reads the optional comma-separated partition ID list from the stream
config map.
+ * Returns a sorted, deduplicated list for stable ordering when used for
partition group metadata.
+ * Duplicate IDs in the config are silently removed; this ensures stable
ordering and prevents
+ * duplicate processing of the same partition.
+ *
+ * @param streamConfigMap table stream config map (e.g. from
+ * {@link
org.apache.pinot.spi.stream.StreamConfig#getStreamConfigsMap()})
+ * @return Sorted list of unique partition IDs when
stream.kafka.partition.ids is set and non-empty;
+ * null when not set or blank
+ * @throws IllegalArgumentException if the value contains invalid
(non-integer) entries
+ */
+ @Nullable
+ public static List<Integer> getPartitionIdsFromConfig(Map<String, String>
streamConfigMap) {
+ String key =
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS);
+ String value = streamConfigMap.get(key);
+ if (StringUtils.isBlank(value)) {
+ return null;
+ }
+ String[] parts = value.split(",");
+ Set<Integer> idSet = new HashSet<>();
+ for (String part : parts) {
+ String trimmed = part.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ try {
+ int partitionId = Integer.parseInt(trimmed);
+ if (partitionId < 0) {
+ throw new IllegalArgumentException("Invalid " + key
+ + " value: partition IDs must be non-negative, got '" + value +
"'");
+ }
+ idSet.add(partitionId);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "Invalid " + key + " value: expected comma-separated integers, got
'" + value + "'", e);
+ }
+ }
+ if (idSet.isEmpty()) {
+ return null;
+ }
+ List<Integer> ids = new ArrayList<>(idSet);
+ Collections.sort(ids);
+ return ids;
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
index 477bbf56418..1d7fa19de84 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
@@ -68,4 +68,10 @@ public class KafkaStreamConfigProperties {
}
public static final String KAFKA_CONSUMER_PROP_PREFIX =
"kafka.consumer.prop";
+
  /**
   * Optional comma-separated list of Kafka partition IDs to consume (e.g. "0,2,5").
   * When set, only these partitions are used for the table; when absent, all topic partitions are consumed.
   *
   * <p>This is the property suffix; the full table-config key is built via
   * {@code constructStreamProperty}, yielding "stream.kafka.partition.ids" as used in
   * the example realtime table configs.
   */
  public static final String PARTITION_IDS = "kafka.partition.ids";
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtilsTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtilsTest.java
new file mode 100644
index 00000000000..9b262fab056
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtilsTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaPartitionSubsetUtilsTest {
+
+ @Test
+ public void testGetPartitionIdsFromConfigMissingKey() {
+ Map<String, String> config = new HashMap<>();
+ config.put("stream.kafka.topic.name", "myTopic");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigBlankValue() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
" ");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigEmptyAfterTrim() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
",");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigValidSubset() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "0,2,5");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(0, 2, 5));
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigUnsortedReturnsSorted() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "5, 2 , 0");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(0, 2, 5));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetPartitionIdsFromConfigInvalidNumber() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "0,abc,1");
+ KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetPartitionIdsFromConfigInvalidNumberOnly() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "not_a_number");
+ KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetPartitionIdsFromConfigNegativePartitionId() {
+ Map<String, String> config = new HashMap<>();
+ config.put(
+
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "-1,0,1");
+ KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigSinglePartition() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
"3");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(3));
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigEmptyMap() {
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(Collections.emptyMap());
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigDedupesDuplicates() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "2,0,2,5,0,5");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(0, 2, 5));
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigMultipleCommasWithWhitespace() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ " , , 0 , , ");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(0));
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigLeadingAndTrailingCommas() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ ",,,0,1,2,,,");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(0, 1, 2));
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigAllEmptyCommas() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ " , , , , ");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigVeryLargePartitionId() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "999,1000,9999");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(999, 1000, 9999));
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigAllPartitionsInSubset() {
+ // Simulates a subset config that contains all partitions (e.g., 0,1,2,3
for a 4-partition topic)
+ // This is valid config-wise but might not be useful in practice
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "0,1,2,3");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(0, 1, 2, 3));
+ }
+
+ @Test
+ public void testGetPartitionIdsFromConfigMixedWhitespace() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ " 5 , 2 ,\t0\t,\n3\n");
+ List<Integer> result =
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, List.of(0, 2, 3, 5));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetPartitionIdsFromConfigPartiallyInvalid() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "0,1,2,invalid,3");
+ KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetPartitionIdsFromConfigNegativeInMiddle() {
+ Map<String, String> config = new HashMap<>();
+
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+ "0,1,-5,2");
+ KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index cbd8cd98428..15197294bd4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -25,6 +25,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
@@ -35,12 +37,23 @@ import java.util.Properties;
import java.util.zip.GZIPInputStream;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.entity.mime.FileBody;
+import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpEntity;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.admin.command.AbstractBaseAdminCommand;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.AirlineDataStream;
import org.apache.pinot.tools.streams.MeetupRsvpStream;
@@ -81,17 +94,19 @@ public abstract class QuickStartBase {
"examples/batch/testUnnest",
};
- protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES
= Map.of(
- "airlineStats", "examples/stream/airlineStats",
- "dailySales", "examples/stream/dailySales",
- "githubEvents", "examples/stream/githubEvents",
- "meetupRsvp", "examples/stream/meetupRsvp",
- "meetupRsvpJson", "examples/stream/meetupRsvpJson",
- "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType",
- "upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp",
- "upsertJsonMeetupRsvp", "examples/stream/upsertJsonMeetupRsvp",
- "upsertPartialMeetupRsvp", "examples/stream/upsertPartialMeetupRsvp",
- "fineFoodReviews", "examples/stream/fineFoodReviews");
  // Maps each quickstart stream table name to its bundled example resource directory.
  // NOTE: fineFoodReviews_part_0 / fineFoodReviews_part_1 are subset tables that
  // consume individual partitions of the shared "fineFoodReviews" Kafka topic
  // (via stream.kafka.partition.ids in their table configs) rather than having
  // their own topics.
  protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES = Map.ofEntries(
      Map.entry("airlineStats", "examples/stream/airlineStats"),
      Map.entry("dailySales", "examples/stream/dailySales"),
      Map.entry("githubEvents", "examples/stream/githubEvents"),
      Map.entry("meetupRsvp", "examples/stream/meetupRsvp"),
      Map.entry("meetupRsvpJson", "examples/stream/meetupRsvpJson"),
      Map.entry("meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType"),
      Map.entry("upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp"),
      Map.entry("upsertJsonMeetupRsvp", "examples/stream/upsertJsonMeetupRsvp"),
      Map.entry("upsertPartialMeetupRsvp", "examples/stream/upsertPartialMeetupRsvp"),
      Map.entry("fineFoodReviews", "examples/stream/fineFoodReviews"),
      Map.entry("fineFoodReviews_part_0", "examples/stream/fineFoodReviews_part_0"),
      Map.entry("fineFoodReviews_part_1", "examples/stream/fineFoodReviews_part_1"));
protected File _dataDir = FileUtils.getTempDirectory();
protected boolean _setCustomDataDir;
@@ -566,6 +581,10 @@ public abstract class QuickStartBase {
"***** Starting fineFoodReviews data stream and publishing to
Kafka *****");
publishStreamDataToKafka("fineFoodReviews", new
File(quickstartTmpDir, "fineFoodReviews"));
break;
+ case "fineFoodReviews_part_0":
+ case "fineFoodReviews_part_1":
+ // Consume from existing fineFoodReviews topic (partition subset);
no separate stream to start
+ break;
default:
throw new UnsupportedOperationException("Unknown stream name: " +
streamName);
}
@@ -1023,4 +1042,67 @@ public abstract class QuickStartBase {
}
return null;
}
+
  /**
   * Creates the "fineFoodReviews-federated" logical table that federates the two
   * partition-subset realtime tables (fineFoodReviews_part_0/_part_1) behind a
   * single queryable name.
   *
   * <p>No-ops when a custom bootstrap table dir is in use or when either subset
   * table directory is absent from the default stream table directories. Failures
   * are reported as a status message rather than thrown — this is best-effort
   * quickstart setup, not a hard dependency.
   */
  protected void createFineFoodReviewsFederatedTable() {
    if (!useDefaultBootstrapTableDir()) {
      return;
    }
    Map<String, String> streamTableDirectories = getDefaultStreamTableDirectories();
    if (!streamTableDirectories.containsKey("fineFoodReviews_part_0")
        || !streamTableDirectories.containsKey("fineFoodReviews_part_1")) {
      return;
    }
    String logicalTableName = "fineFoodReviews-federated";
    try {
      // Reuse the base fineFoodReviews schema; only the schema name differs for the logical table.
      Schema schema = loadSchemaFromResource("/examples/stream/fineFoodReviews/fineFoodReviews_schema.json");
      schema.setSchemaName(logicalTableName);
      createSchemaOnController(schema, logicalTableName);

      LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
      logicalTableConfig.setTableName(logicalTableName);
      logicalTableConfig.setBrokerTenant("DefaultTenant");
      logicalTableConfig.setRefRealtimeTableName("fineFoodReviews_part_0_REALTIME");
      logicalTableConfig.setPhysicalTableConfigMap(Map.of(
          "fineFoodReviews_part_0_REALTIME", new PhysicalTableConfig(),
          "fineFoodReviews_part_1_REALTIME", new PhysicalTableConfig()
      ));

      // Register the logical table with the locally-running controller.
      String logicalTableUrl = "http://localhost:" + QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/logicalTables";
      AbstractBaseAdminCommand.sendPostRequest(logicalTableUrl, logicalTableConfig.toSingleLineJsonString());
      printStatus(Quickstart.Color.GREEN,
          "***** Logical table fineFoodReviews-federated created successfully *****");
    } catch (Exception e) {
      // Intentional broad catch: quickstart should keep running even if federation setup fails.
      printStatus(Quickstart.Color.YELLOW,
          "***** Logical table fineFoodReviews-federated creation failed: " + e.getMessage() + " *****");
    }
  }
+
+ private Schema loadSchemaFromResource(String resourcePath)
+ throws IOException {
+ try (InputStream inputStream =
getClass().getResourceAsStream(resourcePath)) {
+ if (inputStream == null) {
+ throw new IOException("Schema file not found: " + resourcePath);
+ }
+ String schemaJsonString = IOUtils.toString(inputStream,
StandardCharsets.UTF_8);
+ return Schema.fromString(schemaJsonString);
+ }
+ }
+
+ private void createSchemaOnController(Schema schema, String logicalTableName)
+ throws Exception {
+ File tempSchemaFile = File.createTempFile(logicalTableName + "_schema",
".json");
+ tempSchemaFile.deleteOnExit();
+ FileUtils.writeStringToFile(tempSchemaFile,
schema.toSingleLineJsonString(), StandardCharsets.UTF_8);
+ HttpEntity multipartEntity = MultipartEntityBuilder.create()
+ .addPart("schema", new FileBody(tempSchemaFile,
ContentType.APPLICATION_JSON, logicalTableName + ".json"))
+ .build();
+ try (HttpClient httpClient = new HttpClient()) {
+ String schemaUrl = "http://localhost:" +
QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/schemas?override=true"
+ + "&force=true";
+ SimpleHttpResponse response =
httpClient.sendPostRequest(URI.create(schemaUrl), multipartEntity, null, null);
+ if (response.getStatusCode() != 200) {
+ throw new RuntimeException("Schema creation response: " +
response.getResponse());
+ }
+ }
+ }
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 2382353462d..21e69d28380 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -88,6 +88,30 @@ public class RealtimeQuickStart extends QuickStartBase {
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5)));
printStatus(Color.GREEN,
"***************************************************");
+ String q6 = "select count(*) from fineFoodReviews";
+ printStatus(Color.YELLOW, "Total number of documents in fineFoodReviews");
+ printStatus(Color.CYAN, "Query : " + q6);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q6)));
+ printStatus(Color.GREEN,
"***************************************************");
+
+ String q7 = "select count(*) from \"fineFoodReviews-federated\"";
+ printStatus(Color.YELLOW, "Total number of documents in
fineFoodReviews-federated");
+ printStatus(Color.CYAN, "Query : " + q7);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q7)));
+ printStatus(Color.GREEN,
"***************************************************");
+
+ String q8 = "select count(*) from \"fineFoodReviews_part_0\"";
+ printStatus(Color.YELLOW, "Total number of documents in
fineFoodReviews_part_0");
+ printStatus(Color.CYAN, "Query : " + q8);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q8)));
+ printStatus(Color.GREEN,
"***************************************************");
+
+ String q9 = "select count(*) from \"fineFoodReviews_part_1\"";
+ printStatus(Color.YELLOW, "Total number of documents in
fineFoodReviews_part_1");
+ printStatus(Color.CYAN, "Query : " + q9);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q9)));
+ printStatus(Color.GREEN,
"***************************************************");
+
runVectorQueryExamples(runner);
}
@@ -118,6 +142,7 @@ public class RealtimeQuickStart extends QuickStartBase {
printStatus(Color.CYAN, "***** Bootstrap all tables *****");
runner.bootstrapTable();
+ createFineFoodReviewsFederatedTable();
printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to
get populated *****");
Thread.sleep(5000);
diff --git
a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_realtime_table_config.json
new file mode 100644
index 00000000000..ba9fa20eae6
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_realtime_table_config.json
@@ -0,0 +1,79 @@
+{
+ "tableName": "fineFoodReviews_part_0",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "segmentPushType": "APPEND",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "timeColumnName": "ts",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "5",
+ "replication": "1"
+ },
+ "tenants": {
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "noDictionaryColumns": ["Text"],
+ "multiColumnTextIndexConfig": {
+ "columns": ["UserId", "ProductId", "Summary"]
+ }
+ },
+ "routing": {
+ "segmentPrunerTypes": [
+ "time"
+ ]
+ },
+ "ingestionConfig": {
+ "streamIngestionConfig": {
+ "streamConfigMaps": [
+ {
+ "streamType": "kafka",
+ "stream.kafka.topic.name": "fineFoodReviews",
+ "stream.kafka.partition.ids": "0",
+ "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
+ "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
+ "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+ "stream.kafka.broker.list": "localhost:19092",
+ "realtime.segment.flush.threshold.time": "3600000",
+ "realtime.segment.flush.threshold.size": "50000"
+ }
+ ]
+ },
+ "transformConfigs": [
+ {
+ "columnName": "ts",
+ "transformFunction": "now()"
+ }
+ ]
+ },
+ "metadata": {
+ "customConfigs": {
+ }
+ },
+ "fieldConfigList": [
+ {
+ "encodingType": "RAW",
+ "indexType": "VECTOR",
+ "name": "embedding",
+ "properties": {
+ "vectorIndexType": "HNSW",
+ "vectorDimension": 1536,
+ "vectorDistanceFunction": "COSINE",
+ "version": 1,
+ "commitDocs": "1"
+ }
+ },
+ {
+ "name": "Text",
+ "encodingType": "RAW",
+ "indexes": {
+ "text": {
+ "deriveNumDocsPerChunkForRawIndex": "true",
+ "rawIndexWriterVersion": "3",
+ "caseSensitive": "false"
+ }
+ }
+ }
+ ]
+}
diff --git
a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_schema.json
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_schema.json
new file mode 100644
index 00000000000..f17d47176b9
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_schema.json
@@ -0,0 +1,48 @@
+{
+ "metricFieldSpecs": [
+ ],
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "STRING",
+ "name": "ProductId"
+ },
+ {
+ "dataType": "STRING",
+ "name": "UserId"
+ },
+ {
+ "dataType": "INT",
+ "name": "Score"
+ },
+ {
+ "dataType": "STRING",
+ "name": "Summary"
+ },
+ {
+ "dataType": "STRING",
+ "name": "Text"
+ },
+ {
+ "dataType": "STRING",
+ "name": "combined"
+ },
+ {
+ "dataType": "INT",
+ "name": "n_tokens"
+ },
+ {
+ "dataType": "FLOAT",
+ "name": "embedding",
+ "singleValueField": false
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "ts",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:SECONDS"
+ }
+ ],
+ "schemaName": "fineFoodReviews_part_0"
+}
diff --git
a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_realtime_table_config.json
new file mode 100644
index 00000000000..ab4ee90fac5
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_realtime_table_config.json
@@ -0,0 +1,79 @@
+{
+ "tableName": "fineFoodReviews_part_1",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "segmentPushType": "APPEND",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "timeColumnName": "ts",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "5",
+ "replication": "1"
+ },
+ "tenants": {
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "noDictionaryColumns": ["Text"],
+ "multiColumnTextIndexConfig": {
+ "columns": ["UserId", "ProductId", "Summary"]
+ }
+ },
+ "routing": {
+ "segmentPrunerTypes": [
+ "time"
+ ]
+ },
+ "ingestionConfig": {
+ "streamIngestionConfig": {
+ "streamConfigMaps": [
+ {
+ "streamType": "kafka",
+ "stream.kafka.partition.ids": "1",
+ "stream.kafka.topic.name": "fineFoodReviews",
+ "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
+ "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
+ "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+ "stream.kafka.broker.list": "localhost:19092",
+ "realtime.segment.flush.threshold.time": "3600000",
+ "realtime.segment.flush.threshold.size": "50000"
+ }
+ ]
+ },
+ "transformConfigs": [
+ {
+ "columnName": "ts",
+ "transformFunction": "now()"
+ }
+ ]
+ },
+ "metadata": {
+ "customConfigs": {
+ }
+ },
+ "fieldConfigList": [
+ {
+ "encodingType": "RAW",
+ "indexType": "VECTOR",
+ "name": "embedding",
+ "properties": {
+ "vectorIndexType": "HNSW",
+ "vectorDimension": 1536,
+ "vectorDistanceFunction": "COSINE",
+ "version": 1,
+ "commitDocs": "1"
+ }
+ },
+ {
+ "name": "Text",
+ "encodingType": "RAW",
+ "indexes": {
+ "text": {
+ "deriveNumDocsPerChunkForRawIndex": "true",
+ "rawIndexWriterVersion": "3",
+ "caseSensitive": "false"
+ }
+ }
+ }
+ ]
+}
diff --git
a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_schema.json
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_schema.json
new file mode 100644
index 00000000000..4162341f070
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_schema.json
@@ -0,0 +1,48 @@
+{
+ "metricFieldSpecs": [
+ ],
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "STRING",
+ "name": "ProductId"
+ },
+ {
+ "dataType": "STRING",
+ "name": "UserId"
+ },
+ {
+ "dataType": "INT",
+ "name": "Score"
+ },
+ {
+ "dataType": "STRING",
+ "name": "Summary"
+ },
+ {
+ "dataType": "STRING",
+ "name": "Text"
+ },
+ {
+ "dataType": "STRING",
+ "name": "combined"
+ },
+ {
+ "dataType": "INT",
+ "name": "n_tokens"
+ },
+ {
+ "dataType": "FLOAT",
+ "name": "embedding",
+ "singleValueField": false
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "ts",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:SECONDS"
+ }
+ ],
+ "schemaName": "fineFoodReviews_part_1"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]