This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 057767f2bbd [chore](routine-load) optimize out of range error message
(#36450) (#37456)
057767f2bbd is described below
commit 057767f2bbddf03552d33ad4da54d06070ec43b8
Author: hui lai <[email protected]>
AuthorDate: Thu Jul 11 10:52:01 2024 +0800
[chore](routine-load) optimize out of range error message (#36450) (#37456)
pick #36450
---
be/src/runtime/routine_load/data_consumer.cpp | 22 +++++++++++++++++++++-
1 file changed, 21 insertions(+), 1 deletion(-)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index ccf5fb4cb25..2874a43ae0e 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -36,6 +36,7 @@
#include "runtime/small_file_mgr.h"
#include "service/backend_options.h"
#include "util/blocking_queue.hpp"
+#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
@@ -218,6 +219,16 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
consumer_watch.start();
std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /*
timeout, ms */));
consumer_watch.stop();
+ DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
+ done = true;
+ std::stringstream ss;
+ ss << "Offset out of range"
+ << ", consume partition " << msg->partition() << ", consume
offset "
+ << msg->offset();
+ LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " <<
ss.str();
+ st = Status::InternalError<false>(ss.str());
+ break;
+ });
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
if (_consuming_partition_ids.count(msg->partition()) <= 0) {
@@ -249,6 +260,15 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
break;
}
[[fallthrough]];
+ case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
+ done = true;
+ std::stringstream ss;
+ ss << msg->errstr() << ", consume partition " << msg->partition()
<< ", consume offset "
+ << msg->offset();
+ LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " <<
ss.str();
+ st = Status::InternalError<false>(ss.str());
+ break;
+ }
case RdKafka::ERR__PARTITION_EOF: {
LOG(INFO) << "consumer meet partition eof: " << _id
<< " partition offset: " << msg->offset();
@@ -261,7 +281,7 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " <<
msg->errstr();
done = true;
- st = Status::InternalError(msg->errstr());
+ st = Status::InternalError<false>(msg->errstr());
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]