This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 802ba712ac9 [fix](third party) fix hang when destroy of rdkafka instances (#44913)
802ba712ac9 is described below

commit 802ba712ac9a8190539356bbfd294e103199e40b
Author: hui lai <lai...@selectdb.com>
AuthorDate: Tue Dec 3 21:34:49 2024 +0800

    [fix](third party) fix hang when destroy of rdkafka instances (#44913)
    
    Related PR: https://github.com/confluentinc/librdkafka/pull/4724
---
 thirdparty/patches/librdkafka-1.9.2.patch | 111 +++++++++++++++++++++++++++++-
 1 file changed, 110 insertions(+), 1 deletion(-)

diff --git a/thirdparty/patches/librdkafka-1.9.2.patch 
b/thirdparty/patches/librdkafka-1.9.2.patch
index b13e740bc5c..3caac08f79d 100644
--- a/thirdparty/patches/librdkafka-1.9.2.patch
+++ b/thirdparty/patches/librdkafka-1.9.2.patch
@@ -67,7 +67,19 @@
  
 --- src/rdkafka_broker.c
 +++ src/rdkafka_broker.c
-@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
+@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, 
rd_kafka_op_t *rko) {
+                                 : (topic_err
+                                        ? topic_err
+                                        : 
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
++
++                        if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
++                                
rd_kafka_toppar_purge_internal_fetch_queue_maybe(
++                                    rktp);
++                        }
+                 }
+ 
+                 rd_kafka_toppar_unlock(rktp);
+@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
   */
  void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
  
@@ -78,3 +90,100 @@
          rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
          rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
          rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
+--- src/rdkafka_cgrp.c
++++ src/rdkafka_cgrp.c
+@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t 
*rkcg,
+         rd_kafka_toppar_lock(rktp);
+         rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
+         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
++
++        rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
++
+         rd_kafka_toppar_unlock(rktp);
+ 
+         rd_list_remove(&rkcg->rkcg_toppars, rktp);
+--- src/rdkafka_partition.c
++++ src/rdkafka_partition.c
+@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
+         rd_kafka_toppar_unlock(rktp);
+ }
+ 
++/**
++ * @brief Purge internal fetch queue if toppar is stopped (its fetch queue is
++ * no longer forwarded, RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the
++ * cluster (RD_KAFKA_TOPPAR_F_REMOVE). Called from several places, since the
++ * toppar is marked removed from a metadata response and stopped from a
++ * rebalance or a consumer close.
++ *
++ * @remark Avoids circular references from `rktp_fetchq` ops back to the same
++ * toppar, which would otherwise prevent the consumer from being destroyed.
++ *
++ * @locks rd_kafka_toppar_lock() MUST be held. Takes rktp_fetchq's rkq_lock.
++ */
++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
++        rd_kafka_q_t *rkq;
++        rkq = rktp->rktp_fetchq;
++        mtx_lock(&rkq->rkq_lock);
++        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE &&
++            !rktp->rktp_fetchq->rkq_fwdq) {
++                rd_kafka_op_t *rko;
++                int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
++
++                /* Partition is being removed from the cluster and it's stopped,
++                 * so rktp->rktp_fetchq->rkq_fwdq is NULL.
++                 * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
++                 * while holding lock, to avoid circular references */
++                rko = TAILQ_FIRST(&rkq->rkq_q);
++                while (rko) {
++                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
++                            rko->rko_type != RD_KAFKA_OP_FETCH) {
++                                rd_kafka_log(
++                                    rktp->rktp_rkt->rkt_rk, LOG_WARNING,
++                                    "PARTDEL",
++                                    "Purging toppar fetch queue buffer op "
++                                    "with unexpected type: %s",
++                                    rd_kafka_op2str(rko->rko_type));
++                        }
++
++                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
++                                barrier_cnt++;
++                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
++                                message_cnt++;
++                        else
++                                other_cnt++;
+ 
++                        rko = TAILQ_NEXT(rko, rko_link);
++                        cnt++;
++                }
++
++                if (cnt) {
++                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
++                                     "Purge toppar fetch queue buffer "
++                                     "containing %d op(s) "
++                                     "(%d barrier(s), %d message(s), %d other)"
++                                     " to avoid "
++                                     "circular references",
++                                     cnt, barrier_cnt, message_cnt, other_cnt);
++                        rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false);
++                } else {
++                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
++                                     "Not purging toppar fetch queue buffer."
++                                     " No ops present in the buffer.");
++                }
++        }
++        mtx_unlock(&rkq->rkq_lock);
++}
+ 
+ /**
+  * Helper method for purging queues when removing a toppar.
+--- src/rdkafka_partition.h
++++ src/rdkafka_partition.h
+@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t 
*rktp,
+                                     int64_t query_offset,
+                                     int backoff_ms);
+ 
++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t 
*rktp);
++
+ int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
+                                  int purge_flags,
+                                  rd_bool_t include_xmit_msgq);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to