This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new cea2f61ec47 [chore](routine-load) optimize out of range error message (#36450)
cea2f61ec47 is described below

commit cea2f61ec47c71606c0da6a0102125565eb6a075
Author: hui lai <1353307...@qq.com>
AuthorDate: Sat Jun 22 07:59:27 2024 +0800

    [chore](routine-load) optimize out of range error message (#36450)

    ## Proposed changes

    before
    ```
    ErrorReason{code=errCode = 105, msg='be 10002 abort task, task id: d846f3d3-7c9e-44a7-bee0-3eff8cd11c6f job id: 11310 with reason: [INTERNAL_ERROR]Offset out of range,
    0# doris::Status doris::Status::Error<6, true>(std::basic_string_view<char, std::char_traits<char> >) at /mnt/disk1/laihui/doris/be/src/common/status.h:422
    1# doris::Status doris::Status::InternalError<true>(std::basic_string_view<char, std::char_traits<char> >) at /mnt/disk1/laihui/doris/be/src/common/status.h:468
    2# doris::KafkaDataConsumer::group_consume(doris::BlockingQueue<RdKafka::Message*>*, long) at /mnt/disk1/laihui/doris/be/src/runtime/routine_load/data_consumer.cpp:226
    3# doris::KafkaDataConsumerGroup::actual_consume(std::shared_ptr<doris::DataConsumer>, doris::BlockingQueue<RdKafka::Message*>*, long, std::function<void (doris::Status const&)>) at /mnt/disk1/laihui/doris/be/src/runtime/routine_load/data_consumer_group.cpp:200
    4# void std::__invoke_impl<void, void (doris::KafkaDataConsumerGroup::*&)(std::shared_ptr<doris::DataConsumer>, doris::BlockingQueue<RdKafka::Message*>*, long, std::function<void (doris::Status const&)>), doris::KafkaDataConsumerGroup*&, std::shared_ptr<doris::DataConsumer>&, doris::BlockingQueue<RdKafka::Message*>*&, long&, doris::KafkaDataConsumerGroup::start_all(std::shared_ptr<doris::StreamLoadContext>, std::shared_ptr<doris::io::KafkaConsumerPipe>)::$_0&>(std::__invoke_m [...]
    ...
    ```
    now
    ```
    ErrorReason{code=errCode = 105, msg='be 10002 abort task, task id: 3ba0c0f4-d13c-4dfa-90ce-3df922fd9340 job id: 11310 with reason: [INTERNAL_ERROR]Offset out of range, consume partition 0, consume offset 100, the offset used by job does not exist in kafka, please check the offset, using the Alter ROUTINE LOAD command to modify it, and resume the job'}
    ```
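For illustration only (not part of this commit): the recovery path the new message points at would look roughly like the SQL below, where the job name `test_job`, partition `0`, and offset `100` are all placeholders.

```
-- Hypothetical recovery sketch: point the PAUSED routine load job at an
-- offset that actually exists in Kafka, then resume it. The job name,
-- partition, and offset here are placeholders, not values from the commit.
ALTER ROUTINE LOAD FOR test_job
FROM KAFKA
(
    "kafka_partitions" = "0",
    "kafka_offsets" = "100"
);
RESUME ROUTINE LOAD FOR test_job;
```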
---
 be/src/runtime/routine_load/data_consumer.cpp    |  22 +++-
 .../routine_load/data/test_out_of_range.csv      |   1 +
 .../routine_load/test_out_of_range_error.groovy  | 112 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 9c40e85e281..d185a610af7 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -36,6 +36,7 @@
 #include "runtime/small_file_mgr.h"
 #include "service/backend_options.h"
 #include "util/blocking_queue.hpp"
+#include "util/debug_points.h"
 #include "util/defer_op.h"
 #include "util/stopwatch.hpp"
 #include "util/string_util.h"
@@ -218,6 +219,16 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
         consumer_watch.start();
         std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
         consumer_watch.stop();
+        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
+            done = true;
+            std::stringstream ss;
+            ss << "Offset out of range"
+               << ", consume partition " << msg->partition() << ", consume offset "
+               << msg->offset();
+            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
+            st = Status::InternalError<false>(ss.str());
+            break;
+        });
         switch (msg->err()) {
         case RdKafka::ERR_NO_ERROR:
             if (_consuming_partition_ids.count(msg->partition()) <= 0) {
@@ -258,10 +269,19 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             }
             break;
         }
+        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
+            done = true;
+            std::stringstream ss;
+            ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
+               << msg->offset();
+            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
+            st = Status::InternalError<false>(ss.str());
+            break;
+        }
         default:
             LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
             done = true;
-            st = Status::InternalError(msg->errstr());
+            st = Status::InternalError<false>(msg->errstr());
             break;
         }
diff --git a/regression-test/suites/load_p0/routine_load/data/test_out_of_range.csv b/regression-test/suites/load_p0/routine_load/data/test_out_of_range.csv
new file mode 100644
index 00000000000..e2ad18c3878
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_out_of_range.csv
@@ -0,0 +1 @@
+routine_load_dup_tbl_basic_multi_table|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}
diff --git a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
new file mode 100644
index 00000000000..1ae74b73301
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_out_of_range", "nonConcurrent") {
+    def kafkaCsvTopics = [
+                  "test_out_of_range",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka producer config
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def load_with_injection = { injection ->
+        def jobName = "test_out_of_range_error"
+        def tableName = "dup_tbl_basic_multi_table"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                sql "sync"
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY "|"
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "test_out_of_range",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                def count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state == "PAUSED") {
+                        log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                        assertTrue(res[0][17].toString().contains("Offset out of range"))
+                        assertTrue(res[0][17].toString().contains("consume partition"))
+                        assertTrue(res[0][17].toString().contains("consume offset"))
+                        GetDebugPoint().disableDebugPointForAllBEs(injection)
+                        break;
+                    }
+                    count++
+                    if (count > 60) {
+                        GetDebugPoint().disableDebugPointForAllBEs(injection)
+                        assertEquals(1, 2)
+                        break;
+                    } else {
+                        continue;
+                    }
+                }
+
+            } finally {
+                sql "stop routine load for ${jobName}"
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            }
+        }
+    }
+
+    load_with_injection("KafkaDataConsumer.group_consume.out_of_range")
+}
\ No newline at end of file
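As a usage note: the test above reads the pause reason from column 17 of the `show routine load` result (the reason-of-state-changed field). A manual check of the improved message would look roughly like the following, with the job name being a placeholder.

```
-- The pause reason should now carry "consume partition"/"consume offset"
-- context instead of a raw stack trace. Job name is a placeholder.
SHOW ROUTINE LOAD FOR test_out_of_range_error;
```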
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org