This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2b4efc76fc0 [fix](routine-load) fix consumer hang when kafka exception causing can not query (#33492) (#33759)
2b4efc76fc0 is described below

commit 2b4efc76fc043dfbb43c5b06ad7b9d85937978e4
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Thu Apr 18 20:22:49 2024 +0800

    [fix](routine-load) fix consumer hang when kafka exception causing can not query (#33492) (#33759)
---
 thirdparty/patches/librdkafka-1.9.2.patch | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/thirdparty/patches/librdkafka-1.9.2.patch b/thirdparty/patches/librdkafka-1.9.2.patch
index 38064e751dc..b13e740bc5c 100644
--- a/thirdparty/patches/librdkafka-1.9.2.patch
+++ b/thirdparty/patches/librdkafka-1.9.2.patch
@@ -34,6 +34,37 @@
          # Clear define name ($2): caller may have additional checks
          mkl_check_failed "$cname" "" "$3" "pkg-config --libs failed"
          return 1
+--- src/rdkafka.c
++++ src/rdkafka.c
+@@ -3510,6 +3510,7 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+         struct rd_kafka_partition_leader *leader;
+         rd_list_t leaders;
+         rd_kafka_resp_err_t err;
++        int tmout;
+ 
+         partitions = rd_kafka_topic_partition_list_new(1);
+         rktpar =
+@@ -3556,11 +3557,15 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
+         rd_list_destroy(&leaders);
+ 
+         /* Wait for reply (or timeout) */
+-        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
+-               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
+-                                rd_kafka_poll_cb,
+-                                NULL) != RD_KAFKA_OP_RES_YIELD)
+-                ;
++        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
++                tmout = rd_timeout_remains(ts_end);
++                if (rd_timeout_expired(tmout)) {
++                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
++                        break;
++                }
++                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
++                                 rd_kafka_poll_cb, NULL);
++        }
+ 
+         rd_kafka_q_destroy_owner(rkq);
+ 
 --- src/rdkafka_broker.c
 +++ src/rdkafka_broker.c
 @@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to