This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 097a89f0400 Add support for Kafka subset-partition realtime ingestion 
(#17587)
097a89f0400 is described below

commit 097a89f0400189987ddf35da790558b09e5e3f91
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Mar 12 11:48:52 2026 -0700

    Add support for Kafka subset-partition realtime ingestion (#17587)
    
    Allow a realtime table to consume only a subset of Kafka partitions by
    configuring `stream.kafka.partition.ids` (e.g. "0,2,5"). This enables
    splitting a single Kafka topic across multiple Pinot tables for
    independent scaling and isolation.
    
    Key changes:
    - Add KafkaPartitionSubsetUtils to parse and validate partition ID
      configuration from StreamConfig
    - Update KafkaStreamMetadataProvider (kafka30/kafka40) to filter
      partitions and offsets based on the configured subset while still
      returning the total Kafka partition count for instance assignment
    - Update PinotLLCRealtimeSegmentManager to compute the partition count
      for segment assignment in one helper (getPartitionCountForRouting)
      that sums the per-stream counts reported by StreamMetadata, keeping
      routing consistent across subset tables
    - Add QuickStart example with two subset tables splitting a 2-partition
      topic (fineFoodReviews_part_0 and fineFoodReviews_part_1)
    - Add unit tests, integration tests, and a chaos integration test
      validating partition assignment, segment creation, and query routing
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  27 +-
 .../instance/InstanceAssignmentTest.java           | 170 ++++++
 .../RealtimeReplicaGroupSegmentAssignmentTest.java | 174 +++++-
 .../KafkaPartitionSubsetChaosIntegrationTest.java  | 607 +++++++++++++++++++++
 ...alTableWithTwoRealtimeTableIntegrationTest.java | 251 +++++++++
 .../kafka30/KafkaStreamMetadataProvider.java       |  85 ++-
 .../kafka30/KafkaPartitionLevelConsumerTest.java   |  76 +++
 .../kafka40/KafkaStreamMetadataProvider.java       |  85 ++-
 .../kafka40/KafkaPartitionLevelConsumerTest.java   | 109 +++-
 .../kafka/KafkaPartitionLevelStreamConfig.java     |   9 +
 .../stream/kafka/KafkaPartitionSubsetUtils.java    |  85 +++
 .../stream/kafka/KafkaStreamConfigProperties.java  |   6 +
 .../kafka/KafkaPartitionSubsetUtilsTest.java       | 201 +++++++
 .../org/apache/pinot/tools/QuickStartBase.java     | 104 +++-
 .../org/apache/pinot/tools/RealtimeQuickStart.java |  25 +
 ...neFoodReviews_part_0_realtime_table_config.json |  79 +++
 .../fineFoodReviews_part_0_schema.json             |  48 ++
 ...neFoodReviews_part_1_realtime_table_config.json |  79 +++
 .../fineFoodReviews_part_1_schema.json             |  48 ++
 19 files changed, 2210 insertions(+), 58 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6720161c620..113cdac8e5c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -397,12 +397,11 @@ public class PinotLLCRealtimeSegmentManager {
     String realtimeTableName = tableConfig.getTableName();
     LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
 
-    int numPartitionGroups = 0;
     for (StreamMetadata streamMetadata : streamMetadataList) {
       
_flushThresholdUpdateManager.clearFlushThresholdUpdater(streamMetadata.getStreamConfig());
-      numPartitionGroups += streamMetadata.getNumPartitions();
     }
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
+    int numPartitionGroups = getPartitionCountForRouting(streamMetadataList);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     SegmentAssignment segmentAssignment =
@@ -1727,10 +1726,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
-    int numPartitions = 0;
-    for (StreamMetadata streamMetadata : streamMetadataList) {
-      numPartitions += streamMetadata.getNumPartitions();
-    }
+    int numPartitions = getPartitionCountForRouting(streamMetadataList);
 
     SegmentAssignment segmentAssignment =
         SegmentAssignmentFactory.getSegmentAssignment(_helixManager, 
tableConfig, _controllerMetrics);
@@ -2123,6 +2119,25 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  /**
+   * Returns the partition count used when assigning realtime segments, computed as the sum of
+   * {@link StreamMetadata#getNumPartitions()} over every stream of the table.
+   *
+   * <p>This method does no subset handling of its own: it relies on each {@code StreamMetadata}
+   * reporting the total partition count of its topic. For subset-partition tables that total (not
+   * the subset size) is what lets RealtimeSegmentAssignment route non-contiguous partition IDs via
+   * {@code partitionId % totalPartitions}.
+   *
+   * <p>For standard tables, this equals the number of actively consumed partitions.
+   *
+   * @param streamMetadataList metadata of every stream consumed by the table
+   * @return sum of the per-stream partition counts; 0 when the list is empty
+   */
+  private int getPartitionCountForRouting(List<StreamMetadata> 
streamMetadataList) {
+    int totalPartitionCount = 0;
+    for (StreamMetadata streamMetadata : streamMetadataList) {
+      totalPartitionCount += streamMetadata.getNumPartitions();
+    }
+    return totalPartitionCount;
+  }
+
   private int getMaxNumPartitionsPerInstance(InstancePartitions 
instancePartitions, int numPartitions,
       int numReplicas) {
     if (instancePartitions.getNumReplicaGroups() == 1) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 62a1af26bd6..e1d76e916a3 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -3391,4 +3391,174 @@ public class InstanceAssignmentTest {
       assertEquals(steadyStatePartitions.getInstances(0, rg), 
initialPartitions.getInstances(0, rg));
     }
   }
+  /**
+   * Verifies that subset-partition tables use the total Kafka partition count (not the subset size)
+   * for instance assignment, producing the same server spread as a normal full-partition table.
+   *
+   * <p><b>Topology:</b> 2 Kafka topic partitions, 4 servers, 2 replica groups,
+   * 1 instance per partition per replica group.
+   * <ul>
+   *   <li>Table A stands for {@code stream.kafka.partition.ids = "0"} (consumes only Kafka
+   *       partition 0)</li>
+   *   <li>Table B stands for {@code stream.kafka.partition.ids = "1"} (consumes only Kafka
+   *       partition 1)</li>
+   * </ul>
+   * The table configs built here carry no stream config; the subset is modelled purely by the
+   * mocked {@link StreamMetadataProvider}, which reports the total partition count (2) for both
+   * tables.
+   *
+   * <p>The {@link ImplicitRealtimeTablePartitionSelector} always fetches the <em>total</em>
+   * Kafka partition count from {@link StreamMetadataProvider#fetchPartitionCount} (= 2). This
+   * produces an instance map with <b>two distinct slots</b> so that {@link
+   * org.apache.pinot.controller.helix.core.assignment.segment.RealtimeSegmentAssignment} routes
+   * Kafka partition 0 → slot 0 and Kafka partition 1 → slot 1, each backed by different servers.
+   *
+   * <p>Without this behaviour ({@code numPartitions = subsetSize = 1}), only slot 0 exists, and
+   * the assignment computes {@code kafkaPartitionId % 1 = 0} for <em>every</em> Kafka partition,
+   * routing all consuming segments to the same slot-0 servers — a hotspot on lower-indexed servers.
+   *
+   * <p><b>Pre-computed hash rotations</b> (used for exact expected server values):
+   * <pre>
+   *   Math.abs("subsetTablePartition0_REALTIME".hashCode()) % 4 = 0  →  no rotation
+   *   Pool after rotation: [s0, s1, s2, s3]
+   *   Round-robin to 2 RGs:  RG0=[s0,s2],  RG1=[s1,s3]
+   *     slot 0: RG0=s0, RG1=s1   |   slot 1: RG0=s2, RG1=s3
+   *
+   *   Math.abs("subsetTablePartition1_REALTIME".hashCode()) % 4 = 1  →  rotate by 1
+   *   Pool after rotation: [s1, s2, s3, s0]
+   *   Round-robin to 2 RGs:  RG0=[s1,s3],  RG1=[s2,s0]
+   *     slot 0: RG0=s1, RG1=s2   |   slot 1: RG0=s3, RG1=s0
+   * </pre>
+   *
+   * <p><b>Negative case:</b> table B is assigned again through an
+   * {@link InstanceReplicaGroupPartitionSelector} whose config asks for a single partition. The
+   * test asserts that only slot 0 exists and that its RG0 server differs from the RG0 server of
+   * the correct slot 1, i.e. Kafka partition 1 would land on a different (hotspot) server.
+   */
+  @Test
+  public void testSubsetPartitionInstanceAssignmentNoHotspot() {
+    final int numReplicas = 2;
+    final int numKafkaPartitions = 2;   // total Kafka topic partition count
+    final int numServers = 4;
+    final int numInstancesPerReplicaGroup = numServers / numReplicas; // = 2
+
+    // 4 servers, single pool (non-pool-based), sorted lexicographically:
+    //   [Server_localhost_0, Server_localhost_1, Server_localhost_2, 
Server_localhost_3]
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(numServers);
+    for (int i = 0; i < numServers; i++) {
+      InstanceConfig cfg = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      cfg.addTag(REALTIME_TAG);
+      instanceConfigs.add(cfg);
+    }
+
+    // The mock always returns 2 (total topic partition count) regardless of 
the configured subset.
+    StreamMetadataProvider streamMetadataProvider = 
mock(StreamMetadataProvider.class);
+    
when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numKafkaPartitions);
+
+    InstanceReplicaGroupPartitionConfig rgConfig = new 
InstanceReplicaGroupPartitionConfig(
+        true, 0, numReplicas, numInstancesPerReplicaGroup, 0, 0, false, null);
+    InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new InstanceTagPoolConfig(REALTIME_TAG, false, 0, null), null, 
rgConfig,
+        
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
 false);
+
+    // ── Table A: assigned subset {partition 0} 
───────────────────────────────────────────────
+    // Hash rotation = 0  →  pool [s0,s1,s2,s3] unchanged.
+    // Round-robin → RG0=[s0,s2], RG1=[s1,s3]; 1 instance/partition 
(ImplicitSelector enforces):
+    //   slot 0: RG0=s0, RG1=s1
+    //   slot 1: RG0=s2, RG1=s3
+    String tableAName = "subsetTablePartition0";
+    TableConfig tableAConfig = new TableConfigBuilder(TableType.REALTIME)
+        
.setTableName(tableAName).setServerTenant(TENANT_NAME).setNumReplicas(numReplicas)
+        
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), 
instanceAssignmentConfig))
+        .build();
+    InstancePartitions tableAPartitions = new 
InstanceAssignmentDriver(tableAConfig)
+        .getInstancePartitions(
+            
InstancePartitionsType.CONSUMING.getInstancePartitionsName(tableAName),
+            instanceAssignmentConfig, instanceConfigs, null, false,
+            new ImplicitRealtimeTablePartitionSelector(rgConfig, 
tableAConfig.getTableName(), null, false,
+                streamMetadataProvider));
+
+    // Key correctness check: total Kafka partition count (2) must be used, 
not subset size (1).
+    assertEquals(tableAPartitions.getNumPartitions(), numKafkaPartitions,
+        "Table A must use total Kafka partition count, not subset size");
+    assertEquals(tableAPartitions.getNumReplicaGroups(), numReplicas);
+    // slot 0 (Kafka partition 0 → 0 % 2 = 0)
+    assertEquals(tableAPartitions.getInstances(0, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(tableAPartitions.getInstances(0, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    // slot 1 (Kafka partition 1 → 1 % 2 = 1, if it were consumed here)
+    assertEquals(tableAPartitions.getInstances(1, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    assertEquals(tableAPartitions.getInstances(1, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+
+    // ── Table B: assigned subset {partition 1} 
───────────────────────────────────────────────
+    // Hash rotation = 1  →  rotated pool [s1,s2,s3,s0].
+    // Round-robin → RG0=[s1,s3], RG1=[s2,s0]; 1 instance/partition:
+    //   slot 0: RG0=s1, RG1=s2
+    //   slot 1: RG0=s3, RG1=s0
+    String tableBName = "subsetTablePartition1";
+    TableConfig tableBConfig = new TableConfigBuilder(TableType.REALTIME)
+        
.setTableName(tableBName).setServerTenant(TENANT_NAME).setNumReplicas(numReplicas)
+        
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), 
instanceAssignmentConfig))
+        .build();
+    InstancePartitions tableBPartitions = new 
InstanceAssignmentDriver(tableBConfig)
+        .getInstancePartitions(
+            
InstancePartitionsType.CONSUMING.getInstancePartitionsName(tableBName),
+            instanceAssignmentConfig, instanceConfigs, null, false,
+            new ImplicitRealtimeTablePartitionSelector(rgConfig, 
tableBConfig.getTableName(), null, false,
+                streamMetadataProvider));
+
+    assertEquals(tableBPartitions.getNumPartitions(), numKafkaPartitions,
+        "Table B must use total Kafka partition count, not subset size");
+    assertEquals(tableBPartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(tableBPartitions.getInstances(0, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(tableBPartitions.getInstances(0, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+    // slot 1 (Kafka partition 1 → 1 % 2 = 1)
+    assertEquals(tableBPartitions.getInstances(1, 0), 
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(tableBPartitions.getInstances(1, 1), 
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+
+    // ── Anti-hotspot: within each table, slot 0 and slot 1 use disjoint 
servers ───────────────
+    // RealtimeSegmentAssignment routes: Kafka partition X → slot = X % 
numPartitions.
+    // With numPartitions=2, slot 0 and slot 1 are guaranteed to be on 
different servers,
+    // so different Kafka partitions do NOT share consuming instances within 
the same table.
+    Set<String> tableASlot0 = new HashSet<>(tableAPartitions.getInstances(0, 
0));
+    tableASlot0.addAll(tableAPartitions.getInstances(0, 1));
+    Set<String> tableASlot1 = new HashSet<>(tableAPartitions.getInstances(1, 
0));
+    tableASlot1.addAll(tableAPartitions.getInstances(1, 1));
+    assertTrue(Collections.disjoint(tableASlot0, tableASlot1),
+        "Table A: slot 0 and slot 1 must be on disjoint servers (no 
intra-table hotspot)");
+
+    Set<String> tableBSlot0 = new HashSet<>(tableBPartitions.getInstances(0, 
0));
+    tableBSlot0.addAll(tableBPartitions.getInstances(0, 1));
+    Set<String> tableBSlot1 = new HashSet<>(tableBPartitions.getInstances(1, 
0));
+    tableBSlot1.addAll(tableBPartitions.getInstances(1, 1));
+    assertTrue(Collections.disjoint(tableBSlot0, tableBSlot1),
+        "Table B: slot 0 and slot 1 must be on disjoint servers (no 
intra-table hotspot)");
+
+    // Each table spreads load evenly: 2 slots × 2 replica groups = all 4 
servers.
+    Set<String> tableAAll = new HashSet<>(tableASlot0);
+    tableAAll.addAll(tableASlot1);
+    assertEquals(tableAAll.size(), numServers,
+        "Table A must use all " + numServers + " servers");
+    Set<String> tableBAll = new HashSet<>(tableBSlot0);
+    tableBAll.addAll(tableBSlot1);
+    assertEquals(tableBAll.size(), numServers,
+        "Table B must use all " + numServers + " servers");
+
+    // ── Negative case: numPartitions = 1 (wrong: uses subset size instead of 
total count) ─────
+    // With InstanceReplicaGroupPartitionSelector (bypasses stream-count 
lookup), numPartitions=1.
+    // Only slot 0 exists in the instance map.  RealtimeSegmentAssignment then 
computes:
+    //   Kafka partition 1 → 1 % 1 = 0 → slot 0  (same as partition 0 → 
HOTSPOT)
+    InstanceReplicaGroupPartitionConfig wrongRgConfig = new 
InstanceReplicaGroupPartitionConfig(
+        true, 0, numReplicas, numInstancesPerReplicaGroup, 1 /* wrong: subset 
size */, 1, false, null);
+
+    // Table B wrong assignment (rotation=1): only slot 0  →  RG0=s1, RG1=s2.
+    InstancePartitions wrongTableBPartitions = new 
InstanceAssignmentDriver(tableBConfig)
+        .getInstancePartitions(
+            
InstancePartitionsType.CONSUMING.getInstancePartitionsName(tableBName),
+            instanceAssignmentConfig, instanceConfigs, null, false,
+            new InstanceReplicaGroupPartitionSelector(wrongRgConfig, 
tableBConfig.getTableName(), null, false));
+
+    // Wrong approach: only slot 0 exists; slot 1 is missing entirely.
+    assertEquals(wrongTableBPartitions.getNumPartitions(), 1,
+        "Wrong approach produces only 1 partition slot");
+    assertNull(wrongTableBPartitions.getInstances(1, 0),
+        "Slot 1 must not exist when numPartitions=1; Kafka partition 1 falls 
back to slot 0 via 1 % 1 = 0");
+
+    // Under the wrong approach, Kafka partition 1 would be routed to slot 0 
(RG0 → s1).
+    // Under the correct approach, it routes to slot 1 (RG0 → s3).  These are 
different servers.
+    String wrongServerForP1 = wrongTableBPartitions.getInstances(0, 0).get(0); 
// s1 (slot-0 hotspot)
+    String correctServerForP1 = tableBPartitions.getInstances(1, 0).get(0);    
// s3 (slot 1)
+    assertNotEquals(wrongServerForP1, correctServerForP1,
+        "Wrong approach routes Kafka partition 1 to '" + wrongServerForP1
+            + "' (slot-0 hotspot) instead of the correct '" + 
correctServerForP1 + "' (slot 1)");
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 9b00d996676..4b00f8a2608 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -335,19 +335,20 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
   public void testExplicitPartition() {
     // CONSUMING instances:
     // {
-    //   0_0=[instance_0], 1_0=[instance_1], 2_0=[instance_2],
-    //   0_1=[instance_3], 1_1=[instance_4], 2_1=[instance_5],
-    //   0_2=[instance_6], 1_2=[instance_7], 2_2=[instance_8]
+    //   0_0=[instance_0], 1_0=[instance_1], 2_0=[instance_2], 
3_0=[instance_0],
+    //   0_1=[instance_3], 1_1=[instance_4], 2_1=[instance_5], 
3_1=[instance_3],
+    //   0_2=[instance_6], 1_2=[instance_7], 2_2=[instance_8], 3_2=[instance_6]
     // }
     //        p0                p1                p2
     //        p3
     InstancePartitions consumingInstancePartitions = new 
InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
     int numConsumingInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / 
NUM_REPLICAS;
-    int consumingInstanceIdToAdd = 0;
     for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
-      for (int partitionId = 0; partitionId < 
numConsumingInstancesPerReplicaGroup; partitionId++) {
+      for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
+        int instanceIndex = (partitionId % 
numConsumingInstancesPerReplicaGroup)
+            + replicaGroupId * numConsumingInstancesPerReplicaGroup;
         consumingInstancePartitions.setInstances(partitionId, replicaGroupId,
-            
Collections.singletonList(CONSUMING_INSTANCES.get(consumingInstanceIdToAdd++)));
+            Collections.singletonList(CONSUMING_INSTANCES.get(instanceIndex)));
       }
     }
 
@@ -458,6 +459,167 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
         SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
   }
 
+  /**
+   * Tests segment assignment for tables consuming subset partitions with non-contiguous partition
+   * IDs.
+   *
+   * <p><b>Key Invariant:</b> A table consuming subset partitions {0, 2, 5, 7} from an 8-partition
+   * topic should assign those partitions to the SAME instances as a full table consuming all 8
+   * partitions.
+   *
+   * <p>This is achieved by using the TOTAL partition count (8) in the instance partitions map, so
+   * that RealtimeSegmentAssignment routes both subset and full tables identically via
+   * {@code partitionId % 8}.
+   *
+   * <p>This test verifies:
+   * <ul>
+   *   <li>Subset table assignment matches full table assignment for the same partition IDs</li>
+   *   <li>Partitions 0 and 7 do not collapse onto the same instance set (no hotspot)</li>
+   *   <li>All consuming instances are utilized (no underutilization)</li>
+   * </ul>
+   *
+   * <p>Fixture note: slot {@code p} of each replica group is backed by instance
+   * {@code p % numConsumingInstancesPerReplicaGroup} of that group, so some slots (e.g. 2 and 5)
+   * share instances; only the pair (0, 7) is checked for distinct instance sets.
+   */
+  @Test
+  public void testSubsetPartitionAssignment() {
+    // Subset partition IDs: non-contiguous selection from 8 total partitions
+    int[] subsetPartitionIds = {0, 2, 5, 7};
+    int totalKafkaPartitions = 8;
+    int numSegmentsPerPartition = 3;
+
+    // Create segments only for the subset partitions
+    List<String> subsetSegments = new ArrayList<>();
+    for (int partitionId : subsetPartitionIds) {
+      for (int seqNum = 0; seqNum < numSegmentsPerPartition; seqNum++) {
+        subsetSegments.add(new LLCSegmentName(RAW_TABLE_NAME, partitionId, 
seqNum,
+            System.currentTimeMillis()).getSegmentName());
+      }
+    }
+
+    // CONSUMING instances with explicit partitions based on TOTAL partition 
count (8):
+    // This is critical: instance map has 8 slots, not 4, even though only 4 
partitions are consumed.
+    // {
+    //   0_0=[instance_0], 1_0=[instance_1], 2_0=[instance_2], 
3_0=[instance_0],
+    //   4_0=[instance_1], 5_0=[instance_2], 6_0=[instance_0], 
7_0=[instance_1],
+    //   0_1=[instance_3], 1_1=[instance_4], 2_1=[instance_5], 
3_1=[instance_3],
+    //   4_1=[instance_4], 5_1=[instance_5], 6_1=[instance_3], 
7_1=[instance_4],
+    //   0_2=[instance_6], 1_2=[instance_7], 2_2=[instance_8], 
3_2=[instance_6],
+    //   4_2=[instance_7], 5_2=[instance_8], 6_2=[instance_6], 7_2=[instance_7]
+    // }
+    InstancePartitions consumingInstancePartitions = new 
InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
+    int numConsumingInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / 
NUM_REPLICAS;
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      for (int partitionId = 0; partitionId < totalKafkaPartitions; 
partitionId++) {
+        int instanceIndex = (partitionId % 
numConsumingInstancesPerReplicaGroup)
+            + replicaGroupId * numConsumingInstancesPerReplicaGroup;
+        consumingInstancePartitions.setInstances(partitionId, replicaGroupId,
+            Collections.singletonList(CONSUMING_INSTANCES.get(instanceIndex)));
+      }
+    }
+
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+        Map.of(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
+
+    // First, assign segments for a FULL table (all 8 partitions) to establish 
baseline
+    Map<String, Map<String, String>> fullTableAssignment = new TreeMap<>();
+    List<String> fullTableSegments = new ArrayList<>();
+    for (int partitionId = 0; partitionId < totalKafkaPartitions; 
partitionId++) {
+      for (int seqNum = 0; seqNum < numSegmentsPerPartition; seqNum++) {
+        fullTableSegments.add(new LLCSegmentName(RAW_TABLE_NAME, partitionId, 
seqNum,
+            System.currentTimeMillis()).getSegmentName());
+      }
+    }
+    for (String segmentName : fullTableSegments) {
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, fullTableAssignment, 
instancePartitionsMap);
+      fullTableAssignment.put(segmentName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
+    }
+
+    // Now assign segments for SUBSET table (only partitions {0, 2, 5, 7})
+    Map<String, Map<String, String>> subsetTableAssignment = new TreeMap<>();
+    for (String segmentName : subsetSegments) {
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(segmentName, subsetTableAssignment, 
instancePartitionsMap);
+      assertEquals(instancesAssigned.size(), NUM_REPLICAS);
+
+      // Extract partition ID from segment name
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      int partitionId = llcSegmentName.getPartitionGroupId();
+
+      // Verify the partition ID is one of our subset partitions
+      boolean isSubsetPartition = false;
+      for (int subsetId : subsetPartitionIds) {
+        if (partitionId == subsetId) {
+          isSubsetPartition = true;
+          break;
+        }
+      }
+      assertTrue(isSubsetPartition, "Segment partition " + partitionId + " 
should be in subset");
+
+      // KEY VALIDATION: Subset table assignment should MATCH full table 
assignment for same partition
+      // This proves that subset tables use the same routing logic as full 
tables
+      Map<String, String> fullTableInstanceStateMap = 
fullTableAssignment.get(segmentName);
+      Map<String, String> subsetTableInstanceStateMap =
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING);
+
+      assertEquals(subsetTableInstanceStateMap.keySet(), 
fullTableInstanceStateMap.keySet(),
+          "Subset table partition " + partitionId + " should be assigned to 
the SAME instances as full table");
+
+      // Routing: slot = partitionId % totalPartitions; fixture maps slot to instance (slot % 3)
+      // Partition 0 → slot 0 → instance 0, 3, 6
+      // Partition 2 → slot 2 → instance 2, 5, 8
+      // Partition 5 → slot 5 → instance 2, 5, 8  (5 % 3 = 2)
+      // Partition 7 → slot 7 → instance 1, 4, 7  (7 % 3 = 1)
+      for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+        int expectedInstanceIndex = (partitionId % 
numConsumingInstancesPerReplicaGroup)
+            + replicaGroupId * numConsumingInstancesPerReplicaGroup;
+        String expectedInstance = 
CONSUMING_INSTANCES.get(expectedInstanceIndex);
+        assertEquals(instancesAssigned.get(replicaGroupId), expectedInstance,
+            "Partition " + partitionId + " in replica group " + replicaGroupId
+                + " should be assigned to instance " + expectedInstance);
+      }
+
+      subsetTableAssignment.put(segmentName, subsetTableInstanceStateMap);
+    }
+
+    // Verify that subset table uses the same instances as full table for each 
partition
+    for (String segmentName : subsetSegments) {
+      Map<String, String> subsetInstances = 
subsetTableAssignment.get(segmentName);
+      Map<String, String> fullInstances = fullTableAssignment.get(segmentName);
+      assertEquals(subsetInstances.keySet(), fullInstances.keySet(),
+          "Segment " + segmentName + " should have identical instance 
assignment in subset and full tables");
+    }
+
+    // Verify all subset partitions are distributed across different instances 
(no hotspot)
+    // With modulo-8 routing:
+    // Partition 0 → slot 0 → instances {0, 3, 6}
+    // Partition 2 → slot 2 → instances {2, 5, 8}
+    // Partition 5 → slot 5 → instances {2, 5, 8} (same as slot 2)
+    // Partition 7 → slot 7 → instances {1, 4, 7}
+    // Total unique instances: {0, 1, 2, 3, 4, 5, 6, 7, 8} = 9 instances
+    HashSet<String> usedInstances = new HashSet<>();
+    for (Map<String, String> instanceStateMap : 
subsetTableAssignment.values()) {
+      usedInstances.addAll(instanceStateMap.keySet());
+    }
+    // With correct modulo-8 routing, all 9 consuming instances should be used
+    assertEquals(usedInstances.size(), NUM_CONSUMING_INSTANCES,
+        "Subset partition assignment with modulo-8 routing should use all 
instances, found: " + usedInstances.size());
+
+    // Recompute each subset partition's expected instance set (one instance per 
replica group)
+    Map<Integer, HashSet<String>> partitionToInstancesRG0 = new TreeMap<>();
+    for (int partitionId : subsetPartitionIds) {
+      int instanceIndex = partitionId % numConsumingInstancesPerReplicaGroup;
+      HashSet<String> instances = new HashSet<>();
+      instances.add(CONSUMING_INSTANCES.get(instanceIndex)); // RG0
+      instances.add(CONSUMING_INSTANCES.get(instanceIndex + 
numConsumingInstancesPerReplicaGroup)); // RG1
+      instances.add(CONSUMING_INSTANCES.get(instanceIndex + 2 * 
numConsumingInstancesPerReplicaGroup)); // RG2
+      partitionToInstancesRG0.put(partitionId, instances);
+    }
+
+    // Partitions 0 and 7 should NOT share the same instance set
+    // (This would happen if we incorrectly used subset size = 4 for routing)
+    HashSet<String> partition0Instances = partitionToInstancesRG0.get(0);
+    HashSet<String> partition7Instances = partitionToInstancesRG0.get(7);
+    assertTrue(!partition0Instances.equals(partition7Instances),
+        "Partitions 0 and 7 should map to different instance sets to avoid 
hotspots");
+  }
+
   private HelixManager createHelixManager() {
     HelixManager helixManager = mock(HelixManager.class);
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/KafkaPartitionSubsetChaosIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/KafkaPartitionSubsetChaosIntegrationTest.java
new file mode 100644
index 00000000000..928bc44537f
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/KafkaPartitionSubsetChaosIntegrationTest.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterTest;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Chaos integration test for the Kafka partition subset feature.
+ *
+ * <p><b>Topology:</b>
+ * <ul>
+ *   <li>1 Kafka topic with {@value #NUM_KAFKA_PARTITIONS} partitions</li>
+ *   <li>1 "control" realtime table consuming <em>all</em> partitions (no 
subset filter), serving as ground truth</li>
+ *   <li>3 "subset" realtime tables with <em>non-contiguous</em> partition 
assignments:
+ *     <ul>
+ *       <li>subset_0: partitions {0, 3}</li>
+ *       <li>subset_1: partitions {1, 4}</li>
+ *       <li>subset_2: partitions {2, 5}</li>
+ *     </ul>
+ *   </li>
+ *   <li>1 logical table aggregating all three subset tables</li>
+ * </ul>
+ *
+ * <p><b>Key invariant:</b> {@code COUNT(*) on logical table == COUNT(*) on 
control table}
+ *
+ * <p><b>Chaos scenarios tested:</b>
+ * <ol>
+ *   <li>Force-commit consuming segments on all subset tables</li>
+ *   <li>Pause consumption, push more data, then resume – data must not be 
lost or duplicated</li>
+ *   <li>Server restart – tables must recover and resume from their last 
committed offset</li>
+ *   <li>Ingest a second batch after restart – counts must continue to 
match</li>
+ * </ol>
+ *
+ * <p>The non-contiguous partition IDs (e.g., {0,3}) are intentional: they 
expose regressions
+ * where code uses {@code getNumPartitions()} (returns a sequential 0..N-1 
count) instead of
+ * {@code getPartitionIds()} (returns the actual assigned partition ID set).
+ */
+public class KafkaPartitionSubsetChaosIntegrationTest extends 
BaseClusterIntegrationTest {
+
+  // Kafka topic: 6 partitions to allow non-contiguous splits
+  private static final String KAFKA_TOPIC = "partitionSubsetChaosTopic";
+  private static final int NUM_KAFKA_PARTITIONS = 6;
+
+  // Control table: ground truth – reads all 6 partitions
+  private static final String CONTROL_TABLE = "controlAllPartitions";
+
+  // Logical table: aggregates the three subset tables
+  private static final String LOGICAL_TABLE = "logicalSubset";
+
+  // Subset tables with non-contiguous partition assignments.
+  // Using {0,3}, {1,4}, {2,5} deliberately skips sequential ordering to expose
+  // getNumPartitions() vs getPartitionIds() bugs.
+  private static final String[] SUBSET_TABLE_NAMES = {"subset0", "subset1", 
"subset2"};
+  private static final int[][] SUBSET_PARTITION_ASSIGNMENTS = {{0, 3}, {1, 4}, 
{2, 5}};
+
+  private static final long VERIFICATION_TIMEOUT_MS = 600_000L;
+  private static final String DEFAULT_TENANT = "DefaultTenant";
+
+  // Per-partition record counts – updated whenever we push data
+  private final long[] _partitionRecordCounts = new long[NUM_KAFKA_PARTITIONS];
+
+  // 
---------------------------------------------------------------------------
+  // ClusterTest overrides
+  // 
---------------------------------------------------------------------------
+
+  @Override
+  protected String getKafkaTopic() {
+    return KAFKA_TOPIC;
+  }
+
+  @Override
+  protected int getNumKafkaPartitions() {
+    return NUM_KAFKA_PARTITIONS;
+  }
+
+  /** The "default" table name used by the base-class helper methods (schema 
creation, etc.). */
+  @Override
+  public String getTableName() {
+    return CONTROL_TABLE;
+  }
+
+  @Override
+  public String getHelixClusterName() {
+    return "KafkaPartitionSubsetChaosIntegrationTest";
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Suite lifecycle
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Brings up the test cluster and all tables, then waits for the initial data to be queryable.
+   *
+   * <p>Steps, in order: start ZooKeeper, controller, broker, server and Kafka; register the control table
+   * (no partition filter), the three subset tables (one partition subset each) and the logical table over the
+   * subset tables; push the Avro records round-robin with key offset 0; block until the count invariant holds.
+   *
+   * @throws Exception if any component fails to start or a schema/table/data push request fails
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+    startKafka();
+
+    List<File> avroFiles = getAllAvroFiles();
+
+    // Set the sample file used by AvroFileSchemaKafkaAvroMessageDecoder (static, schema is shared)
+    ClusterTest.AvroFileSchemaKafkaAvroMessageDecoder._avroFile = 
avroFiles.get(0);
+
+    // ---- Control table: consumes all partitions ----
+    Schema controlSchema = createSchema();
+    addSchema(controlSchema);
+    addTableConfig(buildRealtimeTableConfig(CONTROL_TABLE, null));
+
+    // ---- Subset tables: each consumes a non-contiguous partition subset ----
+    for (int i = 0; i < SUBSET_TABLE_NAMES.length; i++) {
+      Schema schema = createSchema();
+      schema.setSchemaName(SUBSET_TABLE_NAMES[i]);
+      addSchema(schema);
+      addTableConfig(buildRealtimeTableConfig(SUBSET_TABLE_NAMES[i], 
SUBSET_PARTITION_ASSIGNMENTS[i]));
+    }
+
+    // ---- Logical table ----
+    Schema logicalSchema = createSchema();
+    logicalSchema.setSchemaName(LOGICAL_TABLE);
+    addSchema(logicalSchema);
+    addLogicalTableConfig(LOGICAL_TABLE, buildSubsetPhysicalTableNames());
+
+    // ---- Push initial data round-robin across all 6 partitions ----
+    pushAvroRoundRobin(avroFiles, 0);
+
+    // ---- Wait for all tables to catch up ----
+    waitForCountInvariant(VERIFICATION_TIMEOUT_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Tests (run in declared order via priority)
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Sanity check: after initial ingestion the logical table and the control 
table must agree.
+   */
+  @Test(priority = 0)
+  public void testCountInvariantAfterInitialIngestion() {
+    assertCountInvariant();
+  }
+
+  /**
+   * Issues a force-commit on every subset table that currently has consuming segments, waits for each job to
+   * finish, and then re-checks the count invariant.
+   *
+   * <p>Intended to catch regressions that enumerate partitions as a sequential {@code 0..N-1} range instead of
+   * the configured partition IDs (e.g. {0,3}), which would leave one partition's consuming segment untouched.
+   */
+  @Test(priority = 1, dependsOnMethods = "testCountInvariantAfterInitialIngestion")
+  public void testCountInvariantAfterForceCommit()
+      throws Exception {
+    for (String rawTableName : SUBSET_TABLE_NAMES) {
+      String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+      Set<String> consumingSegments = getConsumingSegments(tableNameWithType);
+      // A table without consuming segments has nothing to commit
+      if (!consumingSegments.isEmpty()) {
+        String forceCommitJobId = forceCommit(tableNameWithType);
+        waitForForceCommitCompletion(tableNameWithType, forceCommitJobId, consumingSegments, 120_000L);
+      }
+    }
+
+    // Force-commit must neither drop nor duplicate rows
+    assertCountInvariant();
+  }
+
+  /**
+   * Pause consumption on all subset tables, push a second batch of records, then resume.
+   *
+   * <p>The control table is not paused, so it ingests the second batch while the subset tables (and therefore
+   * the logical table) fall behind. After resumption the subset tables must catch up so that the count
+   * invariant is restored.
+   */
+  @Test(priority = 2, dependsOnMethods = "testCountInvariantAfterForceCommit")
+  public void testCountInvariantAfterPauseResume()
+      throws Exception {
+    // Pause all subset tables
+    for (String tableName : SUBSET_TABLE_NAMES) {
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      pauseConsumptionAndWait(realtimeTableName);
+    }
+
+    // Push a second batch – control table will consume it, subset tables will not (yet)
+    List<File> avroFiles = getAllAvroFiles();
+    // Continue the key sequence where the previous push stopped. pushAvroRoundRobin takes a long key offset,
+    // so the count is passed through as-is instead of being narrowed to int.
+    long countBeforePush = totalExpectedCount();
+    pushAvroRoundRobin(avroFiles, /* keyOffset= */ countBeforePush);
+
+    long expectedControlCount = totalExpectedCount();
+
+    // Wait for control table to ingest the new records
+    TestUtils.waitForCondition(
+        aVoid -> getCurrentCountStarResult(CONTROL_TABLE) == expectedControlCount,
+        1_000L, VERIFICATION_TIMEOUT_MS,
+        "Control table did not reach expected count " + expectedControlCount + " after second push");
+
+    // Resume all subset tables
+    for (String tableName : SUBSET_TABLE_NAMES) {
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      resumeConsumptionAndWait(realtimeTableName);
+    }
+
+    // After resumption the invariant must be restored
+    waitForCountInvariant(VERIFICATION_TIMEOUT_MS);
+  }
+
+  /**
+   * Restart all servers and verify that the control table and the logical table both return the full expected
+   * count again, then assert the complete count invariant (including each subset table).
+   *
+   * <p>No data is pushed in this test, so the expected total is captured before the restart. Query failures
+   * while the servers come back are treated as "not ready yet" and retried until the timeout.
+   */
+  @Test(priority = 3, dependsOnMethods = "testCountInvariantAfterPauseResume")
+  public void testCountInvariantAfterServerRestart()
+      throws Exception {
+    long expectedTotal = totalExpectedCount();
+
+    restartServers();
+
+    // After restart, wait for all tables to reach their expected counts again
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return getCurrentCountStarResult(CONTROL_TABLE) == expectedTotal
+            && getCurrentCountStarResult(LOGICAL_TABLE) == expectedTotal;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 1_000L, VERIFICATION_TIMEOUT_MS, "Tables did not recover expected 
counts after server restart");
+
+    assertCountInvariant();
+  }
+
+  /**
+   * Push a third batch after server restart to confirm ongoing ingestion works correctly.
+   *
+   * <p>This catches regressions where partition state is re-initialised incorrectly after restart
+   * (e.g., resetting offsets to 0 based on a sequential 0..N-1 scan instead of the configured
+   * partition IDs).
+   */
+  @Test(priority = 4, dependsOnMethods = "testCountInvariantAfterServerRestart")
+  public void testCountInvariantAfterIngestPostRestart()
+      throws Exception {
+    List<File> avroFiles = getAllAvroFiles();
+    // Continue the key sequence after the records pushed so far; the offset parameter is a long, so no
+    // narrowing cast is applied.
+    long countBeforePush = totalExpectedCount();
+    pushAvroRoundRobin(avroFiles, /* keyOffset= */ countBeforePush);
+
+    waitForCountInvariant(VERIFICATION_TIMEOUT_MS);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Table-config builders
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Creates the realtime {@link TableConfig} for one of the test tables, starting from a copy of the base
+   * stream config map.
+   *
+   * @param tableName raw table name (without type suffix)
+   * @param partitionIds Kafka partition IDs written to {@code stream.kafka.partition.ids} as a comma-separated
+   *     list; {@code null} leaves the property unset so that all partitions are consumed
+   * @return the table config with the stream config placed under the ingestion config
+   */
+  private TableConfig buildRealtimeTableConfig(String tableName, int[] partitionIds) {
+    Map<String, String> streamConfig = new HashMap<>(getStreamConfigMap());
+    if (partitionIds != null) {
+      // Render the IDs as a comma-separated list, e.g. "0,3"
+      StringBuilder joinedIds = new StringBuilder();
+      String separator = "";
+      for (int partitionId : partitionIds) {
+        joinedIds.append(separator).append(partitionId);
+        separator = ",";
+      }
+      streamConfig.put("stream.kafka.partition.ids", joinedIds.toString());
+    }
+
+    StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(Collections.singletonList(streamConfig));
+    IngestionConfig ingestion = new IngestionConfig();
+    ingestion.setStreamIngestionConfig(streamIngestionConfig);
+
+    TableConfigBuilder builder = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(tableName)
+        .setTimeColumnName(getTimeColumnName())
+        .setNumReplicas(getNumReplicas())
+        .setBrokerTenant(DEFAULT_TENANT)
+        .setServerTenant(DEFAULT_TENANT)
+        .setIngestionConfig(ingestion)
+        .setNullHandlingEnabled(getNullHandlingEnabled());
+    return builder.build();
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Logical table helpers
+  // 
---------------------------------------------------------------------------
+
+  /** Returns the realtime (type-suffixed) names of all subset tables, in declaration order. */
+  private List<String> buildSubsetPhysicalTableNames() {
+    List<String> physicalNames = new ArrayList<>();
+    for (int i = 0; i < SUBSET_TABLE_NAMES.length; i++) {
+      String withType = TableNameBuilder.REALTIME.tableNameWithType(SUBSET_TABLE_NAMES[i]);
+      physicalNames.add(withType);
+    }
+    return physicalNames;
+  }
+
+  /**
+   * Creates a logical table over the given physical tables through the controller REST API and asserts that
+   * the controller returns the expected success response.
+   *
+   * @param logicalTableName name of the logical table to create
+   * @param physicalTableNames type-suffixed physical table names; the first realtime one (if any) is used as
+   *     the reference realtime table
+   * @throws IOException if the controller request fails
+   */
+  private void addLogicalTableConfig(String logicalTableName, List<String> physicalTableNames)
+      throws IOException {
+    Map<String, PhysicalTableConfig> physicalConfigs = new HashMap<>();
+    String firstRealtimeTable = null;
+    for (String physicalTable : physicalTableNames) {
+      physicalConfigs.put(physicalTable, new PhysicalTableConfig());
+    }
+    for (String physicalTable : physicalTableNames) {
+      if (TableNameBuilder.isRealtimeTableResource(physicalTable)) {
+        firstRealtimeTable = physicalTable;
+        break;
+      }
+    }
+
+    LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
+        .setTableName(logicalTableName)
+        .setBrokerTenant(DEFAULT_TENANT)
+        .setRefRealtimeTableName(firstRealtimeTable)
+        .setPhysicalTableConfigMap(physicalConfigs);
+    LogicalTableConfig config = builder.build();
+
+    String createUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
+    String responseBody = sendPostRequest(createUrl, config.toSingleLineJsonString());
+    String expectedBody = "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName
+        + " logical table successfully added.\"}";
+    assertEquals(responseBody, expectedBody);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Data push helpers
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Pushes every record from the given Avro files to the Kafka topic in strict round-robin order across all
+   * {@value #NUM_KAFKA_PARTITIONS} partitions, starting the message key sequence at {@code keyOffset}.
+   *
+   * <p>The target partition of each record is {@code keySeq % NUM_KAFKA_PARTITIONS}. Each send is awaited
+   * before the matching slot of {@link #_partitionRecordCounts} is incremented, so the counters (and therefore
+   * {@link #totalExpectedCount()}) only include records whose send has completed. The counters are plain array
+   * slots updated without any synchronization.
+   *
+   * @param avroFiles Avro files whose records are pushed, in list order
+   * @param keyOffset first value of the increasing message key sequence
+   * @throws Exception if reading an Avro file, encoding a record, or a Kafka send fails
+   */
+  private void pushAvroRoundRobin(List<File> avroFiles, long keyOffset)
+      throws Exception {
+    Properties producerProps = new Properties();
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+    // Use the shared default factory instead of allocating a new EncoderFactory for every record
+    EncoderFactory encoderFactory = EncoderFactory.get();
+    long keySeq = keyOffset;
+    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
+
+      for (File avroFile : avroFiles) {
+        try (DataFileStream<GenericRecord> stream = AvroUtils.getAvroReader(avroFile)) {
+          GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(stream.getSchema());
+          for (GenericRecord record : stream) {
+            int partition = (int) (keySeq % NUM_KAFKA_PARTITIONS);
+            // The buffer is reused across records; clear the previous record's bytes first
+            outputStream.reset();
+
+            BinaryEncoder encoder = encoderFactory.directBinaryEncoder(outputStream, null);
+            writer.write(record, encoder);
+            encoder.flush();
+
+            // Block on the send so the per-partition counter below only counts completed sends
+            producer.send(new ProducerRecord<>(
+                KAFKA_TOPIC, partition, Longs.toByteArray(keySeq), outputStream.toByteArray())).get();
+
+            _partitionRecordCounts[partition]++;
+            keySeq++;
+          }
+        }
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Verification helpers
+  // 
---------------------------------------------------------------------------
+
+  /** Sums the per-partition push counters, i.e. the number of records pushed to the topic so far. */
+  private long totalExpectedCount() {
+    long pushedSoFar = 0;
+    for (int partition = 0; partition < _partitionRecordCounts.length; partition++) {
+      pushedSoFar += _partitionRecordCounts[partition];
+    }
+    return pushedSoFar;
+  }
+
+  /**
+   * Computes how many records subset table {@code subsetIndex} should contain: the sum of the push counters
+   * of the partitions listed for it in {@code SUBSET_PARTITION_ASSIGNMENTS}.
+   */
+  private long expectedSubsetCount(int subsetIndex) {
+    int[] assignedPartitions = SUBSET_PARTITION_ASSIGNMENTS[subsetIndex];
+    long expected = 0;
+    for (int i = 0; i < assignedPartitions.length; i++) {
+      expected += _partitionRecordCounts[assignedPartitions[i]];
+    }
+    return expected;
+  }
+
+  /**
+   * Blocks until the control table and the logical table each return the total pushed count, and then until
+   * every subset table returns the count pushed to its own partitions. Tables are awaited one after another,
+   * each with its own {@code timeoutMs} budget and a one-second poll interval.
+   *
+   * @param timeoutMs maximum time to wait for each individual table
+   */
+  private void waitForCountInvariant(long timeoutMs) {
+    long totalPushed = totalExpectedCount();
+
+    // Control table first, then the logical table: both must see every pushed record
+    String[] fullViewTables = {CONTROL_TABLE, LOGICAL_TABLE};
+    String[] fullViewLabels = {"Control", "Logical"};
+    for (int t = 0; t < fullViewTables.length; t++) {
+      String table = fullViewTables[t];
+      TestUtils.waitForCondition(
+          ignored -> getCurrentCountStarResult(table) == totalPushed,
+          1_000L, timeoutMs,
+          fullViewLabels[t] + " table did not reach expected count " + totalPushed);
+    }
+
+    // Each subset table must see exactly the records of its assigned partitions
+    for (int subsetIndex = 0; subsetIndex < SUBSET_TABLE_NAMES.length; subsetIndex++) {
+      String table = SUBSET_TABLE_NAMES[subsetIndex];
+      long subsetPushed = expectedSubsetCount(subsetIndex);
+      TestUtils.waitForCondition(
+          ignored -> getCurrentCountStarResult(table) == subsetPushed,
+          1_000L, timeoutMs,
+          "Subset table " + table + " did not reach expected count " + subsetPushed);
+    }
+  }
+
+  /**
+   * Checks all count invariants once, without polling: control and logical table equal the total pushed
+   * count, and each subset table equals the count pushed to its partitions.
+   */
+  private void assertCountInvariant() {
+    long totalPushed = totalExpectedCount();
+    // Query both full-view tables before asserting on either of them
+    long controlActual = getCurrentCountStarResult(CONTROL_TABLE);
+    long logicalActual = getCurrentCountStarResult(LOGICAL_TABLE);
+
+    String controlMessage = "Control table count mismatch. Expected: " + totalPushed + ", actual: " + controlActual;
+    assertEquals(controlActual, totalPushed, controlMessage);
+    String logicalMessage = "Logical table count mismatch. Expected: " + totalPushed + ", actual: " + logicalActual;
+    assertEquals(logicalActual, totalPushed, logicalMessage);
+
+    for (int subsetIndex = 0; subsetIndex < SUBSET_TABLE_NAMES.length; subsetIndex++) {
+      String table = SUBSET_TABLE_NAMES[subsetIndex];
+      long subsetPushed = expectedSubsetCount(subsetIndex);
+      long subsetActual = getCurrentCountStarResult(table);
+      assertEquals(subsetActual, subsetPushed,
+          "Subset table " + table + " count mismatch." + " Expected: " + subsetPushed + ", actual: " + subsetActual);
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Force-commit helpers
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Triggers a force-commit for the given realtime table through the controller REST API.
+   *
+   * @return the {@code forceCommitJobId} field of the controller response
+   */
+  private String forceCommit(String realtimeTableName)
+      throws Exception {
+    String forceCommitUrl = _controllerRequestURLBuilder.forTableForceCommit(realtimeTableName);
+    String responseBody = sendPostRequest(forceCommitUrl, null);
+    JsonNode responseJson = JsonUtils.stringToJsonNode(responseBody);
+    return responseJson.get("forceCommitJobId").asText();
+  }
+
+  /**
+   * Polls until the force-commit job reports no pending segments and every segment that was consuming before
+   * the force-commit has ZK metadata with status {@code DONE}.
+   *
+   * <p>A segment whose metadata is missing or not yet {@code DONE} makes the predicate return {@code false} so
+   * the check is retried until {@code timeoutMs}. Asserting inside the predicate is avoided on purpose: an
+   * {@link AssertionError} is not covered by the {@code catch (Exception)} below, so it would end the wait on
+   * the first mismatch instead of retrying.
+   *
+   * @param realtimeTableName type-suffixed realtime table name
+   * @param jobId force-commit job ID returned by the controller
+   * @param consumingSegmentsBefore segments that were consuming when the force-commit was issued
+   * @param timeoutMs maximum time to wait
+   */
+  private void waitForForceCommitCompletion(String realtimeTableName, String jobId,
+      Set<String> consumingSegmentsBefore, long timeoutMs) {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        if (!isForceCommitJobCompleted(jobId)) {
+          return false;
+        }
+        // The previously-consuming segments must all be DONE; otherwise keep polling
+        for (String seg : consumingSegmentsBefore) {
+          var meta = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, seg);
+          if (meta == null || meta.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) {
+            return false;
+          }
+        }
+        return true;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, timeoutMs, "Force commit did not complete for " + realtimeTableName);
+  }
+
+  /**
+   * Fetches the status of a force-commit job from the controller and reports whether its list of consuming
+   * segments yet to be committed is empty. Also asserts that the job type is {@code FORCE_COMMIT}.
+   */
+  private boolean isForceCommitJobCompleted(String jobId)
+      throws Exception {
+    String statusUrl = _controllerRequestURLBuilder.forForceCommitJobStatus(jobId);
+    JsonNode jobStatus = JsonUtils.stringToJsonNode(sendGetRequest(statusUrl));
+    assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");
+
+    JsonNode pendingSegments =
+        jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);
+    // Completed means there is no element left in the pending list
+    return !pendingSegments.iterator().hasNext();
+  }
+
+  /**
+   * Returns the names of all segments in the table's ideal state that have at least one instance in the
+   * {@code CONSUMING} state. Fails the test if the table has no ideal state.
+   */
+  private Set<String> getConsumingSegments(String realtimeTableName) {
+    IdealState tableIdealState = _helixResourceManager.getTableIdealState(realtimeTableName);
+    assertNotNull(tableIdealState);
+
+    Set<String> consumingSegments = new HashSet<>();
+    tableIdealState.getRecord().getMapFields().forEach((segmentName, instanceStateMap) -> {
+      if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) {
+        consumingSegments.add(segmentName);
+      }
+    });
+    return consumingSegments;
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Pause / resume helpers
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Requests a consumption pause for the table and polls (every 500 ms, up to 30 s) until the pause status
+   * shows the pause flag set and no consuming segments. Errors while fetching the status are retried.
+   */
+  private void pauseConsumptionAndWait(String realtimeTableName)
+      throws Exception {
+    getControllerRequestClient().pauseConsumption(realtimeTableName);
+    // A completed pause leaves no segment in CONSUMING state
+    TestUtils.waitForCondition(ignored -> {
+      try {
+        PauseStatusDetails pauseStatus = getControllerRequestClient().getPauseStatusDetails(realtimeTableName);
+        if (pauseStatus == null || !pauseStatus.getPauseFlag()) {
+          return false;
+        }
+        return pauseStatus.getConsumingSegments() == null || pauseStatus.getConsumingSegments().isEmpty();
+      } catch (Exception e) {
+        return false;
+      }
+    }, 500L, 30_000L, "Consumption did not pause for " + realtimeTableName);
+  }
+
+  /**
+   * Requests a consumption resume for the table and polls (every 500 ms, up to 30 s) until the pause status
+   * no longer has the pause flag set. Errors while fetching the status are retried.
+   */
+  private void resumeConsumptionAndWait(String realtimeTableName)
+      throws Exception {
+    getControllerRequestClient().resumeConsumption(realtimeTableName);
+    TestUtils.waitForCondition(ignored -> {
+      try {
+        PauseStatusDetails pauseStatus = getControllerRequestClient().getPauseStatusDetails(realtimeTableName);
+        if (pauseStatus == null) {
+          return false;
+        }
+        return !pauseStatus.getPauseFlag();
+      } catch (Exception e) {
+        return false;
+      }
+    }, 500L, 30_000L, "Consumption did not resume for " + realtimeTableName);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoRealtimeTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoRealtimeTableIntegrationTest.java
new file mode 100644
index 00000000000..f09ebdd35e4
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoRealtimeTableIntegrationTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class LogicalTableWithTwoRealtimeTableIntegrationTest extends 
BaseLogicalTableIntegrationTest {
+  private static final String KAFKA_TOPIC = "logicalTableWithTwoRealtimeTopic";
+  private static final String LOGICAL_TABLE_NAME = "mytable";
+  private static final String TABLE_NAME_0 = "rt_1";
+  private static final String TABLE_NAME_1 = "rt_2";
+  private static final List<String> REALTIME_TABLE_NAMES = 
List.of(TABLE_NAME_0, TABLE_NAME_1);
+  private static final Map<String, Integer> REALTIME_TABLE_PARTITIONS =
+      Map.of(TABLE_NAME_0, 0, TABLE_NAME_1, 1);
+  private static final int NUM_PARTITIONS = 2;
+  private static final int DOCS_LOADED_TIMEOUT_MS = 600_000;
+
+  private long _table0RecordCount;
+  private long _table1RecordCount;
+  private int _realtimeTableConfigIndex;
+  private int _kafkaPushIndex;
+
+  @Override
+  protected String getKafkaTopic() {
+    return KAFKA_TOPIC;
+  }
+
+  @Override
+  protected String getLogicalTableName() {
+    return LOGICAL_TABLE_NAME;
+  }
+
+  @Override
+  protected String getTableName() {
+    return LOGICAL_TABLE_NAME;
+  }
+
+  @Override
+  protected int getNumKafkaPartitions() {
+    return NUM_PARTITIONS;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return super.getCountStarResult();
+  }
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return REALTIME_TABLE_NAMES;
+  }
+
+  /**
+   * Distributes the Avro files over the realtime tables in round-robin order: file {@code i} goes to table
+   * {@code i % numberOfTables}. Every table gets an entry, possibly with an empty list.
+   */
+  @Override
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    int numTables = REALTIME_TABLE_NAMES.size();
+    Map<String, List<File>> filesByTable = new LinkedHashMap<>();
+    REALTIME_TABLE_NAMES.forEach(name -> filesByTable.put(name, new ArrayList<>()));
+
+    int fileIndex = 0;
+    for (File avroFile : _avroFiles) {
+      String owner = REALTIME_TABLE_NAMES.get(fileIndex % numTables);
+      filesByTable.get(owner).add(avroFile);
+      fileIndex++;
+    }
+    return filesByTable;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    super.setUp();
+    waitForRecordCounts();
+  }
+
+  /**
+   * Verifies that each realtime table returns its recorded record count and that the logical table returns
+   * the sum of both.
+   */
+  @Test
+  public void testFederatedCountStar()
+      throws Exception {
+    // TestNG's assertEquals takes (actual, expected): the queried count goes first so that a failure message
+    // reports "expected"/"found" the right way round.
+    assertEquals(getCurrentCountStarResult(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_0)),
+        _table0RecordCount);
+    assertEquals(getCurrentCountStarResult(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_1)),
+        _table1RecordCount);
+    assertEquals(getCurrentCountStarResult(LOGICAL_TABLE_NAME), _table0RecordCount + _table1RecordCount);
+  }
+
+  /**
+   * Runs {@code SELECT *} against the logical table under three query configs applied in sequence: a first
+   * argument of {@code 1L}, then {@code 1000000L}, then {@code null}.
+   *
+   * <p>NOTE(review): the first {@code QueryConfig} argument is presumably the query timeout in milliseconds,
+   * judging by the timeout-related error codes checked below; confirm against {@code QueryConfig}.
+   *
+   * <p>With the {@code 1L} config an exception is not required; if one is reported, its error code must be
+   * one of the three timeout-related codes. With the other two configs the query must report no exceptions.
+   */
+  @Override
+  @Test
+  public void testQueryTimeOut()
+      throws Exception {
+    String starQuery = "SELECT * from " + getLogicalTableName();
+    QueryConfig queryConfig = new QueryConfig(1L, null, null, null, null, 
null);
+    var logicalTableConfig = getLogicalTableConfig(getLogicalTableName());
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(logicalTableConfig);
+    JsonNode response = postQuery(starQuery);
+    JsonNode exceptions = response.get("exceptions");
+    // Lenient on purpose: only the error code is checked, and only if an exception was reported at all
+    if (!exceptions.isEmpty()) {
+      int errorCode = exceptions.get(0).get("errorCode").asInt();
+      assertTrue(errorCode == QueryErrorCode.BROKER_TIMEOUT.getId()
+          || errorCode == QueryErrorCode.SERVER_NOT_RESPONDING.getId()
+          || errorCode == QueryErrorCode.QUERY_SCHEDULING_TIMEOUT.getId(),
+          "Unexpected error code: " + errorCode);
+    }
+
+    // Query succeeds with a high limit.
+    queryConfig = new QueryConfig(1000000L, null, null, null, null, null);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+
+    // Reset to null.
+    queryConfig = new QueryConfig(null, null, null, null, null, null);
+    logicalTableConfig.setQueryConfig(queryConfig);
+    updateLogicalTableConfig(logicalTableConfig);
+    response = postQuery(starQuery);
+    exceptions = response.get("exceptions");
+    assertTrue(exceptions.isEmpty(), "Query should not throw exception");
+  }
+
+  @Override // adapts the inherited config: next table name, its single partition ID, stream config moved to IngestionConfig
+  protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+    TableConfig tableConfig = super.createRealtimeTableConfig(sampleAvroFile);
+    int tableIndex = _realtimeTableConfigIndex++; // each call takes the next entry of REALTIME_TABLE_NAMES
+    String tableName = REALTIME_TABLE_NAMES.get(tableIndex);
+    Integer partitionId = REALTIME_TABLE_PARTITIONS.get(tableName);
+    tableConfig.setTableName(tableName);
+    Map<String, String> streamConfigs = new 
HashMap<>(tableConfig.getIndexingConfig().getStreamConfigs()); // copy, so the inherited map is left untouched
+    streamConfigs.put("stream.kafka.partition.ids", 
String.valueOf(partitionId)); // the partition-subset setting under test: exactly one ID per table
+
+    tableConfig.getIndexingConfig().setStreamConfigs(null); // stream configs are supplied via the ingestion config below instead
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(List.of(streamConfigs)));
+    tableConfig.setIngestionConfig(ingestionConfig);
+    return tableConfig;
+  }
+
+  @Override // publishes every record of avroFiles to the one Kafka partition mapped to the next table, and records the count
+  protected void pushAvroIntoKafka(List<File> avroFiles)
+      throws Exception {
+    Properties producerProperties = new Properties();
+    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:" + getKafkaPort());
+    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+    String tableName = REALTIME_TABLE_NAMES.get(_kafkaPushIndex); // the Nth invocation serves the Nth table name
+    int partition = REALTIME_TABLE_PARTITIONS.get(tableName);
+    _kafkaPushIndex++;
+
+    long recordCount = 0;
+    long keySequence = 0;
+    byte[] kafkaMessageHeader = getKafkaMessageHeader();
+
+    try (KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProperties);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) 
{
+
+      for (File avroFile : avroFiles) {
+        try (DataFileStream<GenericRecord> dataFileStream = 
AvroUtils.getAvroReader(avroFile)) {
+          GenericDatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(dataFileStream.getSchema());
+          for (GenericRecord genericRecord : dataFileStream) {
+            outputStream.reset(); // one buffer is reused for every record
+            if (kafkaMessageHeader != null && kafkaMessageHeader.length > 0) {
+              outputStream.write(kafkaMessageHeader); // optional header bytes precede the Avro payload
+            }
+
+            BinaryEncoder binaryEncoder = new 
EncoderFactory().directBinaryEncoder(outputStream, null);
+            datumWriter.write(genericRecord, binaryEncoder);
+            binaryEncoder.flush();
+
+            producer.send(new ProducerRecord<>(
+                getKafkaTopic(), partition, Longs.toByteArray(keySequence++), 
outputStream.toByteArray()))
+                .get(); // block until this send completes, so recordCount only counts completed sends
+            recordCount++;
+          }
+        }
+      }
+    }
+
+    if (partition == 0) { // store the produced count in the field that belongs to this partition's table
+      _table0RecordCount = recordCount;
+    } else if (partition == 1) {
+      _table1RecordCount = recordCount;
+    } else {
+      throw new IllegalStateException("Unexpected partition: " + partition);
+    }
+  }
+
+  private void waitForRecordCounts() {
+    String realtimeTableName0 = 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_0);
+    String realtimeTableName1 = 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME_1);
+    TestUtils.waitForCondition(ignored -> {
+      _table0RecordCount = getCurrentCountStarResult(realtimeTableName0);
+      _table1RecordCount = getCurrentCountStarResult(realtimeTableName1);
+      return getCurrentCountStarResult(LOGICAL_TABLE_NAME) == 
_table0RecordCount + _table1RecordCount;
+    }, 100L, DOCS_LOADED_TIMEOUT_MS,
+        "Failed to load the expected record counts for realtime logical 
tables");
+
+    Assert.assertEquals(getCurrentCountStarResult(realtimeTableName0), 
_table0RecordCount);
+    Assert.assertEquals(getCurrentCountStarResult(realtimeTableName1), 
_table1RecordCount);
+    Assert.assertEquals(getCurrentCountStarResult(LOGICAL_TABLE_NAME), 
_table0RecordCount + _table1RecordCount);
+  }
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 4abc9f21919..18dcd125eab 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -26,6 +26,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,11 +41,16 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionSubsetUtils;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -58,6 +64,11 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     implements StreamMetadataProvider {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
+  /**
+   * Immutable partition ID subset from table config. Read once at 
construction; does not change during the
+   * provider's lifetime. Empty when no subset is configured (consume all 
partitions).
+   */
+  private final List<Integer> _partitionIdSubset;
 
   public KafkaStreamMetadataProvider(String clientId, StreamConfig 
streamConfig) {
     this(clientId, streamConfig, Integer.MIN_VALUE);
@@ -65,6 +76,14 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
 
   public KafkaStreamMetadataProvider(String clientId, StreamConfig 
streamConfig, int partition) {
     super(clientId, streamConfig, partition);
+    // Resolve the optional partition-subset config once; a null result means no subset is configured.
+    List<Integer> subset =
+        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
+    if (subset != null) {
+      _partitionIdSubset = Collections.unmodifiableList(subset);
+      try {
+        validatePartitionIds(_partitionIdSubset);
+      } catch (RuntimeException e) {
+        // A constructor that throws never hands the instance to the caller, so nobody else can close it.
+        // Release whatever the superclass constructor acquired (presumably the Kafka connection) before
+        // propagating the original failure unchanged.
+        try {
+          close();
+        } catch (Exception closeException) {
+          e.addSuppressed(closeException);
+        }
+        throw e;
+      }
+    } else {
+      _partitionIdSubset = List.of();
+    }
   }
 
   @Override
@@ -75,7 +94,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
         return partitionInfos.size();
       }
       throw new TransientConsumerException(new RuntimeException(
-          String.format("Failed to fetch partition information for topic: %s", 
_topic)));
+          "Failed to fetch partition information for topic: " + _topic));
     } catch (TimeoutException e) {
       throw new TransientConsumerException(e);
     }
@@ -87,7 +106,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
       List<PartitionInfo> partitionInfos = fetchPartitionInfos(timeoutMillis);
       if (CollectionUtils.isEmpty(partitionInfos)) {
         throw new TransientConsumerException(new RuntimeException(
-            String.format("Failed to fetch partition information for topic: 
%s", _topic)));
+            "Failed to fetch partition information for topic: " + _topic));
       }
       Set<Integer> partitionIds = 
Sets.newHashSetWithExpectedSize(partitionInfos.size());
       for (PartitionInfo partitionInfo : partitionInfos) {
@@ -99,6 +118,39 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override // with a configured subset, returns one PartitionGroupMetadata per subset partition; otherwise uses the interface default
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, 
int timeoutMillis)
+      throws IOException, java.util.concurrent.TimeoutException {
+    if (_partitionIdSubset.isEmpty()) { // empty list means no subset was configured
+      return 
StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, 
streamConfig,
+          partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+    Map<Integer, StreamPartitionMsgOffset> partitionIdToEndOffset =
+        new HashMap<>(partitionGroupConsumptionStatuses.size());
+    for (PartitionGroupConsumptionStatus s : 
partitionGroupConsumptionStatuses) { // index the reported end offset by partition group ID
+      partitionIdToEndOffset.put(s.getStreamPartitionGroupId(), 
s.getEndOffset());
+    }
+    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> result = new 
ArrayList<>(_partitionIdSubset.size());
+    for (Integer partitionId : _partitionIdSubset) { // only configured partitions appear in the result
+      StreamPartitionMsgOffset endOffset = 
partitionIdToEndOffset.get(partitionId);
+      StreamPartitionMsgOffset startOffset;
+      if (endOffset == null) { // no recorded end offset: resolve the start from the stream config's offset criteria
+        try (StreamMetadataProvider partitionMetadataProvider =
+            streamConsumerFactory.createPartitionMetadataProvider(
+                StreamConsumerFactory.getUniqueClientId(clientId), 
partitionId)) {
+          startOffset = partitionMetadataProvider.fetchStreamPartitionOffset(
+              streamConfig.getOffsetCriteria(), timeoutMillis);
+        }
+      } else { // otherwise continue from the reported end offset
+        startOffset = endOffset;
+      }
+      result.add(new PartitionGroupMetadata(partitionId, startOffset));
+    }
+    return result;
+  }
+
   @Override
   public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
     List<TopicPartition> topicPartitions = new 
ArrayList<>(partitionIds.size());
@@ -309,7 +361,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
       try {
         if (!topicExists(requestTimeoutMs)) {
           topicMissing = true;
-          lastError = new RuntimeException(String.format("Topic does not 
exist: %s", _topic));
+          lastError = new RuntimeException("Topic does not exist: " + _topic);
         } else {
           topicMissing = false;
         }
@@ -332,7 +384,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
 
     if (lastError != null) {
       if (topicMissing) {
-        throw new RuntimeException(String.format("Topic does not exist: %s", 
_topic));
+        throw new RuntimeException("Topic does not exist: " + _topic);
       }
       if (lastError instanceof TransientConsumerException) {
         throw (TransientConsumerException) lastError;
@@ -343,7 +395,30 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
 
     throw new TransientConsumerException(
-        new RuntimeException(String.format("Failed to fetch partition 
information for topic: %s", _topic)));
+        new RuntimeException("Failed to fetch partition information for topic: 
" + _topic));
+  }
+
+  private void validatePartitionIds(List<Integer> subset) { // rejects configured partition IDs that the topic does not have
+    Set<Integer> topicPartitionIds = new HashSet<>();
+    List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L); // fixed 10 s budget for the metadata lookup
+    if (partitionInfos == null || partitionInfos.isEmpty()) { // without metadata the subset cannot be checked at all
+      throw new IllegalStateException(
+          "Cannot validate partition IDs: topic " + _topic + " metadata not 
available. "
+              + "Ensure the topic exists and is accessible.");
+    }
+    for (PartitionInfo partitionInfo : partitionInfos) {
+      topicPartitionIds.add(partitionInfo.partition());
+    }
+    List<Integer> missingPartitionIds = new ArrayList<>(); // collect every unknown ID so the error reports all of them
+    for (Integer partitionId : subset) {
+      if (!topicPartitionIds.contains(partitionId)) {
+        missingPartitionIds.add(partitionId);
+      }
+    }
+    Preconditions.checkArgument( // throws IllegalArgumentException when at least one configured ID is unknown
+        missingPartitionIds.isEmpty(),
+        "Invalid partition ids %s for table stream config. Available 
partitions on topic %s are: %s",
+        missingPartitionIds, _topic, topicPartitionIds);
   }
 
   private boolean topicExists(long timeoutMillis) {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
index 3f554b91f07..38423b0de4a 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
@@ -79,7 +79,9 @@ public class KafkaPartitionLevelConsumerTest {
   private static final String TEST_TOPIC_1 = "foo";
   private static final String TEST_TOPIC_2 = "bar";
   private static final String TEST_TOPIC_3 = "expired";
+  private static final String TEST_TOPIC_SUBSET_PARTITION = "subsetPartition";
   private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
+  private static final int NUM_MSG_PRODUCED_PER_SUBSET_PARTITION = 100;
   private static final long TIMESTAMP = Instant.now().toEpochMilli();
   private static final Random RANDOM = new Random();
 
@@ -99,6 +101,7 @@ public class KafkaPartitionLevelConsumerTest {
     _kafkaCluster.createTopic(TEST_TOPIC_1, 1);
     _kafkaCluster.createTopic(TEST_TOPIC_2, 2);
     _kafkaCluster.createTopic(TEST_TOPIC_3, 1);
+    _kafkaCluster.createTopic(TEST_TOPIC_SUBSET_PARTITION, 8);
     Thread.sleep(STABILIZE_SLEEP_DELAYS);
     produceMsgToKafka();
     Thread.sleep(STABILIZE_SLEEP_DELAYS);
@@ -119,6 +122,13 @@ public class KafkaPartitionLevelConsumerTest {
         producer.send(new ProducerRecord<>(TEST_TOPIC_2, 1, TIMESTAMP + i, 
null, "sample_msg_" + i));
         producer.send(new ProducerRecord<>(TEST_TOPIC_3, "sample_msg_" + i));
       }
+      for (int partitionId = 0; partitionId < 8; partitionId++) {
+        for (int msgIdx = 0; msgIdx < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION; 
msgIdx++) {
+          String payload = "mod_" + partitionId + "_msg_" + msgIdx;
+          producer.send(new ProducerRecord<>(TEST_TOPIC_SUBSET_PARTITION, 
partitionId,
+              TIMESTAMP + (partitionId * 1000L) + msgIdx, 
Integer.toString(partitionId), payload));
+        }
+      }
       producer.flush();
     }
   }
@@ -130,6 +140,7 @@ public class KafkaPartitionLevelConsumerTest {
       _kafkaCluster.deleteTopic(TEST_TOPIC_1);
       _kafkaCluster.deleteTopic(TEST_TOPIC_2);
       _kafkaCluster.deleteTopic(TEST_TOPIC_3);
+      _kafkaCluster.deleteTopic(TEST_TOPIC_SUBSET_PARTITION);
     } finally {
       _kafkaCluster.stop();
     }
@@ -206,6 +217,71 @@ public class KafkaPartitionLevelConsumerTest {
     assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
   }
 
+  @Test(expectedExceptions = IllegalArgumentException.class) // TEST_TOPIC_1 is created with 1 partition, so ID 99 must be rejected
+  public void testSubsetPartitionInvalidPartition() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "invalid-subset-client";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+        KafkaStreamConfigProperties.PARTITION_IDS), "99"); // a partition ID the topic does not have
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+    new KafkaStreamMetadataProvider(clientId, streamConfig); // the constructor itself is expected to throw
+  }
+
+  @Test // per partition 0..7: configure a one-partition subset, check metadata stays unfiltered, read that partition back
+  public void testSubsetPartitionConsumption()
+      throws TimeoutException {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String tableNameWithType = "tableName_REALTIME";
+
+    for (int partitionId = 0; partitionId < 8; partitionId++) { // TEST_TOPIC_SUBSET_PARTITION is created with 8 partitions
+      String clientId = "subset-client-" + partitionId;
+      Map<String, String> streamConfigMap = new HashMap<>();
+      streamConfigMap.put("streamType", streamType);
+      streamConfigMap.put("stream.kafka.topic.name", 
TEST_TOPIC_SUBSET_PARTITION);
+      streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+      streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+      streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+      streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+          KafkaStreamConfigProperties.PARTITION_IDS), 
Integer.toString(partitionId));
+      StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+      KafkaStreamMetadataProvider streamMetadataProvider = new 
KafkaStreamMetadataProvider(clientId, streamConfig);
+      // With subset partition config, fetchPartitionCount/fetchPartitionIds 
still return the total Kafka
+      // partition count and all partition IDs; subset filtering only applies 
to segment creation.
+      assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 8);
+      assertEquals(streamMetadataProvider.fetchPartitionIds(10000L),
+          new HashSet<>(List.of(0, 1, 2, 3, 4, 5, 6, 7)));
+
+      StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+      PartitionGroupConsumer consumer = 
streamConsumerFactory.createPartitionGroupConsumer(
+          clientId,
+          new PartitionGroupConsumptionStatus(partitionId, 0, new 
LongMsgOffset(0),
+              new LongMsgOffset(NUM_MSG_PRODUCED_PER_SUBSET_PARTITION), 
"CONSUMING"));
+
+      MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0), 
10000);
+      assertEquals(messageBatch.getMessageCount(), 
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+      assertEquals(messageBatch.getUnfilteredMessageCount(), 
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+      for (int i = 0; i < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION; i++) { // payload and timestamp mirror what produceMsgToKafka sent
+        StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+        assertEquals(new String((byte[]) streamMessage.getValue()), "mod_" + 
partitionId + "_msg_" + i);
+        StreamMessageMetadata metadata = streamMessage.getMetadata();
+        assertEquals(((LongMsgOffset) metadata.getOffset()).getOffset(), i);
+        assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 
(partitionId * 1000L) + i);
+      }
+    }
+  }
+
   @Test
   public void testFetchMessages() {
     String streamType = "kafka";
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
index 9eb73deda63..bd712ac6fb6 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
@@ -26,6 +26,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,11 +41,16 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.plugin.stream.kafka.KafkaConsumerPartitionLag;
+import org.apache.pinot.plugin.stream.kafka.KafkaPartitionSubsetUtils;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -58,6 +64,11 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     implements StreamMetadataProvider {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
+  /**
+   * Immutable partition ID subset from table config. Read once at 
construction; does not change during the
+   * provider's lifetime. Empty when no subset is configured (consume all 
partitions).
+   */
+  private final List<Integer> _partitionIdSubset;
 
   public KafkaStreamMetadataProvider(String clientId, StreamConfig 
streamConfig) {
     this(clientId, streamConfig, Integer.MIN_VALUE);
@@ -65,6 +76,14 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
 
   public KafkaStreamMetadataProvider(String clientId, StreamConfig 
streamConfig, int partition) {
     super(clientId, streamConfig, partition);
+    // Resolve the optional partition-subset config once; a null result means no subset is configured.
+    List<Integer> subset =
+        KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(_config.getStreamConfigMap());
+    if (subset != null) {
+      _partitionIdSubset = Collections.unmodifiableList(subset);
+      try {
+        validatePartitionIds(_partitionIdSubset);
+      } catch (RuntimeException e) {
+        // A constructor that throws never hands the instance to the caller, so nobody else can close it.
+        // Release whatever the superclass constructor acquired (presumably the Kafka connection) before
+        // propagating the original failure unchanged.
+        try {
+          close();
+        } catch (Exception closeException) {
+          e.addSuppressed(closeException);
+        }
+        throw e;
+      }
+    } else {
+      _partitionIdSubset = List.of();
+    }
   }
 
   @Override
@@ -75,7 +94,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
         return partitionInfos.size();
       }
       throw new TransientConsumerException(new RuntimeException(
-          String.format("Failed to fetch partition information for topic: %s", 
_topic)));
+          "Failed to fetch partition information for topic: " + _topic));
     } catch (TimeoutException e) {
       throw new TransientConsumerException(e);
     }
@@ -87,7 +106,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
       List<PartitionInfo> partitionInfos = fetchPartitionInfos(timeoutMillis);
       if (CollectionUtils.isEmpty(partitionInfos)) {
         throw new TransientConsumerException(new RuntimeException(
-            String.format("Failed to fetch partition information for topic: 
%s", _topic)));
+            "Failed to fetch partition information for topic: " + _topic));
       }
       Set<Integer> partitionIds = 
Sets.newHashSetWithExpectedSize(partitionInfos.size());
       for (PartitionInfo partitionInfo : partitionInfos) {
@@ -99,6 +118,39 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override // with a configured subset, returns one PartitionGroupMetadata per subset partition; otherwise uses the interface default
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, 
int timeoutMillis)
+      throws IOException, java.util.concurrent.TimeoutException {
+    if (_partitionIdSubset.isEmpty()) { // empty list means no subset was configured
+      return 
StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, 
streamConfig,
+          partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+    Map<Integer, StreamPartitionMsgOffset> partitionIdToEndOffset =
+        new HashMap<>(partitionGroupConsumptionStatuses.size());
+    for (PartitionGroupConsumptionStatus s : 
partitionGroupConsumptionStatuses) { // index the reported end offset by partition group ID
+      partitionIdToEndOffset.put(s.getStreamPartitionGroupId(), 
s.getEndOffset());
+    }
+    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> result = new 
ArrayList<>(_partitionIdSubset.size());
+    for (Integer partitionId : _partitionIdSubset) { // only configured partitions appear in the result
+      StreamPartitionMsgOffset endOffset = 
partitionIdToEndOffset.get(partitionId);
+      StreamPartitionMsgOffset startOffset;
+      if (endOffset == null) { // no recorded end offset: resolve the start from the stream config's offset criteria
+        try (StreamMetadataProvider partitionMetadataProvider =
+            streamConsumerFactory.createPartitionMetadataProvider(
+                StreamConsumerFactory.getUniqueClientId(clientId), 
partitionId)) {
+          startOffset = partitionMetadataProvider.fetchStreamPartitionOffset(
+              streamConfig.getOffsetCriteria(), timeoutMillis);
+        }
+      } else { // otherwise continue from the reported end offset
+        startOffset = endOffset;
+      }
+      result.add(new PartitionGroupMetadata(partitionId, startOffset));
+    }
+    return result;
+  }
+
   @Override
   public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
     List<TopicPartition> topicPartitions = new 
ArrayList<>(partitionIds.size());
@@ -307,7 +359,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
       try {
         if (!topicExists(requestTimeoutMs)) {
           topicMissing = true;
-          lastError = new RuntimeException(String.format("Topic does not 
exist: %s", _topic));
+          lastError = new RuntimeException("Topic does not exist: " + _topic);
         } else {
           topicMissing = false;
         }
@@ -330,7 +382,7 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
 
     if (lastError != null) {
       if (topicMissing) {
-        throw new RuntimeException(String.format("Topic does not exist: %s", 
_topic));
+        throw new RuntimeException("Topic does not exist: " + _topic);
       }
       if (lastError instanceof TransientConsumerException) {
         throw (TransientConsumerException) lastError;
@@ -341,7 +393,30 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
 
     throw new TransientConsumerException(
-        new RuntimeException(String.format("Failed to fetch partition 
information for topic: %s", _topic)));
+        new RuntimeException("Failed to fetch partition information for topic: 
" + _topic));
+  }
+
+  private void validatePartitionIds(List<Integer> subset) { // rejects configured partition IDs that the topic does not have
+    Set<Integer> topicPartitionIds = new HashSet<>();
+    List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L); // fixed 10 s budget for the metadata lookup
+    if (partitionInfos == null || partitionInfos.isEmpty()) { // without metadata the subset cannot be checked at all
+      throw new IllegalStateException(
+          "Cannot validate partition IDs: topic " + _topic + " metadata not 
available. "
+              + "Ensure the topic exists and is accessible.");
+    }
+    for (PartitionInfo partitionInfo : partitionInfos) {
+      topicPartitionIds.add(partitionInfo.partition());
+    }
+    List<Integer> missingPartitionIds = new ArrayList<>(); // collect every unknown ID so the error reports all of them
+    for (Integer partitionId : subset) {
+      if (!topicPartitionIds.contains(partitionId)) {
+        missingPartitionIds.add(partitionId);
+      }
+    }
+    Preconditions.checkArgument( // throws IllegalArgumentException when at least one configured ID is unknown
+        missingPartitionIds.isEmpty(),
+        "Invalid partition ids %s for table stream config. Available 
partitions on topic %s are: %s",
+        missingPartitionIds, _topic, topicPartitionIds);
   }
 
   private boolean topicExists(long timeoutMillis) {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
index f59c7a635a7..34852964e6e 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java
@@ -58,11 +58,7 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.retry.ExponentialBackoffRetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.DockerClientFactory;
 import org.testng.Assert;
-import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -77,15 +73,15 @@ import static org.testng.Assert.assertTrue;
 
 /**
  * Tests for the KafkaPartitionLevelConsumer.
- * Note: These tests require Docker to be running as they use Testcontainers.
  */
 public class KafkaPartitionLevelConsumerTest {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
   private static final long STABILIZE_SLEEP_DELAYS = 3000;
   private static final String TEST_TOPIC_1 = "foo";
   private static final String TEST_TOPIC_2 = "bar";
   private static final String TEST_TOPIC_3 = "expired";
+  private static final String TEST_TOPIC_SUBSET_PARTITION = "subsetPartition";
   private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
+  private static final int NUM_MSG_PRODUCED_PER_SUBSET_PARTITION = 100;
   private static final long TIMESTAMP = Instant.now().toEpochMilli();
   private static final Random RANDOM = new Random();
 
@@ -95,37 +91,19 @@ public class KafkaPartitionLevelConsumerTest {
   @BeforeClass
   public void setUp()
       throws Exception {
-    // Check if Docker is available, skip tests if not
-    if (!isDockerAvailable()) {
-      throw new SkipException("Docker is not available. Skipping Kafka 4.0 
consumer tests. "
-          + "These tests require Docker for Testcontainers.");
-    }
     _kafkaCluster = new MiniKafkaCluster("0");
     _kafkaCluster.start();
     _kafkaBrokerAddress = _kafkaCluster.getKafkaServerAddress();
     _kafkaCluster.createTopic(TEST_TOPIC_1, 1);
     _kafkaCluster.createTopic(TEST_TOPIC_2, 2);
     _kafkaCluster.createTopic(TEST_TOPIC_3, 1);
+    _kafkaCluster.createTopic(TEST_TOPIC_SUBSET_PARTITION, 8);
     Thread.sleep(STABILIZE_SLEEP_DELAYS);
     produceMsgToKafka();
     Thread.sleep(STABILIZE_SLEEP_DELAYS);
     _kafkaCluster.deleteRecordsBeforeOffset(TEST_TOPIC_3, 0, 200);
   }
 
-  /**
-   * Checks if Docker is available for running Testcontainers.
-   * @return true if Docker is available, false otherwise
-   */
-  private static boolean isDockerAvailable() {
-    try {
-      DockerClientFactory.instance().client();
-      return true;
-    } catch (Throwable ex) {
-      LOGGER.warn("Docker is not available: {}", ex.getMessage());
-      return false;
-    }
-  }
-
   private void produceMsgToKafka() {
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _kafkaBrokerAddress);
@@ -140,6 +118,13 @@ public class KafkaPartitionLevelConsumerTest {
         producer.send(new ProducerRecord<>(TEST_TOPIC_2, 1, TIMESTAMP + i, 
null, "sample_msg_" + i));
         producer.send(new ProducerRecord<>(TEST_TOPIC_3, "sample_msg_" + i));
       }
+      for (int partitionId = 0; partitionId < 8; partitionId++) {
+        for (int msgIdx = 0; msgIdx < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION; 
msgIdx++) {
+          String payload = "mod_" + partitionId + "_msg_" + msgIdx;
+          producer.send(new ProducerRecord<>(TEST_TOPIC_SUBSET_PARTITION, 
partitionId,
+              TIMESTAMP + (partitionId * 1000L) + msgIdx, 
Integer.toString(partitionId), payload));
+        }
+      }
       producer.flush();
     }
   }
@@ -151,6 +136,7 @@ public class KafkaPartitionLevelConsumerTest {
       _kafkaCluster.deleteTopic(TEST_TOPIC_1);
       _kafkaCluster.deleteTopic(TEST_TOPIC_2);
       _kafkaCluster.deleteTopic(TEST_TOPIC_3);
+      _kafkaCluster.deleteTopic(TEST_TOPIC_SUBSET_PARTITION);
     } finally {
       _kafkaCluster.stop();
     }
@@ -225,6 +211,79 @@ public class KafkaPartitionLevelConsumerTest {
 
     streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, 
streamConfig);
     assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+
+    streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+        KafkaStreamConfigProperties.PARTITION_IDS), "1");
+    streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
+    streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, 
streamConfig);
+    // With subset partition config, fetchPartitionCount/fetchPartitionIds 
still return the total Kafka
+    // partition count and all partition IDs; subset filtering only applies to 
segment creation.
+    assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+    assertEquals(streamMetadataProvider.fetchPartitionIds(10000L), new 
HashSet<>(List.of(0, 1)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSubsetPartitionInvalidPartition() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String clientId = "invalid-subset-client";
+    String tableNameWithType = "tableName_REALTIME";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+        KafkaStreamConfigProperties.PARTITION_IDS), "99");
+    StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+    new KafkaStreamMetadataProvider(clientId, streamConfig);
+  }
+
+  @Test
+  public void testSubsetPartitionConsumption() throws TimeoutException {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
+    String tableNameWithType = "tableName_REALTIME";
+
+    for (int partitionId = 0; partitionId < 8; partitionId++) {
+      String clientId = "subset-client-" + partitionId;
+      Map<String, String> streamConfigMap = new HashMap<>();
+      streamConfigMap.put("streamType", streamType);
+      streamConfigMap.put("stream.kafka.topic.name", 
TEST_TOPIC_SUBSET_PARTITION);
+      streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+      streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
+      streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+      streamConfigMap.put(KafkaStreamConfigProperties.constructStreamProperty(
+          KafkaStreamConfigProperties.PARTITION_IDS), 
Integer.toString(partitionId));
+      StreamConfig streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMap);
+
+      KafkaStreamMetadataProvider streamMetadataProvider = new 
KafkaStreamMetadataProvider(clientId, streamConfig);
+      // With subset partition config, fetchPartitionCount/fetchPartitionIds 
still return the total Kafka
+      // partition count and all partition IDs; subset filtering only applies 
to segment creation.
+      assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 8);
+      assertEquals(streamMetadataProvider.fetchPartitionIds(10000L),
+          new HashSet<>(List.of(0, 1, 2, 3, 4, 5, 6, 7)));
+
+      StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+      PartitionGroupConsumer consumer = 
streamConsumerFactory.createPartitionGroupConsumer(
+          clientId,
+          new PartitionGroupConsumptionStatus(partitionId, 0, new 
LongMsgOffset(0),
+              new LongMsgOffset(NUM_MSG_PRODUCED_PER_SUBSET_PARTITION), 
"CONSUMING"));
+
+      MessageBatch messageBatch = consumer.fetchMessages(new LongMsgOffset(0), 
10000);
+      assertEquals(messageBatch.getMessageCount(), 
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+      assertEquals(messageBatch.getUnfilteredMessageCount(), 
NUM_MSG_PRODUCED_PER_SUBSET_PARTITION);
+      for (int i = 0; i < NUM_MSG_PRODUCED_PER_SUBSET_PARTITION; i++) {
+        StreamMessage streamMessage = messageBatch.getStreamMessage(i);
+        assertEquals(new String((byte[]) streamMessage.getValue()), "mod_" + 
partitionId + "_msg_" + i);
+        StreamMessageMetadata metadata = streamMessage.getMetadata();
+        assertEquals(((LongMsgOffset) metadata.getOffset()).getOffset(), i);
+        assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 
(partitionId * 1000L) + i);
+      }
+    }
   }
 
   @Test
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
index 7202e10c472..18e171b974a 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.stream.kafka;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.stream.StreamConfig;
@@ -120,6 +121,14 @@ public class KafkaPartitionLevelStreamConfig {
     return _populateMetadata;
   }
 
+  /**
+   * Returns an unmodifiable view of the stream config map so callers cannot 
mutate it
+   * and change consumer behavior after construction.
+   */
+  public Map<String, String> getStreamConfigMap() {
+    return Collections.unmodifiableMap(_streamConfigMap);
+  }
+
   private int getIntConfigWithDefault(Map<String, String> configMap, String 
key, int defaultValue) {
     String stringValue = configMap.get(key);
     try {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
new file mode 100644
index 00000000000..e9fc7ad69e1
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Utilities for parsing and validating Kafka partition subset configuration
+ * (stream.kafka.partition.ids) from stream config.
+ */
+public final class KafkaPartitionSubsetUtils {
+
+  private KafkaPartitionSubsetUtils() {
+  }
+
+  /**
+   * Reads the optional comma-separated partition ID list from the stream 
config map.
+   * Returns a sorted, deduplicated list for stable ordering when used for 
partition group metadata.
+   * Duplicate IDs in the config are silently removed; this ensures stable 
ordering and prevents
+   * duplicate processing of the same partition.
+   *
+   * @param streamConfigMap table stream config map (e.g. from
+   *                        {@link 
org.apache.pinot.spi.stream.StreamConfig#getStreamConfigsMap()})
+   * @return Sorted list of unique partition IDs when 
stream.kafka.partition.ids is set and non-empty;
+   *         null when not set or blank
+   * @throws IllegalArgumentException if the value contains invalid 
(non-integer) entries
+   */
+  @Nullable
+  public static List<Integer> getPartitionIdsFromConfig(Map<String, String> 
streamConfigMap) {
+    String key = 
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS);
+    String value = streamConfigMap.get(key);
+    if (StringUtils.isBlank(value)) {
+      return null;
+    }
+    String[] parts = value.split(",");
+    Set<Integer> idSet = new HashSet<>();
+    for (String part : parts) {
+      String trimmed = part.trim();
+      if (trimmed.isEmpty()) {
+        continue;
+      }
+      try {
+        int partitionId = Integer.parseInt(trimmed);
+        if (partitionId < 0) {
+          throw new IllegalArgumentException("Invalid " + key
+              + " value: partition IDs must be non-negative, got '" + value + 
"'");
+        }
+        idSet.add(partitionId);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Invalid " + key + " value: expected comma-separated integers, got 
'" + value + "'", e);
+      }
+    }
+    if (idSet.isEmpty()) {
+      return null;
+    }
+    List<Integer> ids = new ArrayList<>(idSet);
+    Collections.sort(ids);
+    return ids;
+  }
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
index 477bbf56418..1d7fa19de84 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaStreamConfigProperties.java
@@ -68,4 +68,10 @@ public class KafkaStreamConfigProperties {
   }
 
   public static final String KAFKA_CONSUMER_PROP_PREFIX = 
"kafka.consumer.prop";
+
+  /**
+   * Optional comma-separated list of Kafka partition IDs to consume (e.g. 
"0,2,5").
+   * When set, only these partitions are used for the table; when absent, all 
topic partitions are consumed.
+   */
+  public static final String PARTITION_IDS = "kafka.partition.ids";
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtilsTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtilsTest.java
new file mode 100644
index 00000000000..9b262fab056
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionSubsetUtilsTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaPartitionSubsetUtilsTest {
+
+  @Test
+  public void testGetPartitionIdsFromConfigMissingKey() {
+    Map<String, String> config = new HashMap<>();
+    config.put("stream.kafka.topic.name", "myTopic");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigBlankValue() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
 "  ");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigEmptyAfterTrim() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
 ",");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigValidSubset() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "0,2,5");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(0, 2, 5));
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigUnsortedReturnsSorted() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "5, 2 , 0");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(0, 2, 5));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetPartitionIdsFromConfigInvalidNumber() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "0,abc,1");
+    KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetPartitionIdsFromConfigInvalidNumberOnly() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "not_a_number");
+    KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetPartitionIdsFromConfigNegativePartitionId() {
+    Map<String, String> config = new HashMap<>();
+    config.put(
+        
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "-1,0,1");
+    KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigSinglePartition() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
 "3");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(3));
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigEmptyMap() {
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(Collections.emptyMap());
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigDedupesDuplicates() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "2,0,2,5,0,5");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(0, 2, 5));
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigMultipleCommasWithWhitespace() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "  ,  ,  0  ,  ,  ");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(0));
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigLeadingAndTrailingCommas() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        ",,,0,1,2,,,");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(0, 1, 2));
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigAllEmptyCommas() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        " , , , , ");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigVeryLargePartitionId() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "999,1000,9999");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(999, 1000, 9999));
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigAllPartitionsInSubset() {
+    // Simulates a subset config that contains all partitions (e.g., 0,1,2,3 
for a 4-partition topic)
+    // This is valid config-wise but might not be useful in practice
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "0,1,2,3");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(0, 1, 2, 3));
+  }
+
+  @Test
+  public void testGetPartitionIdsFromConfigMixedWhitespace() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        " 5 , 2 ,\t0\t,\n3\n");
+    List<Integer> result = 
KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, List.of(0, 2, 3, 5));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetPartitionIdsFromConfigPartiallyInvalid() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "0,1,2,invalid,3");
+    KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetPartitionIdsFromConfigNegativeInMiddle() {
+    Map<String, String> config = new HashMap<>();
+    
config.put(KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.PARTITION_IDS),
+        "0,1,-5,2");
+    KafkaPartitionSubsetUtils.getPartitionIdsFromConfig(config);
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index cbd8cd98428..15197294bd4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -25,6 +25,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
@@ -35,12 +37,23 @@ import java.util.Properties;
 import java.util.zip.GZIPInputStream;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.LineIterator;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.entity.mime.FileBody;
+import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpEntity;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.admin.command.AbstractBaseAdminCommand;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.AirlineDataStream;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
@@ -81,17 +94,19 @@ public abstract class QuickStartBase {
       "examples/batch/testUnnest",
   };
 
-  protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES 
= Map.of(
-      "airlineStats", "examples/stream/airlineStats",
-      "dailySales", "examples/stream/dailySales",
-      "githubEvents", "examples/stream/githubEvents",
-      "meetupRsvp", "examples/stream/meetupRsvp",
-      "meetupRsvpJson", "examples/stream/meetupRsvpJson",
-      "meetupRsvpComplexType", "examples/stream/meetupRsvpComplexType",
-      "upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp",
-      "upsertJsonMeetupRsvp", "examples/stream/upsertJsonMeetupRsvp",
-      "upsertPartialMeetupRsvp", "examples/stream/upsertPartialMeetupRsvp",
-      "fineFoodReviews", "examples/stream/fineFoodReviews");
+  protected static final Map<String, String> DEFAULT_STREAM_TABLE_DIRECTORIES 
= Map.ofEntries(
+      Map.entry("airlineStats", "examples/stream/airlineStats"),
+      Map.entry("dailySales", "examples/stream/dailySales"),
+      Map.entry("githubEvents", "examples/stream/githubEvents"),
+      Map.entry("meetupRsvp", "examples/stream/meetupRsvp"),
+      Map.entry("meetupRsvpJson", "examples/stream/meetupRsvpJson"),
+      Map.entry("meetupRsvpComplexType", 
"examples/stream/meetupRsvpComplexType"),
+      Map.entry("upsertMeetupRsvp", "examples/stream/upsertMeetupRsvp"),
+      Map.entry("upsertJsonMeetupRsvp", 
"examples/stream/upsertJsonMeetupRsvp"),
+      Map.entry("upsertPartialMeetupRsvp", 
"examples/stream/upsertPartialMeetupRsvp"),
+      Map.entry("fineFoodReviews", "examples/stream/fineFoodReviews"),
+      Map.entry("fineFoodReviews_part_0", 
"examples/stream/fineFoodReviews_part_0"),
+      Map.entry("fineFoodReviews_part_1", 
"examples/stream/fineFoodReviews_part_1"));
 
   protected File _dataDir = FileUtils.getTempDirectory();
   protected boolean _setCustomDataDir;
@@ -566,6 +581,10 @@ public abstract class QuickStartBase {
               "***** Starting fineFoodReviews data stream and publishing to 
Kafka *****");
           publishStreamDataToKafka("fineFoodReviews", new 
File(quickstartTmpDir, "fineFoodReviews"));
           break;
+        case "fineFoodReviews_part_0":
+        case "fineFoodReviews_part_1":
+          // Consume from existing fineFoodReviews topic (partition subset); 
no separate stream to start
+          break;
         default:
           throw new UnsupportedOperationException("Unknown stream name: " + 
streamName);
       }
@@ -1023,4 +1042,67 @@ public abstract class QuickStartBase {
     }
     return null;
   }
