This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0b52b7ee7a2 [opt](routine load) support routine load perceived schema change (#39412)
0b52b7ee7a2 is described below

commit 0b52b7ee7a219d7750295ef7b17c32cdc7ee26a8
Author: hui lai <1353307...@qq.com>
AuthorDate: Tue Aug 27 20:29:44 2024 +0800

    [opt](routine load) support routine load perceived schema change (#39412)
    
    At present, if the table structure changes, a routine load job cannot
    perceive it. As a long-running load, it should be able to perceive
    changes to the table structure.
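
    A minimal sketch of the new per-task planning path: each task builds a
    fresh StreamLoadPlanner from current catalog metadata instead of reusing a
    planner cached when the job was prepared, so a rollup or schema change is
    reflected in the next scheduled task. The wrapper class and method
    (RePlanSketch, rePlan) are hypothetical; StreamLoadPlanner, Env, Database,
    OlapTable and the new RoutineLoadJob.plan(planner, loadId, txnId) signature
    follow the diff below.

    import org.apache.doris.catalog.Database;
    import org.apache.doris.catalog.Env;
    import org.apache.doris.catalog.OlapTable;
    import org.apache.doris.catalog.Table;
    import org.apache.doris.common.UserException;
    import org.apache.doris.load.routineload.RoutineLoadJob;
    import org.apache.doris.planner.StreamLoadPlanner;
    import org.apache.doris.thrift.TPipelineFragmentParams;
    import org.apache.doris.thrift.TUniqueId;

    // Hypothetical helper mirroring KafkaTaskInfo.rePlan() in the diff below.
    final class RePlanSketch {
        // Re-plan one task against the table's current schema rather than a
        // planner created once at job-prepare time.
        static TPipelineFragmentParams rePlan(RoutineLoadJob job, TUniqueId loadId, long txnId)
                throws UserException {
            Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(job.getDbId());
            OlapTable table = (OlapTable) db.getTableOrMetaException(
                    job.getTableId(), Table.TableType.OLAP);
            // A fresh planner per task sees the latest table schema (e.g. a widened column).
            StreamLoadPlanner planner = new StreamLoadPlanner(db, table, job);
            return job.plan(planner, loadId, txnId);
        }
    }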
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   1 -
 .../doris/load/routineload/KafkaTaskInfo.java      |   8 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  19 +--
 .../routine_load/test_routine_load_with_sc.out     |   4 +
 .../data/test_routine_load_with_sc.csv             |   1 +
 .../routine_load/test_routine_load_with_sc.groovy  | 149 +++++++++++++++++++++
 6 files changed, 163 insertions(+), 19 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index abd1800a19d..1762f8d1122 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -174,7 +174,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     @Override
     public void prepare() throws UserException {
-        super.prepare();
         // should reset converted properties each time the job being prepared.
         // because the file info can be changed anytime.
         convertCustomProperties(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index dedb66d5f94..52a1ad8559f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -19,10 +19,12 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TKafkaLoadInfo;
 import org.apache.doris.thrift.TLoadSourceType;
@@ -127,7 +129,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws UserException {
         TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
         // plan for each task, in case table has change(rollup or schema change)
-        TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
+        StreamLoadPlanner planner = new StreamLoadPlanner(db,
+                (OlapTable) db.getTableOrMetaException(routineLoadJob.getTableId(),
+                Table.TableType.OLAP), routineLoadJob);
+        TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(planner, loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
         // it needs update timeout to make task timeout backoff work
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 2b8cbbd81ac..b983d6beed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -262,9 +262,6 @@ public abstract class RoutineLoadJob
     // The tasks belong to this job
     protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
 
-    // stream load planer will be initialized during job schedule
-    protected StreamLoadPlanner planner;
-
     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
     @SerializedName("ostmt")
@@ -967,21 +964,9 @@ public abstract class RoutineLoadJob
 
     // call before first scheduling
     // derived class can override this.
-    public void prepare() throws UserException {
-        initPlanner();
-    }
-
-    private void initPlanner() throws UserException {
-        // for multi table load job, the table name is dynamic,we will set table when task scheduling.
-        if (isMultiTable) {
-            return;
-        }
-        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
-        planner = new StreamLoadPlanner(db,
-                (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
-    }
+    public abstract void prepare() throws UserException;
 
-    public TPipelineFragmentParams plan(TUniqueId loadId, long txnId) throws UserException {
+    public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId, long txnId) throws UserException {
         Preconditions.checkNotNull(planner);
         Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out b/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out
new file mode 100644
index 00000000000..974fdb75559
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_with_sc --
+1      eab     2023-07-15      def     2023-07-20T05:48:31     aaaaaaaa
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv
new file mode 100644
index 00000000000..6434a1b032f
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,aaaaaaaa
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
new file mode 100644
index 00000000000..33c047062dd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_with_sc","p0") {
+    def kafkaCsvTpoics = [
+                  "test_routine_load_with_sc",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka 
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def jobName = "test_routine_load_with_sc_job"
+    def tableName = "test_routine_load_with_sc"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            sql """ DROP TABLE IF EXISTS ${tableName}"""
+            sql """ CREATE TABLE IF NOT EXISTS ${tableName}
+                    (
+                        `k1` int(20) NULL,
+                        `k2` string NULL,
+                        `v1` date  NULL,
+                        `v2` string  NULL,
+                        `v3` datetime  NULL,
+                        `v4` varchar(5)  NULL
+                    ) ENGINE=OLAP
+                    DUPLICATE KEY(`k1`)
+                    COMMENT 'OLAP'
+                    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                    PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+                """
+            sql "sync"
+
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                if (state != "PAUSED") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    } 
+                    continue;
+                }
+                log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                break;
+            }
+
+            sql "ALTER TABLE ${tableName} MODIFY COLUMN v4 VARCHAR(10)"
+            sql "resume routine load for ${jobName}"
+
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            def producer = new KafkaProducer<>(props)
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                def lines = txt.readLines()
+                lines.each { line ->
+                    logger.info("=====${line}========")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql_with_sc "select * from ${tableName} order by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}

