This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5972409d7e [Fix](mv) Fix ConcurrentModificationException in 
PartitionCompensator (#61145)
a5972409d7e is described below

commit a5972409d7ec42b19ca759c2f319b96766245b8b
Author: Sahil Devgon <[email protected]>
AuthorDate: Mon Mar 16 23:08:51 2026 +0530

    [Fix](mv) Fix ConcurrentModificationException in PartitionCompensator 
(#61145)
    
    ### What problem does this PR solve?
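    This PR fixes the bug causing ConcurrentModificationException in
    PartitionCompensator by moving the merge of the partition-name sets out
    of the loop, so that it runs once after the loop has finished.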
    
    Co-authored-by: sahil-devgon <[email protected]>
---
 .../rules/exploration/mv/PartitionCompensator.java |  19 ++--
 .../exploration/mv/PartitionCompensatorTest.java   | 123 +++++++++++++++++++++
 2 files changed, 133 insertions(+), 9 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 3fe966864d0..4c26ab47cec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -139,19 +139,20 @@ public class PartitionCompensator {
                         .computeIfAbsent(baseTableNeedUnionTable.key(), k -> 
new HashSet<>())
                         .addAll(baseTableNeedUnionTable.value());
             }
-            // merge all partition to delete or union
-            Set<String> needRemovePartitionSet = new HashSet<>();
-            
mvPartitionNeedRemoveNameMap.values().forEach(needRemovePartitionSet::addAll);
-            mvPartitionNeedRemoveNameMap.replaceAll((k, v) -> 
needRemovePartitionSet);
-
-            // consider multi base table partition name not same, how to 
handle it?
-            Set<String> needUnionPartitionSet = new HashSet<>();
-            
baseTablePartitionNeedUnionNameMap.values().forEach(needUnionPartitionSet::addAll);
-            baseTablePartitionNeedUnionNameMap.replaceAll((k, v) -> 
needUnionPartitionSet);
         }
         if (allCompensateIsNull) {
             return null;
         }
+        // merge all partition to delete or union
+        Set<String> needRemovePartitionSet = new HashSet<>();
+        
mvPartitionNeedRemoveNameMap.values().forEach(needRemovePartitionSet::addAll);
+        mvPartitionNeedRemoveNameMap.replaceAll((k, v) -> 
needRemovePartitionSet);
+
+        // consider multi base table partition name not same, how to handle it?
+        Set<String> needUnionPartitionSet = new HashSet<>();
+        
baseTablePartitionNeedUnionNameMap.values().forEach(needUnionPartitionSet::addAll);
+        baseTablePartitionNeedUnionNameMap.replaceAll((k, v) -> 
needUnionPartitionSet);
+
         return Pair.of(mvPartitionNeedRemoveNameMap, 
baseTablePartitionNeedUnionNameMap);
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 17d75f93fcf..3c1b2bb9519 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.rules.exploration.mv;
 
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.TableIf;
@@ -30,7 +31,9 @@ import org.apache.doris.mtmv.BaseColInfo;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.utframe.TestWithFeService;
@@ -41,10 +44,13 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -373,6 +379,123 @@ public class PartitionCompensatorTest extends 
TestWithFeService {
         return ctx;
     }
 
+    // Regression test for ConcurrentModificationException when merging 
partition maps
+    // across multiple related tables. The bug was caused by calling 
replaceAll() inside
+    // the for-loop after forEach() had already replaced map values with a 
shared Set
+    // reference, causing a self-modification on the second iteration.
+    // This test calls calcInvalidPartitions() directly with two related 
tables so that
+    // the exact code path containing the bug is exercised end-to-end.
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
testCalcInvalidPartitionsNoConcurrentModificationWithTwoRelatedTables()
+            throws Exception {
+        // Shared catalog/db for both related base tables
+        CatalogIf<?> baseCatalog = Mockito.mock(CatalogIf.class);
+        Mockito.when(baseCatalog.getName()).thenReturn("cat");
+        Mockito.when(baseCatalog.getId()).thenReturn(1L);
+        DatabaseIf<?> baseDb = Mockito.mock(DatabaseIf.class);
+        Mockito.when(baseDb.getFullName()).thenReturn("db");
+        Mockito.when(baseDb.getId()).thenReturn(2L);
+        Mockito.when(baseDb.getCatalog()).thenReturn(baseCatalog);
+
+        MTMVRelatedTableIf relatedTable1 = mockRelatedTableIf(
+                "t1", 10L, ImmutableList.of("cat", "db", "t1"), baseDb);
+        MTMVRelatedTableIf relatedTable2 = mockRelatedTableIf(
+                "t2", 20L, ImmutableList.of("cat", "db", "t2"), baseDb);
+
+        // Two MV valid partitions: mv_p1 maps to t1_p1, mv_p2 maps to t2_p2
+        Partition mvP1 = Mockito.mock(Partition.class);
+        Mockito.when(mvP1.getId()).thenReturn(101L);
+        Mockito.when(mvP1.getName()).thenReturn("mv_p1");
+        Partition mvP2 = Mockito.mock(Partition.class);
+        Mockito.when(mvP2.getId()).thenReturn(102L);
+        Mockito.when(mvP2.getName()).thenReturn("mv_p2");
+
+        Map<String, Set<String>> mappingForTable1 = new HashMap<>();
+        mappingForTable1.put("mv_p1", ImmutableSet.of("t1_p1"));
+        Map<String, Set<String>> mappingForTable2 = new HashMap<>();
+        mappingForTable2.put("mv_p2", ImmutableSet.of("t2_p2"));
+        Map<MTMVRelatedTableIf, Map<String, Set<String>>> partitionMappings = 
new HashMap<>();
+        partitionMappings.put(relatedTable1, mappingForTable1);
+        partitionMappings.put(relatedTable2, mappingForTable2);
+
+        BaseColInfo colInfo1 = new BaseColInfo("date_col", new 
BaseTableInfo(relatedTable1));
+        BaseColInfo colInfo2 = new BaseColInfo("date_col", new 
BaseTableInfo(relatedTable2));
+
+        // Separate catalog/db for the MV itself
+        CatalogIf<?> mvCatalog = Mockito.mock(CatalogIf.class);
+        Mockito.when(mvCatalog.getName()).thenReturn("internal");
+        Mockito.when(mvCatalog.getId()).thenReturn(1L);
+        DatabaseIf<?> mvDb = Mockito.mock(DatabaseIf.class);
+        Mockito.when(mvDb.getFullName()).thenReturn("mv_db");
+        Mockito.when(mvDb.getId()).thenReturn(3L);
+        Mockito.when(mvDb.getCatalog()).thenReturn(mvCatalog);
+
+        MTMV mtmv = Mockito.mock(MTMV.class);
+        Mockito.when(mtmv.getName()).thenReturn("mv1");
+        Mockito.when(mtmv.getId()).thenReturn(100L);
+        Mockito.when(mtmv.getDatabase()).thenReturn(mvDb);
+        PartitionInfo mvPartitionInfo = Mockito.mock(PartitionInfo.class);
+        Mockito.when(mtmv.getPartitionInfo()).thenReturn(mvPartitionInfo);
+        
Mockito.when(mvPartitionInfo.getType()).thenReturn(PartitionType.RANGE);
+        MTMVPartitionInfo mvPctInfo = Mockito.mock(MTMVPartitionInfo.class);
+        Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mvPctInfo);
+        
Mockito.when(mvPctInfo.getPctTables()).thenReturn(ImmutableSet.of(relatedTable1,
 relatedTable2));
+        
Mockito.when(mvPctInfo.getPctInfos()).thenReturn(ImmutableList.of(colInfo1, 
colInfo2));
+        // All MV partitions contain data
+        
Mockito.when(mtmv.selectNonEmptyPartitionIds(ArgumentMatchers.any())).thenReturn(ImmutableList.of(1L));
+
+        AsyncMaterializationContext matCtx = 
Mockito.mock(AsyncMaterializationContext.class);
+        Mockito.when(matCtx.getMtmv()).thenReturn(mtmv);
+        
Mockito.when(matCtx.calculatePartitionMappings()).thenReturn(partitionMappings);
+
+        // StatementContext: the MV's two valid partitions are available for 
rewrite
+        Map<BaseTableInfo, Collection<Partition>> canRewriteMap = new 
HashMap<>();
+        canRewriteMap.put(new BaseTableInfo(mtmv), ImmutableList.of(mvP1, 
mvP2));
+        StatementContext stmtCtx = Mockito.mock(StatementContext.class);
+        
Mockito.when(stmtCtx.getMvCanRewritePartitionsMap()).thenReturn(canRewriteMap);
+
+        CascadesContext cascadesCtx = Mockito.mock(CascadesContext.class);
+        Mockito.when(cascadesCtx.getStatementContext()).thenReturn(stmtCtx);
+
+        // Rewritten plan has no MV scans, so mvNeedRemovePartitionNameSet 
stays empty
+        Plan rewrittenPlan = Mockito.mock(Plan.class);
+        
Mockito.when(rewrittenPlan.collectToList(ArgumentMatchers.any())).thenReturn(Collections.emptyList());
+
+        // Each table contributes one covered and one uncovered partition:
+        //   t1 uses {t1_p1 (covered by mv_p1), t1_p2 (not covered)}
+        //   t2 uses {t2_p1 (not covered), t2_p2 (covered by mv_p2)}
+        Map<List<String>, Set<String>> queryUsedPartitions = new HashMap<>();
+        queryUsedPartitions.put(ImmutableList.of("cat", "db", "t1"),
+                ImmutableSet.of("t1_p1", "t1_p2"));
+        queryUsedPartitions.put(ImmutableList.of("cat", "db", "t2"),
+                ImmutableSet.of("t2_p1", "t2_p2"));
+
+        // Must not throw ConcurrentModificationException when two related 
tables each
+        // contribute entries that require the post-loop merge in 
calcInvalidPartitions()
+        Pair<Map<BaseTableInfo, Set<String>>, Map<BaseColInfo, Set<String>>> 
result =
+                Assertions.assertDoesNotThrow(() ->
+                        PartitionCompensator.calcInvalidPartitions(
+                                queryUsedPartitions, rewrittenPlan, matCtx, 
cascadesCtx));
+
+        // The uncovered partitions from both tables should be merged into one 
unified set
+        Assertions.assertNotNull(result);
+        Set<String> expectedUnion = ImmutableSet.of("t1_p2", "t2_p1");
+        result.value().values()
+                .forEach(v -> Assertions.assertEquals(expectedUnion, v));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static MTMVRelatedTableIf mockRelatedTableIf(
+            String tableName, long tableId, List<String> qualifiers, 
DatabaseIf<?> db) {
+        MTMVRelatedTableIf table = Mockito.mock(MTMVRelatedTableIf.class);
+        Mockito.when(table.getName()).thenReturn(tableName);
+        Mockito.when(table.getId()).thenReturn(tableId);
+        Mockito.when(table.getDatabase()).thenReturn(db);
+        Mockito.when(table.getFullQualifiers()).thenReturn(qualifiers);
+        return table;
+    }
+
     private static BaseTableInfo newBaseTableInfo() {
         CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
         Mockito.when(catalog.getId()).thenReturn(1L);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to