This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 057767f2bbd [chore](routine-load) optimize out of range error message (#36450) (#37456)
057767f2bbd is described below

commit 057767f2bbddf03552d33ad4da54d06070ec43b8
Author: hui lai <[email protected]>
AuthorDate: Thu Jul 11 10:52:01 2024 +0800

    [chore](routine-load) optimize out of range error message (#36450) (#37456)
    
    pick #36450
---
 be/src/runtime/routine_load/data_consumer.cpp | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index ccf5fb4cb25..2874a43ae0e 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -36,6 +36,7 @@
 #include "runtime/small_file_mgr.h"
 #include "service/backend_options.h"
 #include "util/blocking_queue.hpp"
+#include "util/debug_points.h"
 #include "util/defer_op.h"
 #include "util/stopwatch.hpp"
 #include "util/string_util.h"
@@ -218,6 +219,16 @@ Status 
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
         consumer_watch.start();
         std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* 
timeout, ms */));
         consumer_watch.stop();
+        DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
+            done = true;
+            std::stringstream ss;
+            ss << "Offset out of range"
+               << ", consume partition " << msg->partition() << ", consume 
offset "
+               << msg->offset();
+            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << 
ss.str();
+            st = Status::InternalError<false>(ss.str());
+            break;
+        });
         switch (msg->err()) {
         case RdKafka::ERR_NO_ERROR:
             if (_consuming_partition_ids.count(msg->partition()) <= 0) {
@@ -249,6 +260,15 @@ Status 
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                 break;
             }
             [[fallthrough]];
+        case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
+            done = true;
+            std::stringstream ss;
+            ss << msg->errstr() << ", consume partition " << msg->partition() 
<< ", consume offset "
+               << msg->offset();
+            LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << 
ss.str();
+            st = Status::InternalError<false>(ss.str());
+            break;
+        }
         case RdKafka::ERR__PARTITION_EOF: {
             LOG(INFO) << "consumer meet partition eof: " << _id
                       << " partition offset: " << msg->offset();
@@ -261,7 +281,7 @@ Status 
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
         default:
             LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << 
msg->errstr();
             done = true;
-            st = Status::InternalError(msg->errstr());
+            st = Status::InternalError<false>(msg->errstr());
             break;
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to