This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8de3a2fba9f [improvement](fe) Add configurable return mode for insert publish timeout in ETL scenarios (#63919)
8de3a2fba9f is described below

commit 8de3a2fba9fa75c5d403912958be5aed6cc7e949
Author: Wen Zhenghu <[email protected]>
AuthorDate: Fri Jun 5 15:06:35 2026 +0800

    [improvement](fe) Add configurable return mode for insert publish timeout in ETL scenarios (#63919)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    Normal internal-table inserts currently treat publish timeout as a
    committed insert and return success with COMMITTED status. This behavior
    is acceptable when clients only care that the transaction has been
    committed and can tolerate delayed visibility, but it is unsafe for
    pipelines whose downstream steps depend on the inserted data already
    being visible.
    
    A typical case is ETL workflows that first use CREATE TABLE AS SELECT to
    build a temporary table and then immediately read that table to populate
    a downstream result table. If the upstream transaction has been
    committed but is not yet VISIBLE, the downstream step may temporarily
    read no rows and silently write empty data into the final table, so the
    whole pipeline appears successful even though the result is incorrect.
    
    Doris already returns an error in explicit transaction mode when a
    COMMIT statement times out before the transaction becomes visible. This
    change adds a compatible mode for the regular non-transactional
    internal-table insert path by introducing a session variable,
    insert_visible_timeout_return_mode, so users can choose whether publish
    timeout should keep returning COMMITTED or return ERR.
    
    The implementation also keeps committed-side bookkeeping unchanged in
    error mode so finished load jobs, insert result metadata, and related
    accounting still reflect the real transaction state.
    
    ### Release note
    
    Add a session variable, insert_visible_timeout_return_mode, to control
    whether normal internal-table inserts return COMMITTED (value
    "committed", the default) or ERR (value "error") when publish
    visibility times out.
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [x] Regression test
        - [x] Unit Test
        - [x] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../plans/commands/insert/OlapInsertExecutor.java  |  16 ++
 .../java/org/apache/doris/qe/SessionVariable.java  |  71 ++++++
 .../commands/insert/OlapInsertExecutorTest.java    | 274 +++++++++++++++++++++
 .../org/apache/doris/qe/SessionVariablesTest.java  |  88 ++++++-
 .../test_insert_visible_timeout_return_mode.out    |   5 +
 .../test_insert_visible_timeout_return_mode.groovy |  83 +++++++
 6 files changed, 535 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index a1e22fd2902..c4ae68acb98 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -46,6 +46,7 @@ import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
@@ -78,7 +79,12 @@ import java.util.stream.Collectors;
  */
 public class OlapInsertExecutor extends AbstractInsertExecutor {
     private static final Logger LOG = 
LogManager.getLogger(OlapInsertExecutor.class);
+    // Keep the timeout message aligned with the client-facing error returned 
by the legacy insert path.
+    private static final String INSERT_VISIBLE_TIMEOUT_ERROR_MSG = 
"transaction commit successfully, "
+            + "BUT data did not become visible within 
insert_visible_timeout_ms and will be visible later.";
     protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
+    // Track publish timeout separately from real failures so committed 
bookkeeping still runs.
+    protected boolean publishTimedOutAfterCommit = false;
 
     protected OlapTable olapTable;
 
@@ -236,7 +242,9 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
                 ctx.getSessionVariable().getInsertVisibleTimeoutMs(), 
txnCommitAttachment)) {
             txnStatus = TransactionStatus.VISIBLE;
         } else {
+            // Keep the committed status so load accounting and insert result 
bookkeeping stay aligned.
             txnStatus = TransactionStatus.COMMITTED;
+            publishTimedOutAfterCommit = true;
         }
         if (Config.isCloudMode()) {
             String clusterName = ctx.getCloudCluster();
@@ -370,6 +378,14 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
                 txnStatus, loadedRows, filteredRows);
         // update it, so that user can get loaded rows in fe.audit.log
         ctx.updateReturnRows((int) loadedRows);
+        if (publishTimedOutAfterCommit && 
ctx.getSessionVariable().isInsertVisibleTimeoutReturnError()) {
+            // Log the committed timeout branch explicitly so operators can 
distinguish it from real failures.
+            LOG.warn("insert [{}] with txn id {} committed but return error 
because {}={}",
+                    labelName, txnId, 
SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE,
+                    SessionVariable.InsertVisibleTimeoutReturnMode.ERROR);
+            // Convert the final client response to ERR after all 
committed-side bookkeeping has finished.
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
INSERT_VISIBLE_TIMEOUT_ERROR_MSG);
+        }
     }
 
     public long getTimeout() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2902615afef..3f6da2fed73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -269,6 +269,30 @@ public class SessionVariable implements Serializable, 
