This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6372d78d98 make dedup table use strict replica group assignment too (#15778)
6372d78d98 is described below
commit 6372d78d98fd391f62670a7d324b99f83c946bd6
Author: Xiaobing <[email protected]>
AuthorDate: Thu May 15 12:58:43 2025 -0700
make dedup table use strict replica group assignment too (#15778)
* make dedup table use strict replica group assignment too
* refine logs and add TODO for tiers support for
StrictRealtimeSegmentAssignment
---
.../segment/SegmentAssignmentFactory.java | 5 +-
.../segment/StrictRealtimeSegmentAssignment.java | 23 ++++----
.../StrictRealtimeSegmentAssignmentTest.java | 69 ++++++++++++----------
3 files changed, 55 insertions(+), 42 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index e32a7246b2..7e37026438 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -21,6 +21,7 @@ package
org.apache.pinot.controller.helix.core.assignment.segment;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -40,7 +41,9 @@ public class SegmentAssignmentFactory {
segmentAssignment = new OfflineSegmentAssignment();
} else {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
- if (upsertConfig != null && upsertConfig.getMode() !=
UpsertConfig.Mode.NONE) {
+ DedupConfig dedupConfig = tableConfig.getDedupConfig();
+ if ((upsertConfig != null && upsertConfig.getMode() !=
UpsertConfig.Mode.NONE) || (dedupConfig != null
+ && dedupConfig.isDedupEnabled())) {
segmentAssignment = new StrictRealtimeSegmentAssignment();
} else {
segmentAssignment = new RealtimeSegmentAssignment();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index 2bc0ada5bd..6c76332a24 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -37,7 +37,7 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
/**
- * Segment assignment for LLC real-time table using upsert. The
assignSegment() of RealtimeSegmentAssignment is
+ * Segment assignment for LLC real-time table using upsert or dedup. The
assignSegment() of RealtimeSegmentAssignment is
* overridden to add new segment for a table partition in a way that's
consistent with the assignment in idealState to
* make sure that at any time the segments from the same table partition is
hosted by the same server.
* <ul>
@@ -47,12 +47,13 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
* InstancePartition and the one in idealState are different, the one in
idealState must be used so that segments
* from the same table partition are always hosted on the same server as
set in current idealState. If the
* idealState is not honored, segments from the same table partition may
be assigned to different servers,
- * breaking the key assumption for queries to be correct for the table
using upsert.
+ * breaking the key assumption for queries to be correct for the table
using upsert or dedup.
* </li>
* <li>
- * There is no need to handle COMPLETED segments for tables using upsert,
because their completed segments should
- * not be relocated to servers tagged to host COMPLETED segments.
Basically, upsert-enabled tables can only use
- * servers tagged for CONSUMING segments to host both consuming and
completed segments from a table partition.
+ * There is no need to handle COMPLETED segments for tables using upsert
or dedup, because their completed
+ * segments should not be relocated to servers tagged to host COMPLETED
segments. Basically, tables using upsert
+ * or dedup can only use servers tagged for CONSUMING segments to host
both consuming and completed segments from
+ * a table partition.
* </li>
* </ul>
*/
@@ -63,9 +64,9 @@ public class StrictRealtimeSegmentAssignment extends
RealtimeSegmentAssignment {
// 1. This cache is used for table rebalance only, but not segment
assignment. During rebalance, rebalanceTable() can
// be invoked multiple times when the ideal state changes during the
rebalance process.
// 2. The cache won't be refreshed when an existing segment is replaced with
a segment from a different partition.
- // Replacing a segment with a segment from a different partition should
not be allowed for upsert table because it
- // will cause the segment being served by the wrong servers. If this
happens during the table rebalance, another
- // rebalance might be needed to fix the assignment.
+ // Replacing a segment with a segment from a different partition should
not be allowed for upsert/dedup table
+ // because it will cause the segment being served by the wrong servers.
If this happens during the table
+ // rebalance, another rebalance might be needed to fix the assignment.
private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new
Object2IntOpenHashMap<>();
@Override
@@ -163,8 +164,10 @@ public class StrictRealtimeSegmentAssignment extends
RealtimeSegmentAssignment {
Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
_tableNameWithType);
Preconditions.checkArgument(config.isIncludeConsuming(),
- "Consuming segment must be included when rebalancing upsert table:
%s", _tableNameWithType);
- Preconditions.checkState(sortedTiers == null, "Tiers must not be specified
for upsert table: %s",
+ "Consuming segment must be included when rebalancing upsert/dedup
table: %s", _tableNameWithType);
+ // TODO: consider to support tiers for segments out of metadata TTL for
upsert/dedup table, as those segments
+ // won't be included in the upsert/dedup metadata as kept on
CONSUMING tier.
+ Preconditions.checkState(sortedTiers == null, "Tiers must not be specified
for upsert/dedup table: %s",
_tableNameWithType);
_logger.info("Rebalancing table: {} with instance partitions: {}",
_tableNameWithType, instancePartitions);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
index 6775329d37..43cfb9998d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -39,6 +40,7 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -67,7 +69,6 @@ public class StrictRealtimeSegmentAssignmentTest {
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME);
private List<String> _segments;
- private SegmentAssignment _segmentAssignment;
private Map<InstancePartitionsType, InstancePartitions>
_instancePartitionsMap;
private InstancePartitions _newConsumingInstancePartitions;
@@ -78,16 +79,6 @@ public class StrictRealtimeSegmentAssignmentTest {
_segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId %
NUM_PARTITIONS, segmentId / NUM_PARTITIONS,
System.currentTimeMillis()).getSegmentName());
}
-
- Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
- .setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
-
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
- .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
- _segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig, null);
-
_instancePartitionsMap = new TreeMap<>();
// CONSUMING instances:
// {
@@ -126,25 +117,40 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test
- public void testFactory() {
- assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ @DataProvider(name = "tableTypes")
+ public Object[] getTableTypes() {
+ return new Object[]{"upsert", "dedup"};
+ }
+
+ private static SegmentAssignment createSegmentAssignment(String tableType) {
+ TableConfigBuilder builder = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(NUM_REPLICAS)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+ .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
+ TableConfig tableConfig;
+ if ("upsert".equalsIgnoreCase(tableType)) {
+ tableConfig = builder.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).build();
+ } else {
+ tableConfig = builder.setDedupConfig(new DedupConfig()).build();
+ }
+ return SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig, null);
}
- @Test
- public void testAssignSegment() {
- assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ @Test(dataProvider = "tableTypes")
+ public void testAssignSegment(String tableType) {
+ SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+ assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
// Add segments for partition 0/1/2, but add no segment for partition 3.
List<String> instancesAssigned;
- boolean consistent;
for (int segmentId = 0; segmentId < 3; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 3, 6
// Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -171,7 +177,7 @@ public class StrictRealtimeSegmentAssignmentTest {
int segmentId = 3;
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned,
Arrays.asList("new_consumingInstance_0", "new_consumingInstance_3",
"new_consumingInstance_6"));
addToAssignment(currentAssignment, segmentId, instancesAssigned);
@@ -180,7 +186,7 @@ public class StrictRealtimeSegmentAssignmentTest {
for (segmentId = 4; segmentId < 7; segmentId++) {
segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Those segments are assigned according to the assignment from
idealState, instead of using new_xxx instances
@@ -197,20 +203,20 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test
- public void testAssignSegmentWithOfflineSegment() {
- assertTrue(_segmentAssignment instanceof StrictRealtimeSegmentAssignment);
+ @Test(dataProvider = "tableTypes")
+ public void testAssignSegmentWithOfflineSegment(String tableType) {
+ SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+ assertTrue(segmentAssignment instanceof StrictRealtimeSegmentAssignment);
Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
// Add segments for partition 0/1/2, but add no segment for partition 3.
List<String> instancesAssigned;
- boolean consistent;
for (int segmentId = 0; segmentId < 3; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 3, 6
// Segment 1 (partition 1) should be assigned to instance 1, 4, 7
@@ -238,7 +244,7 @@ public class StrictRealtimeSegmentAssignmentTest {
for (int segmentId = 3; segmentId < 7; segmentId++) {
String segmentName = _segments.get(segmentId);
instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
newConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Those segments are assigned according to the assignment from
idealState, instead of using new_xxx instances
@@ -255,9 +261,10 @@ public class StrictRealtimeSegmentAssignmentTest {
}
}
- @Test(expectedExceptions = IllegalStateException.class)
- public void testAssignSegmentToCompletedServers() {
- _segmentAssignment.assignSegment("seg01", new TreeMap<>(), new
TreeMap<>());
+ @Test(expectedExceptions = IllegalStateException.class, dataProvider =
"tableTypes")
+ public void testAssignSegmentToCompletedServers(String tableType) {
+ SegmentAssignment segmentAssignment = createSegmentAssignment(tableType);
+ segmentAssignment.assignSegment("seg01", new TreeMap<>(), new TreeMap<>());
}
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
@@ -276,7 +283,7 @@ public class StrictRealtimeSegmentAssignmentTest {
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
- private HelixManager createHelixManager() {
+ private static HelixManager createHelixManager() {
HelixManager helixManager = mock(HelixManager.class);
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org