This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cea2f61ec47 [chore](routine-load) optimize out of range error message (#36450)
cea2f61ec47 is described below

commit cea2f61ec47c71606c0da6a0102125565eb6a075
Author: hui lai <1353307...@qq.com>
AuthorDate: Sat Jun 22 07:59:27 2024 +0800

    [chore](routine-load) optimize out of range error message (#36450)
    
    ## Proposed changes
    
    Before (the error reason includes the full BE stack trace):
    ```
    ErrorReason{code=errCode = 105, msg='be 10002 abort task, task id: 
d846f3d3-7c9e-44a7-bee0-3eff8cd11c6f job id: 11310 with reason: 
[INTERNAL_ERROR]Offset out of range,
    
            0#  doris::Status doris::Status::Error<6, 
true>(std::basic_string_view<char, std::char_traits<char> >) at 
/mnt/disk1/laihui/doris/be/src/common/status.h:422
            1#  doris::Status 
doris::Status::InternalError<true>(std::basic_string_view<char, 
std::char_traits<char> >) at /mnt/disk1/laihui/doris/be/src/common/status.h:468
            2#  
doris::KafkaDataConsumer::group_consume(doris::BlockingQueue<RdKafka::Message*>*,
 long) at 
/mnt/disk1/laihui/doris/be/src/runtime/routine_load/data_consumer.cpp:226
            3#  
doris::KafkaDataConsumerGroup::actual_consume(std::shared_ptr<doris::DataConsumer>,
 doris::BlockingQueue<RdKafka::Message*>*, long, std::function<void 
(doris::Status const&)>) at 
/mnt/disk1/laihui/doris/be/src/runtime/routine_load/data_consumer_group.cpp:200
            4#  void std::__invoke_impl<void, void 
(doris::KafkaDataConsumerGroup::*&)(std::shared_ptr<doris::DataConsumer>, 
doris::BlockingQueue<RdKafka::Message*>*, long, std::function<void 
(doris::Status const&)>), doris::KafkaDataConsumerGroup*&, 
std::shared_ptr<doris::DataConsumer>&, 
doris::BlockingQueue<RdKafka::Message*>*&, long&, 
doris::KafkaDataConsumerGroup::start_all(std::shared_ptr<doris::StreamLoadContext>,
 std::shared_ptr<doris::io::KafkaConsumerPipe>)::$_0&>(std::__invoke_m [...]
    ...
    ```
    
    Now (the error reason reports the partition and offset, with no stack trace):
    ```
    ErrorReason{code=errCode = 105, msg='be 10002 abort task, task id: 
3ba0c0f4-d13c-4dfa-90ce-3df922fd9340 job id: 11310 with reason: 
[INTERNAL_ERROR]Offset out of range, consume partition 0, consume offset 100, 
the offset used by job does not exist in kafka, please check the offset, using 
the Alter ROUTINE LOAD command to modify it, and resume the job'}
    ```
---
 be/src/runtime/routine_load/data_consumer.cpp      |  22 +++-
 .../routine_load/data/test_out_of_range.csv        |   1 +
 .../routine_load/test_out_of_range_error.groovy    | 112 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 9c40e85e281..d185a610af7 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -36,6 +36,7 @@
 #include "runtime/small_file_mgr.h"
 #include "service/backend_options.h"
 #include "util/blocking_queue.hpp"
+#include "util/debug_points.h"
 #include "util/defer_op.h"
 #include "util/stopwatch.hpp"
 #include "util/string_util.h"
@@ -218,6 +219,16 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
         consumer_watch.start();
         std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* 
timeout, ms */));
         consumer_watch.stop();
+        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
+            done = true;
+            std::stringstream ss;
+            ss << "Offset out of range"
+               << ", consume partition " << msg->partition() << ", consume 
offset "
+               << msg->offset();
+            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << 
ss.str();
+            st = Status::InternalError<false>(ss.str());
+            break;
+        });
         switch (msg->err()) {
         case RdKafka::ERR_NO_ERROR:
             if (_consuming_partition_ids.count(msg->partition()) <= 0) {
@@ -258,10 +269,19 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             }
             break;
         }
+        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
+            done = true;
+            std::stringstream ss;
+            ss << msg->errstr() << ", consume partition " << msg->partition() 
<< ", consume offset "
+               << msg->offset();
+            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << 
ss.str();
+            st = Status::InternalError<false>(ss.str());
+            break;
+        }
         default:
             LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << 
msg->errstr();
             done = true;
-            st = Status::InternalError(msg->errstr());
+            st = Status::InternalError<false>(msg->errstr());
             break;
         }
 
diff --git a/regression-test/suites/load_p0/routine_load/data/test_out_of_range.csv b/regression-test/suites/load_p0/routine_load/data/test_out_of_range.csv
new file mode 100644
index 00000000000..e2ad18c3878
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_out_of_range.csv
@@ -0,0 +1 @@
+routine_load_dup_tbl_basic_multi_table|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, 
"name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}
diff --git a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
new file mode 100644
index 00000000000..1ae74b73301
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_out_of_range","nonConcurrent") {
+    def kafkaCsvTpoics = [
+                  "test_out_of_range",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka 
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def load_with_injection = { injection ->
+        def jobName = "test_out_of_range_eror"
+        def tableName = "dup_tbl_basic_multi_table"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql new 
File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                sql new 
File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                sql "sync"
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY "|"
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "test_out_of_range",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                def count = 0
+                while (true) {
+                    sleep(1000)
+                    def res = sql "show routine load for ${jobName}"
+                    def state = res[0][8].toString()
+                    if (state == "PAUSED") {
+                        log.info("reason of state changed: 
${res[0][17].toString()}".toString())
+                        assertTrue(res[0][17].toString().contains("Offset out 
of range"))
+                        assertTrue(res[0][17].toString().contains("consume 
partition"))
+                        assertTrue(res[0][17].toString().contains("consume 
offset"))
+                        GetDebugPoint().disableDebugPointForAllBEs(injection)
+                        break;
+                    }
+                    count++
+                    if (count > 60) {
+                        GetDebugPoint().disableDebugPointForAllBEs(injection)
+                        assertEquals(1, 2)
+                        break;
+                    } else {
+                        continue;
+                    }
+                }
+
+            } finally {
+                sql "stop routine load for ${jobName}"
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            }
+        }
+    }
+
+    load_with_injection("KafkaDataConsumer.group_consume.out_of_range")
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to