+
+  protected void createFineFoodReviewsFederatedTable() {
+    if (!useDefaultBootstrapTableDir()) {
+      return;
+    }
+    Map<String, String> streamTableDirectories = 
getDefaultStreamTableDirectories();
+    if (!streamTableDirectories.containsKey("fineFoodReviews_part_0")
+        || !streamTableDirectories.containsKey("fineFoodReviews_part_1")) {
+      return;
+    }
+    String logicalTableName = "fineFoodReviews-federated";
+    try {
+      Schema schema = 
loadSchemaFromResource("/examples/stream/fineFoodReviews/fineFoodReviews_schema.json");
+      schema.setSchemaName(logicalTableName);
+      createSchemaOnController(schema, logicalTableName);
+
+      LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
+      logicalTableConfig.setTableName(logicalTableName);
+      logicalTableConfig.setBrokerTenant("DefaultTenant");
+      
logicalTableConfig.setRefRealtimeTableName("fineFoodReviews_part_0_REALTIME");
+      logicalTableConfig.setPhysicalTableConfigMap(Map.of(
+          "fineFoodReviews_part_0_REALTIME", new PhysicalTableConfig(),
+          "fineFoodReviews_part_1_REALTIME", new PhysicalTableConfig()
+      ));
+
+      String logicalTableUrl = "http://localhost:" + 
QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/logicalTables";
+      AbstractBaseAdminCommand.sendPostRequest(logicalTableUrl, 
logicalTableConfig.toSingleLineJsonString());
+      printStatus(Quickstart.Color.GREEN,
+          "***** Logical table fineFoodReviews-federated created successfully 
*****");
+    } catch (Exception e) {
+      printStatus(Quickstart.Color.YELLOW,
+          "***** Logical table fineFoodReviews-federated creation failed: " + 
e.getMessage() + " *****");
+    }
+  }
+
+  private Schema loadSchemaFromResource(String resourcePath)
+      throws IOException {
+    try (InputStream inputStream = 
getClass().getResourceAsStream(resourcePath)) {
+      if (inputStream == null) {
+        throw new IOException("Schema file not found: " + resourcePath);
+      }
+      String schemaJsonString = IOUtils.toString(inputStream, 
StandardCharsets.UTF_8);
+      return Schema.fromString(schemaJsonString);
+    }
+  }
+
+  private void createSchemaOnController(Schema schema, String logicalTableName)
+      throws Exception {
+    File tempSchemaFile = File.createTempFile(logicalTableName + "_schema", 
".json");
+    tempSchemaFile.deleteOnExit();
+    FileUtils.writeStringToFile(tempSchemaFile, 
schema.toSingleLineJsonString(), StandardCharsets.UTF_8);
+    HttpEntity multipartEntity = MultipartEntityBuilder.create()
+        .addPart("schema", new FileBody(tempSchemaFile, 
ContentType.APPLICATION_JSON, logicalTableName + ".json"))
+        .build();
+    try (HttpClient httpClient = new HttpClient()) {
+      String schemaUrl = "http://localhost:" + 
QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/schemas?override=true"
+          + "&force=true";
+      SimpleHttpResponse response = 
httpClient.sendPostRequest(URI.create(schemaUrl), multipartEntity, null, null);
+      if (response.getStatusCode() != 200) {
+        throw new RuntimeException("Schema creation response: " + 
response.getResponse());
+      }
+    }
+  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 2382353462d..21e69d28380 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -88,6 +88,30 @@ public class RealtimeQuickStart extends QuickStartBase {
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q5)));
     printStatus(Color.GREEN, 
"***************************************************");
 
