This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0b52b7ee7a2 [opt](routine load) support routine load perceived schema change (#39412)
0b52b7ee7a2 is described below

commit 0b52b7ee7a219d7750295ef7b17c32cdc7ee26a8
Author: hui lai <1353307...@qq.com>
AuthorDate: Tue Aug 27 20:29:44 2024 +0800

    [opt](routine load) support routine load perceived schema change (#39412)

    At present, if the table structure changes, the routine load cannot perceive it.
    As a long-running load, it should be able to perceive the changes in the table structure.
---
 .../load/routineload/KafkaRoutineLoadJob.java      |   1 -
 .../doris/load/routineload/KafkaTaskInfo.java      |   8 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  19 +--
 .../routine_load/test_routine_load_with_sc.out     |   4 +
 .../data/test_routine_load_with_sc.csv             |   1 +
 .../routine_load/test_routine_load_with_sc.groovy  | 149 +++++++++++++++++++++
 6 files changed, 163 insertions(+), 19 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index abd1800a19d..1762f8d1122 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -174,7 +174,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     @Override
     public void prepare() throws UserException {
-        super.prepare();
         // should reset converted properties each time the job being prepared.
         // because the file info can be changed anytime.
         convertCustomProperties(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index dedb66d5f94..52a1ad8559f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -19,10 +19,12 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TKafkaLoadInfo;
 import org.apache.doris.thrift.TLoadSourceType;
@@ -127,7 +129,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws UserException {
         TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
         // plan for each task, in case table has change(rollup or schema change)
-        TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
+        StreamLoadPlanner planner = new StreamLoadPlanner(db,
+                (OlapTable) db.getTableOrMetaException(routineLoadJob.getTableId(),
+                Table.TableType.OLAP), routineLoadJob);
+        TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(planner, loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
         // it needs update timeout to make task timeout backoff work
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 2b8cbbd81ac..b983d6beed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -262,9 +262,6 @@ public abstract class RoutineLoadJob
     // The tasks belong to this job
     protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
 
-    // stream load planer will be initialized during job schedule
-    protected StreamLoadPlanner planner;
-
     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
     @SerializedName("ostmt")
@@ -967,21 +964,9 @@ public abstract class RoutineLoadJob
 
     // call before first scheduling
     // derived class can override this.
-    public void prepare() throws UserException {
-        initPlanner();
-    }
-
-    private void initPlanner() throws UserException {
-        // for multi table load job, the table name is dynamic,we will set table when task scheduling.
-        if (isMultiTable) {
-            return;
-        }
-        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
-        planner = new StreamLoadPlanner(db,
-                (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
-    }
+    public abstract void prepare() throws UserException;
 
-    public TPipelineFragmentParams plan(TUniqueId loadId, long txnId) throws UserException {
+    public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId, long txnId) throws UserException {
         Preconditions.checkNotNull(planner);
         Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out b/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out
new file mode 100644
index 00000000000..974fdb75559
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_with_sc --
+1	eab	2023-07-15	def	2023-07-20T05:48:31	aaaaaaaa
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv
new file mode 100644
index 00000000000..6434a1b032f
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,aaaaaaaa
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
new file mode 100644
index 00000000000..33c047062dd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_with_sc","p0") {
+    def kafkaCsvTpoics = [
+                  "test_routine_load_with_sc",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def jobName = "test_routine_load_with_sc_job"
+    def tableName = "test_routine_load_with_sc"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            sql """ DROP TABLE IF EXISTS ${tableName}"""
+            sql """ CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date NULL,
+                `v2` string NULL,
+                `v3` datetime NULL,
+                `v4` varchar(5) NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+            """
+            sql "sync"
+
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                if (state != "PAUSED") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    }
+                    continue;
+                }
+                log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                break;
+            }
+
+            sql "ALTER TABLE ${tableName} MODIFY COLUMN v4 VARCHAR(10)"
+            sql "resume routine load for ${jobName}"
+
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            def producer = new KafkaProducer<>(props)
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                def lines = txt.readLines()
+                lines.each { line ->
+                    logger.info("=====${line}========")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+            qt_sql_with_sc "select * from ${tableName} order by k1"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org