Writable {
 
     // max ms to wait transaction publish finish when exec insert stmt.
     public static final String INSERT_VISIBLE_TIMEOUT_MS = 
"insert_visible_timeout_ms";
+    public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE = 
"insert_visible_timeout_return_mode";
+    public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED = 
"committed";
+    public static final String INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR = 
"error";
+
+    // Keep the mode enum for business logic while storing the session value 
as a string.
+    public enum InsertVisibleTimeoutReturnMode {
+        COMMITTED(INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED),
+        ERROR(INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR);
+
+        private final String option;
+
+        InsertVisibleTimeoutReturnMode(String option) {
+            this.option = option;
+        }
+
+        public String getOption() {
+            return option;
+        }
+
+        @Override
+        public String toString() {
+            return option;
+        }
+    }
 
     public static final String DELETE_WITHOUT_PARTITION = 
"delete_without_partition";
 
@@ -1100,6 +1124,15 @@ public class SessionVariable implements Serializable, 
Writable {
     @VarAttrDef.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS, needForward = true)
     public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
 
+    // Control whether publish timeout keeps the committed response or returns 
an explicit error.
+    @VarAttrDef.VarAttr(name = INSERT_VISIBLE_TIMEOUT_RETURN_MODE, needForward 
= true,
+            checker = "checkInsertVisibleTimeoutReturnMode", setter = 
"setInsertVisibleTimeoutReturnMode",
+            description = {"控制普通内表 INSERT 在 publish timeout 时返回给客户端的状态。",
+                    "Controls the status returned to the client when a normal 
internal-table INSERT times out "
+                            + "while waiting for publish visibility."},
+            options = {INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED, 
INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR})
+    public String insertVisibleTimeoutReturnMode = 
INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED;
+
     // max memory used on every backend. Default value to 100G.
     @VarAttrDef.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
     public long maxExecMemByte = 100147483648L;
@@ -4938,6 +4971,23 @@ public class SessionVariable implements Serializable, 
Writable {
         }
     }
 
+    public String getInsertVisibleTimeoutReturnMode() {
+        return getInsertVisibleTimeoutReturnModeEnum().getOption();
+    }
+
+    public InsertVisibleTimeoutReturnMode 
getInsertVisibleTimeoutReturnModeEnum() {
+        return 
parseInsertVisibleTimeoutReturnMode(insertVisibleTimeoutReturnMode);
+    }
+
+    public boolean isInsertVisibleTimeoutReturnError() {
+        return getInsertVisibleTimeoutReturnModeEnum() == 
InsertVisibleTimeoutReturnMode.ERROR;
+    }
+
+    public void setInsertVisibleTimeoutReturnMode(String 
insertVisibleTimeoutReturnMode) {
+        this.insertVisibleTimeoutReturnMode = 
parseInsertVisibleTimeoutReturnMode(insertVisibleTimeoutReturnMode)
+                .getOption();
+    }
+
     public boolean getIsSingleSetVar() {
         return isSingleSetVar;
     }
@@ -5326,6 +5376,27 @@ public class SessionVariable implements Serializable, 
Writable {
         }
     }
 