+    String q6 = "select count(*) from fineFoodReviews";
+    printStatus(Color.YELLOW, "Total number of documents in fineFoodReviews");
+    printStatus(Color.CYAN, "Query : " + q6);
+    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q6)));
+    printStatus(Color.GREEN, 
"***************************************************");
+
+    String q7 = "select count(*) from \"fineFoodReviews-federated\"";
+    printStatus(Color.YELLOW, "Total number of documents in 
fineFoodReviews-federated");
+    printStatus(Color.CYAN, "Query : " + q7);
+    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q7)));
+    printStatus(Color.GREEN, 
"***************************************************");
+
+    String q8 = "select count(*) from \"fineFoodReviews_part_0\"";
+    printStatus(Color.YELLOW, "Total number of documents in 
fineFoodReviews_part_0");
+    printStatus(Color.CYAN, "Query : " + q8);
+    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q8)));
+    printStatus(Color.GREEN, 
"***************************************************");
+
+    String q9 = "select count(*) from \"fineFoodReviews_part_1\"";
+    printStatus(Color.YELLOW, "Total number of documents in 
fineFoodReviews_part_1");
+    printStatus(Color.CYAN, "Query : " + q9);
+    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q9)));
+    printStatus(Color.GREEN, 
"***************************************************");
+
     runVectorQueryExamples(runner);
   }
 
