zddr commented on code in PR #32156:
URL: https://github.com/apache/doris/pull/32156#discussion_r1529655488


##########
fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java:
##########
@@ -69,12 +82,85 @@ public long registerTask(long dbId, long tableId, 
List<String> tempPartitionName
         return taskId;
     }
 
+    /**
+     * register insert overwrite task group for auto detect partition.
+     * it may have many tasks by FrontendService rpc deal.
+     * all of them will be involved in one txn.(success or fallback)
+     *
+     * @return group id, like a transaction id.
+     */
+    public long preRegisterTask() {
+        long groupId = Env.getCurrentEnv().getNextId();
+        taskGroups.put(groupId, new ArrayList<Long>());
+        taskLocks.put(groupId, new ReentrantLock());
+        partitionPairs.put(groupId, Maps.newConcurrentMap());
+        return groupId;
+    }
+
+    /**
+     * for iot auto detect. register task first. then put in group.
+     */
+    public void registerTaskInGroup(long groupId, long taskId) {
+        LOG.info("register task " + taskId + " in group " + groupId);
+        taskGroups.get(groupId).add(taskId);
+    }
+
+    public List<Long> tryReplacePartitionIds(long groupId, List<Long> 
oldPartitionIds) {
+        Map<Long, Long> relations = partitionPairs.get(groupId);
+        List<Long> newIds = new ArrayList<Long>();
+        for (Long id : oldPartitionIds) {
+            if (relations.containsKey(id)) {
+                // if we replaced it. then return new one.
+                newIds.add(relations.get(id));
+            } else {
+                // otherwise itself. we will deal it soon.
+                newIds.add(id);
+            }
+        }
+        return newIds;
+    }
+
+    public void recordPartitionPairs(long groupId, List<Long> oldIds, 
List<Long> newIds) {
+        Map<Long, Long> relations = partitionPairs.get(groupId);
+        Preconditions.checkArgument(oldIds.size() == newIds.size());
+        for (int i = 0; i < oldIds.size(); i++) {
+            relations.put(oldIds.get(i), newIds.get(i));
+        }
+    }
+
+    public ReentrantLock getLock(long groupId) {
+        return taskLocks.get(groupId);
+    }
+
+    public void taskGroupFail(long groupId) {
+        LOG.info("insert overwrite auto detect partition task group [" + 
groupId + "] failed");
+        for (Long taskId : taskGroups.get(groupId)) {
+            taskFail(taskId);
+        }
+        cleanTaskGroup(groupId);

Review Comment:
   If fe is killed before `cleanTaskGroup`, will dirty data be generated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to