+    public void checkInsertVisibleTimeoutReturnMode(String mode) {
+        // Reuse the parser so validation stays consistent with assignment and 
enum access.
+        parseInsertVisibleTimeoutReturnMode(mode);
+    }
+
+    // Parse the stored string case-insensitively and expose the enum only to 
business logic.
+    private InsertVisibleTimeoutReturnMode 
parseInsertVisibleTimeoutReturnMode(String mode) {
+        if (StringUtils.isEmpty(mode)) {
+            LOG.warn("insertVisibleTimeoutReturnMode value is empty");
+            throw new 
UnsupportedOperationException("insertVisibleTimeoutReturnMode value is empty");
+        }
+        for (InsertVisibleTimeoutReturnMode value : 
InsertVisibleTimeoutReturnMode.values()) {
+            if (value.getOption().equalsIgnoreCase(mode)) {
+                return value;
+            }
+        }
+        LOG.warn("insertVisibleTimeoutReturnMode value is invalid, the invalid 
value is {}", mode);
+        throw new UnsupportedOperationException(
+                "insertVisibleTimeoutReturnMode value is invalid, the invalid 
value is " + mode);
+    }
+
     public void checkMaxExecutionTimeMSValid(String newValue) {
         int value = Integer.valueOf(newValue);
         if (value < 1000) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
new file mode 100644
index 00000000000..ea5318eedcb
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutorTest.java
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.profile.ExecutionProfile;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HiveTransactionMgr;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.job.manager.StreamingTaskManager;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.InsertResult;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgrIface;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Optional;
+
+/**
+ * Tests for publish-timeout behaviors in {@link OlapInsertExecutor}.
+ */
+class OlapInsertExecutorTest {
+
+    @AfterEach
+    void tearDown() {
+        ConnectContext.remove();
+    }
+
+    @Test
+    void 
testExecuteSingleInsertPublishTimeoutReturnErrorKeepsCommittedAccounting() 
throws Exception {
+        ConnectContext ctx = createExecutorContext();
+        ctx.getSessionVariable().setInsertVisibleTimeoutReturnMode(
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR);
+
+        Coordinator coordinator = createCoordinator();
+        GlobalTransactionMgrIface txnMgr = 
Mockito.mock(GlobalTransactionMgrIface.class);
+        TransactionState txnState = Mockito.mock(TransactionState.class);
+        LoadManager loadManager = Mockito.mock(LoadManager.class);
+        Env currentEnv = createCurrentEnv(loadManager);
+        StmtExecutor stmtExecutor = createStmtExecutor();
+
+        try (MockedStatic<EnvFactory> envFactoryMock = 
Mockito.mockStatic(EnvFactory.class);
+                MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+            prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr, 
txnState, currentEnv);
+            ctx.setEnv(currentEnv);
+
+            Mockito.when(txnMgr.commitAndPublishTransaction(
+                    Mockito.any(), Mockito.anyList(), Mockito.anyLong(), 
Mockito.anyList(), Mockito.anyLong(),
+                    Mockito.isNull())).thenReturn(false);
+
+            OlapInsertExecutor executor = createExecutor(ctx);
+            executor.txnId = 10001L;
+            executor.executeSingleInsert(stmtExecutor);
+
+            Assertions.assertEquals(TransactionStatus.COMMITTED, 
executor.txnStatus);
+            Assertions.assertEquals(MysqlStateType.ERR, 
ctx.getState().getStateType());
+            Assertions.assertTrue(ctx.getState().getErrorMessage().contains(
+                    "transaction commit successfully, BUT data did not become 
visible within "
+                            + "insert_visible_timeout_ms and will be visible 
later."));
+
+            InsertResult insertResult = ctx.getInsertResult();
+            Assertions.assertNotNull(insertResult);
+            Assertions.assertEquals(TransactionStatus.COMMITTED, 
insertResult.txnStatus);
+            Assertions.assertEquals(12L, insertResult.loadedRows);
+            Assertions.assertEquals(1L, insertResult.filteredRows);
+            Assertions.assertEquals(12L, ctx.getReturnRows());
+
+            
Mockito.verify(loadManager).recordFinishedLoadJob(Mockito.eq("label_test"), 
Mockito.eq(10001L),
+                    Mockito.eq("test_db"), Mockito.eq(2L), 
Mockito.eq(EtlJobType.INSERT), Mockito.anyLong(),
+                    Mockito.eq(""), Mockito.isNull(), Mockito.isNull(), 
Mockito.eq(UserIdentity.ROOT),
+                    Mockito.anyLong());
+            Mockito.verify(txnMgr, 
Mockito.never()).abortTransaction(Mockito.anyLong(), Mockito.anyLong(),
+                    Mockito.anyString());
+        }
+    }
+
+    @Test
+    void testPublishTimeoutCommittedModeReturnsOk() throws Exception {
+        ConnectContext ctx = createExecutorContext();
+        Coordinator coordinator = createCoordinator();
+        GlobalTransactionMgrIface txnMgr = 
Mockito.mock(GlobalTransactionMgrIface.class);
+        TransactionState txnState = Mockito.mock(TransactionState.class);
+        LoadManager loadManager = Mockito.mock(LoadManager.class);
+        Env currentEnv = createCurrentEnv(loadManager);
+        StmtExecutor stmtExecutor = createStmtExecutor();
+
+        try (MockedStatic<EnvFactory> envFactoryMock = 
Mockito.mockStatic(EnvFactory.class);
+                MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+            prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr, 
txnState, currentEnv);
+            ctx.setEnv(currentEnv);
+
+            Mockito.when(txnMgr.commitAndPublishTransaction(
+                    Mockito.any(), Mockito.anyList(), Mockito.anyLong(), 
Mockito.anyList(), Mockito.anyLong(),
+                    Mockito.isNull())).thenReturn(false);
+
+            OlapInsertExecutor executor = createExecutor(ctx);
+            executor.txnId = 10002L;
+            executor.executeSingleInsert(stmtExecutor);
+
+            Assertions.assertEquals(TransactionStatus.COMMITTED, 
executor.txnStatus);
+            Assertions.assertEquals(MysqlStateType.OK, 
ctx.getState().getStateType());
+            
Assertions.assertTrue(ctx.getState().getInfoMessage().contains("'status':'COMMITTED'"));
+
+            InsertResult insertResult = ctx.getInsertResult();
+            Assertions.assertNotNull(insertResult);
+            Assertions.assertEquals(TransactionStatus.COMMITTED, 
insertResult.txnStatus);
+            Assertions.assertEquals(12L, insertResult.loadedRows);
+            Assertions.assertEquals(1L, insertResult.filteredRows);
+            Assertions.assertEquals(12L, ctx.getReturnRows());
+
+            Mockito.verify(txnMgr, 
Mockito.never()).abortTransaction(Mockito.anyLong(), Mockito.anyLong(),
+                    Mockito.anyString());
+        }
+    }
+
+    @Test
+    void testOnFailAbortsUncommittedTransaction() throws Exception {
+        ConnectContext ctx = createExecutorContext();
+        Coordinator coordinator = createCoordinator();
+        GlobalTransactionMgrIface txnMgr = 
Mockito.mock(GlobalTransactionMgrIface.class);
+        TransactionState txnState = Mockito.mock(TransactionState.class);
+        LoadManager loadManager = Mockito.mock(LoadManager.class);
+        Env currentEnv = createCurrentEnv(loadManager);
+
+        try (MockedStatic<EnvFactory> envFactoryMock = 
Mockito.mockStatic(EnvFactory.class);
+                MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+            prepareFactoryMocks(envFactoryMock, envMock, coordinator, txnMgr, 
txnState, currentEnv);
+            ctx.setEnv(currentEnv);
+
+            // Simulate a pre-commit failure so the executor must abort the 
transaction.
+            OlapInsertExecutor executor = createExecutor(ctx);
+            executor.txnId = 10003L;
+            executor.txnStatus = TransactionStatus.ABORTED;
+
+            executor.onFail(new RuntimeException("pre-commit failure"));
+
+            Assertions.assertEquals(MysqlStateType.ERR, 
ctx.getState().getStateType());
+            
Assertions.assertTrue(ctx.getState().getErrorMessage().contains("pre-commit 
failure"));
+            Assertions.assertNull(ctx.getInsertResult());
+            Mockito.verify(txnMgr).abortTransaction(Mockito.eq(1L), 
Mockito.eq(10003L),
+                    Mockito.eq("pre-commit failure"));
+        }
+    }
+
+    // Build a fresh context per case so insertResult and QueryState do not 
leak between tests.
+    private ConnectContext createExecutorContext() {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setThreadLocalInfo();
+        ctx.setCurrentUserIdentity(UserIdentity.ROOT);
+        ctx.setQueryId(new TUniqueId(1, 2));
+        // Disable strict insert mode because this test intentionally keeps 
one filtered row.
+        ctx.getSessionVariable().setEnableInsertStrict(false);
+        ctx.getState().reset();
+        ctx.resetReturnRows();
+        return ctx;
+    }
+
+    // Prepare the mocked coordinator so the executor can run its completion 
logic without real execution.
+    private Coordinator createCoordinator() throws Exception {
+        Coordinator coordinator = Mockito.mock(Coordinator.class);
+        // Return non-null query options because master registers the 
coordinator in QeProcessor.
+        TQueryOptions queryOptions = new TQueryOptions();
+        Mockito.when(coordinator.join(Mockito.anyInt())).thenReturn(true);
+        Mockito.when(coordinator.isDone()).thenReturn(true);
+        Mockito.when(coordinator.getExecStatus()).thenReturn(new 
Status(TStatusCode.OK, ""));
+        Mockito.when(coordinator.getQueryOptions()).thenReturn(queryOptions);
+        
Mockito.when(coordinator.getCommitInfos()).thenReturn(Lists.newArrayList());
+        Mockito.when(coordinator.getTrackingUrl()).thenReturn(null);
+        Mockito.when(coordinator.getFirstErrorMsg()).thenReturn(null);
+        
Mockito.when(coordinator.getExecutionProfile()).thenReturn(Mockito.mock(ExecutionProfile.class));
+        Mockito.when(coordinator.getLoadCounters()).thenReturn(ImmutableMap.of(
+                "dpp.norm.ALL", "12",
+                "dpp.abnorm.ALL", "1"));
+        return coordinator;
+    }
+
+    // Use a mocked executor so executeSingleInsert can run the real control 
flow without a full query setup.
+    private StmtExecutor createStmtExecutor() {
+        StmtExecutor stmtExecutor = Mockito.mock(StmtExecutor.class);
+        
Mockito.when(stmtExecutor.getProfile()).thenReturn(Mockito.mock(Profile.class));
+        
Mockito.when(stmtExecutor.getSummaryProfile()).thenReturn(Mockito.mock(SummaryProfile.class));
+        Mockito.when(stmtExecutor.getOriginStmtInString()).thenReturn("insert 
into test_tbl select 1");
+        Mockito.when(stmtExecutor.getParsedStmt()).thenReturn(null);
+        Mockito.when(stmtExecutor.isProfileSafeStmt()).thenReturn(false);
+        return stmtExecutor;
+    }
+
+    // Provide the job-manager chain needed by master-side setTxnCallbackId().
+    private Env createCurrentEnv(LoadManager loadManager) {
+        Env currentEnv = Mockito.mock(Env.class);
+        // Mock the internal catalog because ConnectContext.setEnv() resolves 
the default catalog on master.
+        InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
+        JobManager<?, ?> jobManager = Mockito.mock(JobManager.class);
+        StreamingTaskManager streamingTaskManager = 
Mockito.mock(StreamingTaskManager.class);
+        
Mockito.when(currentEnv.getInternalCatalog()).thenReturn(internalCatalog);
+        Mockito.when(internalCatalog.getName()).thenReturn("internal");
+        Mockito.when(currentEnv.getLoadManager()).thenReturn(loadManager);
+        Mockito.when(currentEnv.getJobManager()).thenReturn(jobManager);
+        
Mockito.when(jobManager.getStreamingTaskManager()).thenReturn(streamingTaskManager);
+        
Mockito.when(streamingTaskManager.getStreamingInsertTaskById(Mockito.anyLong())).thenReturn(null);
+        return currentEnv;
+    }
+
+    // Create an executor with mocked table metadata because this test only 
validates timeout result handling.
+    private OlapInsertExecutor createExecutor(ConnectContext ctx) {
+        Database database = Mockito.mock(Database.class);
+        Mockito.when(database.getFullName()).thenReturn("test_db");
+        Mockito.when(database.getId()).thenReturn(1L);
+
+        // Mock OlapTable because the master-side executor now casts the 
target table to OlapTable.
+        OlapTable table = Mockito.mock(OlapTable.class);
+        Mockito.when(table.getDatabase()).thenReturn(database);
+        Mockito.when(table.getName()).thenReturn("test_tbl");
+        Mockito.when(table.getId()).thenReturn(2L);
+
+        return new OlapInsertExecutor(ctx, table, "label_test", 
Mockito.mock(NereidsPlanner.class),
+                Optional.empty(), false, 0L);
+    }
+
+    // Redirect coordinator creation and transaction access to mocks so the 
test stays deterministic.
+    private void prepareFactoryMocks(MockedStatic<EnvFactory> envFactoryMock, 
MockedStatic<Env> envMock,
+            Coordinator coordinator, GlobalTransactionMgrIface txnMgr, 
TransactionState txnState, Env currentEnv) {
+        EnvFactory envFactory = Mockito.mock(EnvFactory.class);
+        HiveTransactionMgr hiveTransactionMgr = 
Mockito.mock(HiveTransactionMgr.class);
+        envFactoryMock.when(EnvFactory::getInstance).thenReturn(envFactory);
+        Mockito.when(envFactory.createCoordinator(Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.anyLong()))
+                .thenReturn(coordinator);
+
+        envMock.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+        
envMock.when(Env::getCurrentHiveTransactionMgr).thenReturn(hiveTransactionMgr);
+        envMock.when(Env::getCurrentEnv).thenReturn(currentEnv);
+        Mockito.when(txnMgr.getTransactionState(Mockito.anyLong(), 
Mockito.anyLong())).thenReturn(txnState);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 06bf0065baa..b33a3ef0ad5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -20,8 +20,10 @@ package org.apache.doris.qe;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.SetType;
 import org.apache.doris.analysis.SetVar;
+import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.utframe.TestWithFeService;
@@ -32,6 +34,7 @@ import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
 import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.Map;
 
 public class SessionVariablesTest extends TestWithFeService {
@@ -44,8 +47,7 @@ public class SessionVariablesTest extends TestWithFeService {
         FeConstants.runningUnitTest = true;
         createDatabase("test_d");
         useDatabase("test_d");
-        createTable("create table test_t1 \n" + "(k1 int, k2 int) distributed 
by hash(k1) buckets 1\n"
-                + "properties(\"replication_num\" = \"1\");");
+        // Skip creating an OLAP table because these cases only validate 
session-variable behavior and parsing.
 
         sessionVariable = new SessionVariable();
         Field[] fields = SessionVariable.class.getDeclaredFields();
@@ -66,8 +68,13 @@ public class SessionVariablesTest extends TestWithFeService {
         Assertions.assertEquals(numOfForwardVars, vars.size());
 
         vars.put(SessionVariable.ENABLE_PROFILE, "true");
+        vars.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, "ERROR");
         sessionVariable.setForwardedSessionVariables(vars);
         Assertions.assertTrue(sessionVariable.enableProfile);
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                sessionVariable.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                sessionVariable.getInsertVisibleTimeoutReturnModeEnum());
     }
 
     @Test
@@ -82,6 +89,83 @@ public class SessionVariablesTest extends TestWithFeService {
                 
sessionVariableClone.getSessionOriginValue().get(txIsolationSessionVariableField));
     }
 
+    @Test
+    public void testInsertVisibleTimeoutReturnMode() throws Exception {
+        connectContext.setThreadLocalInfo();
+        SessionVariable sessionVar = connectContext.getSessionVariable();
+
+        VariableMgr.setVar(sessionVar, new SetVar(SetType.SESSION,
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, new 
StringLiteral("ERROR")));
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnModeEnum());
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                
sessionVar.getForwardVariables().get(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE));
+
+        SessionVariable restored = new SessionVariable();
+        
restored.readFromJson("{\"insert_visible_timeout_return_mode\":\"ERROR\"}");
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                restored.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                restored.getInsertVisibleTimeoutReturnModeEnum());
+
+        // Verify map restore keeps accepting canonical string tokens without 
extra normalization hooks.
+        Map<String, String> restoredMap = new HashMap<>();
+        restoredMap.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, 
"ERROR");
+        restored.readFromMap(restoredMap);
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                restored.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                restored.getInsertVisibleTimeoutReturnModeEnum());
+
+        Map<String, String> forwardVars = sessionVar.getForwardVariables();
+        forwardVars.put(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, 
"ERROR");
+        restored.setForwardedSessionVariables(forwardVars);
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                restored.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                restored.getInsertVisibleTimeoutReturnModeEnum());
+
+        Field field = 
SessionVariable.class.getDeclaredField("insertVisibleTimeoutReturnMode");
+        VarAttrDef.VarAttr varAttr = 
field.getAnnotation(VarAttrDef.VarAttr.class);
+        Assertions.assertArrayEquals(new String[] {
+                "控制普通内表 INSERT 在 publish timeout 时返回给客户端的状态。",
+                "Controls the status returned to the client when a normal 
internal-table INSERT times out "
+                        + "while waiting for publish visibility."
+        }, varAttr.description());
+        Assertions.assertArrayEquals(new String[] {
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED,
+                SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR
+        }, varAttr.options());
+
+        ExceptionChecker.expectThrowsWithMsg(DdlException.class,
+                "insertVisibleTimeoutReturnMode value is invalid",
+                () -> VariableMgr.setVar(sessionVar, new 
SetVar(SetType.SESSION,
+                        SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE, 
new StringLiteral("unexpected"))));
+    }
+
+    @Test
+    public void testInsertVisibleTimeoutReturnModeDefaultsAndCheckerBranches() 
{
+        // Cover the default branch and the helper methods used by 
setter/checker paths.
+        SessionVariable sessionVar = new SessionVariable();
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_COMMITTED,
+                sessionVar.getInsertVisibleTimeoutReturnMode());
+        Assertions.assertFalse(sessionVar.isInsertVisibleTimeoutReturnError());
+
+        // Verify setter normalization is case-insensitive and stores the 
canonical constant value.
+        sessionVar.setInsertVisibleTimeoutReturnMode("ErRoR");
+        
Assertions.assertEquals(SessionVariable.INSERT_VISIBLE_TIMEOUT_RETURN_MODE_ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnMode());
+        
Assertions.assertEquals(SessionVariable.InsertVisibleTimeoutReturnMode.ERROR,
+                sessionVar.getInsertVisibleTimeoutReturnModeEnum());
+        Assertions.assertTrue(sessionVar.isInsertVisibleTimeoutReturnError());
+
+        
ExceptionChecker.expectThrowsWithMsg(UnsupportedOperationException.class,
+                "insertVisibleTimeoutReturnMode value is empty",
+                () -> sessionVar.checkInsertVisibleTimeoutReturnMode(""));
+    }
+
     @Test
     public void testSetVarInHint() {
         String sql = "insert into test_t1 select /*+ 
set_var(enable_nereids_dml_with_pipeline=false)*/ * from test_t1 where 
enable_nereids_dml_with_pipeline=true";
diff --git 
a/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out 
b/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out
new file mode 100644
index 00000000000..fa5640a23c0
--- /dev/null
+++ b/regression-test/data/insert_p0/test_insert_visible_timeout_return_mode.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !final_select --
+1      10
+2      20
+
diff --git 
a/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
 
b/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
new file mode 100644
index 00000000000..978bd137d8e
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/test_insert_visible_timeout_return_mode.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite("test_insert_visible_timeout_return_mode", "nonConcurrent") {
+    if (isCloudMode()) {
+        return
+    }
+
+    def tableName = "test_insert_visible_timeout_return_mode_tbl"
+    def debugPoint = "PublishVersionDaemon.stop_publish"
+    // Use the configured FE HTTP endpoint so the case also works when SHOW 
FRONTENDS exposes loopback addresses.
+    def feHttpAddress = context.config.feHttpAddress
+    def feHost = feHttpAddress.split(":")[0]
+    def feHttpPort = Integer.parseInt(feHttpAddress.split(":")[1])
+
+    // Prepare a single-replica table so publish blocking deterministically 
drives the visible timeout path.
+    sql """ DROP TABLE IF EXISTS ${tableName} FORCE """
+    sql """
+        CREATE TABLE ${tableName} (
+            `k1` INT,
+            `k2` INT
+        )
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    try {
+        // Block FE publish so inserts can commit but remain non-visible until 
the debug point is removed.
+        DebugPoint.enableDebugPoint(feHost, feHttpPort, NodeType.FE, 
debugPoint)
+
+        sql """ SET insert_visible_timeout_ms = 1000 """
+
+        // Verify the default committed mode returns success after the visible 
wait times out.
+        sql """ SET insert_visible_timeout_return_mode = 'committed' """
+        sql """ INSERT INTO ${tableName} VALUES (1, 10) """
+
+        // Verify the error mode returns the publish-timeout error to the 
client while keeping the txn committed.
+        sql """ SET insert_visible_timeout_return_mode = 'error' """
+        test {
+            sql """ INSERT INTO ${tableName} VALUES (2, 20) """
+            exception "transaction commit successfully, BUT data did not 
become visible within insert_visible_timeout_ms and will be visible later."
+        }
+    } finally {
+        try {
+            DebugPoint.disableDebugPoint(feHost, feHttpPort, NodeType.FE, 
debugPoint)
+        } catch (Throwable e) {
+            logger.warn("Failed to disable debug point ${debugPoint}", e)
+        }
+    }
+
+    // Wait for FE publish to resume so both committed transactions become 
visible before checking final data.
+    def visible = false
+    for (int i = 0; i < 15; i++) {
+        def rowCount = sql """ SELECT COUNT(*) FROM ${tableName} """
+        if ((rowCount[0][0] as long) == 2L) {
+            visible = true
+            break
+        }
+        sleep(1000)
+    }
+    assertTrue(visible, "Rows should become visible after publish resumes")
+
+    order_qt_final_select """ SELECT * FROM ${tableName} ORDER BY k1 """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to