@@ -118,6 +142,7 @@ public class RealtimeQuickStart extends QuickStartBase {
 
     printStatus(Color.CYAN, "***** Bootstrap all tables *****");
     runner.bootstrapTable();
+    createFineFoodReviewsFederatedTable();
 
     printStatus(Color.CYAN, "***** Waiting for 5 seconds for a few events to 
get populated *****");
     Thread.sleep(5000);
diff --git 
a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_realtime_table_config.json
new file mode 100644
index 00000000000..ba9fa20eae6
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_realtime_table_config.json
@@ -0,0 +1,79 @@
+{
+  "tableName": "fineFoodReviews_part_0",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
+    "segmentPushType": "APPEND",
+    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+    "timeColumnName": "ts",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "5",
+    "replication": "1"
+  },
+  "tenants": {
+  },
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "noDictionaryColumns": ["Text"],
+    "multiColumnTextIndexConfig": {
+      "columns": ["UserId", "ProductId", "Summary"]
+    }
+  },
+  "routing": {
+    "segmentPrunerTypes": [
+      "time"
+    ]
+  },
+  "ingestionConfig": {
+    "streamIngestionConfig": {
+      "streamConfigMaps": [
+        {
+          "streamType": "kafka",
+          "stream.kafka.topic.name": "fineFoodReviews",
+          "stream.kafka.partition.ids": "0",
+          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
+          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
+          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+          "stream.kafka.broker.list": "localhost:19092",
+          "realtime.segment.flush.threshold.time": "3600000",
+          "realtime.segment.flush.threshold.size": "50000"
+        }
+      ]
+    },
+    "transformConfigs": [
+      {
+        "columnName": "ts",
+        "transformFunction": "now()"
+      }
+    ]
+  },
+  "metadata": {
+    "customConfigs": {
+    }
+  },
+  "fieldConfigList": [
+    {
+      "encodingType": "RAW",
+      "indexType": "VECTOR",
+      "name": "embedding",
+      "properties": {
+        "vectorIndexType": "HNSW",
+        "vectorDimension": 1536,
+        "vectorDistanceFunction": "COSINE",
+        "version": 1,
+        "commitDocs": "1"
+      }
+    },
+    {
+      "name": "Text",
+      "encodingType": "RAW",
+      "indexes": {
+        "text": {
+          "deriveNumDocsPerChunkForRawIndex": "true",
+          "rawIndexWriterVersion": "3",
+          "caseSensitive": "false"
+        }
+      }
+    }
+  ]
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_schema.json b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_schema.json
new file mode 100644
index 00000000000..f17d47176b9
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_0/fineFoodReviews_part_0_schema.json
@@ -0,0 +1,48 @@
+{
+  "metricFieldSpecs": [
+  ],
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "STRING",
+      "name": "ProductId"
+    },
+    {
+      "dataType": "STRING",
+      "name": "UserId"
+    },
+    {
+      "dataType": "INT",
+      "name": "Score"
+    },
+    {
+      "dataType": "STRING",
+      "name": "Summary"
+    },
+    {
+      "dataType": "STRING",
+      "name": "Text"
+    },
+    {
+      "dataType": "STRING",
+      "name": "combined"
+    },
+    {
+      "dataType": "INT",
+      "name": "n_tokens"
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "embedding",
+      "singleValueField": false
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "ts",
+      "dataType": "TIMESTAMP",
+      "format": "1:MILLISECONDS:TIMESTAMP",
+      "granularity": "1:SECONDS"
+    }
+  ],
+  "schemaName": "fineFoodReviews_part_0"
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_realtime_table_config.json
new file mode 100644
index 00000000000..ab4ee90fac5
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_realtime_table_config.json
@@ -0,0 +1,79 @@
+{
+  "tableName": "fineFoodReviews_part_1",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
+    "segmentPushType": "APPEND",
+    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+    "timeColumnName": "ts",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "5",
+    "replication": "1"
+  },
+  "tenants": {
+  },
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "noDictionaryColumns": ["Text"],
+    "multiColumnTextIndexConfig": {
+      "columns": ["UserId", "ProductId", "Summary"]
+    }
+  },
+  "routing": {
+    "segmentPrunerTypes": [
+      "time"
+    ]
+  },
+  "ingestionConfig": {
+    "streamIngestionConfig": {
+      "streamConfigMaps": [
+        {
+          "streamType": "kafka",
+          "stream.kafka.partition.ids": "1",
+          "stream.kafka.topic.name": "fineFoodReviews",
+          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
+          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
+          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+          "stream.kafka.broker.list": "localhost:19092",
+          "realtime.segment.flush.threshold.time": "3600000",
+          "realtime.segment.flush.threshold.size": "50000"
+        }
+      ]
+    },
+    "transformConfigs": [
+      {
+        "columnName": "ts",
+        "transformFunction": "now()"
+      }
+    ]
+  },
+  "metadata": {
+    "customConfigs": {
+    }
+  },
+  "fieldConfigList": [
+    {
+      "encodingType": "RAW",
+      "indexType": "VECTOR",
+      "name": "embedding",
+      "properties": {
+        "vectorIndexType": "HNSW",
+        "vectorDimension": 1536,
+        "vectorDistanceFunction": "COSINE",
+        "version": 1,
+        "commitDocs": "1"
+      }
+    },
+    {
+      "name": "Text",
+      "encodingType": "RAW",
+      "indexes": {
+        "text": {
+          "deriveNumDocsPerChunkForRawIndex": "true",
+          "rawIndexWriterVersion": "3",
+          "caseSensitive": "false"
+        }
+      }
+    }
+  ]
+}
diff --git a/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_schema.json b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_schema.json
new file mode 100644
index 00000000000..4162341f070
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/fineFoodReviews_part_1/fineFoodReviews_part_1_schema.json
@@ -0,0 +1,48 @@
+{
+  "metricFieldSpecs": [
+  ],
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "STRING",
+      "name": "ProductId"
+    },
+    {
+      "dataType": "STRING",
+      "name": "UserId"
+    },
+    {
+      "dataType": "INT",
+      "name": "Score"
+    },
+    {
+      "dataType": "STRING",
+      "name": "Summary"
+    },
+    {
+      "dataType": "STRING",
+      "name": "Text"
+    },
+    {
+      "dataType": "STRING",
+      "name": "combined"
+    },
+    {
+      "dataType": "INT",
+      "name": "n_tokens"
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "embedding",
+      "singleValueField": false
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "ts",
+      "dataType": "TIMESTAMP",
+      "format": "1:MILLISECONDS:TIMESTAMP",
+      "granularity": "1:SECONDS"
+    }
+  ],
+  "schemaName": "fineFoodReviews_part_1"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to