This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository

The following commit(s) were added to refs/heads/master by this push:
     new 802ba712ac9 [fix](third party) fix hang when destroy of rdkafka 
instances (#44913)
802ba712ac9 is described below

commit 802ba712ac9a8190539356bbfd294e103199e40b
Author: hui lai <>
AuthorDate: Tue Dec 3 21:34:49 2024 +0800

    [fix](third party) fix hang when destroy of rdkafka instances (#44913)
    Related PR:
 thirdparty/patches/librdkafka-1.9.2.patch | 111 +++++++++++++++++++++++++++++-
 1 file changed, 110 insertions(+), 1 deletion(-)

diff --git a/thirdparty/patches/librdkafka-1.9.2.patch b/thirdparty/patches/librdkafka-1.9.2.patch
index b13e740bc5c..3caac08f79d 100644
--- a/thirdparty/patches/librdkafka-1.9.2.patch
+++ b/thirdparty/patches/librdkafka-1.9.2.patch
@@ -67,7 +67,19 @@
 --- src/rdkafka_broker.c
 +++ src/rdkafka_broker.c
-@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
+@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, 
rd_kafka_op_t *rko) {
+                                 : (topic_err
+                                        ? topic_err
+                                        : 
++                        if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
++                                    rktp);
++                        }
+                 }
+                 rd_kafka_toppar_unlock(rktp);
+@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
  void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
@@ -78,3 +90,100 @@
+--- src/rdkafka_cgrp.c
++++ src/rdkafka_cgrp.c
+@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t 
+         rd_kafka_toppar_lock(rktp);
+         rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
+         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
++        rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
+         rd_kafka_toppar_unlock(rktp);
+         rd_list_remove(&rkcg->rkcg_toppars, rktp);
+--- src/rdkafka_partition.c
++++ src/rdkafka_partition.c
+@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
+         rd_kafka_toppar_unlock(rktp);
+ }
++/**
++ * @brief Purge internal fetch queue if toppar is stopped
++ * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster
++ * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's
++ * removed starting from a metadata response and stopped from a rebalance or a
++ * consumer close.
++ *
++ * @remark Avoids circular dependencies from `rktp_fetchq` ops to the same
++ * toppar, which would otherwise prevent a consumer from being destroyed.
++ *
++ * @locks rd_kafka_toppar_lock() MUST be held
++ */
++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t 
*rktp) {
++        rd_kafka_q_t *rkq;
++        rkq = rktp->rktp_fetchq;
++        mtx_lock(&rkq->rkq_lock);
++        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE &&
++            !rktp->rktp_fetchq->rkq_fwdq) {
++                rd_kafka_op_t *rko;
++                int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
++                /* Partition is being removed from the cluster and it's stopped,
++                 * so rktp->rktp_fetchq->rkq_fwdq is NULL.
++                 * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
++                 * while holding lock, to avoid circular references */
++                rko = TAILQ_FIRST(&rkq->rkq_q);
++                while (rko) {
++                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
++                            rko->rko_type != RD_KAFKA_OP_FETCH) {
++                                rd_kafka_log(
++                                    rktp->rktp_rkt->rkt_rk, LOG_WARNING,
++                                    "PARTDEL",
++                                    "Purging toppar fetch queue buffer op"
++                                    "with unexpected type: %s",
++                                    rd_kafka_op2str(rko->rko_type));
++                        }
++                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
++                                barrier_cnt++;
++                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
++                                message_cnt++;
++                        else
++                                other_cnt++;
++                        rko = TAILQ_NEXT(rko, rko_link);
++                        cnt++;
++                }
++                if (cnt) {
++                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
++                                     "Purge toppar fetch queue buffer "
++                                     "containing %d op(s) "
++                                     "(%d barrier(s), %d message(s), %d 
++                                     " to avoid "
++                                     "circular references",
++                                     cnt, barrier_cnt, message_cnt, 
++                        rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false);
++                } else {
++                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
++                                     "Not purging toppar fetch queue buffer."
++                                     " No ops present in the buffer.");
++                }
++        }
++        mtx_unlock(&rkq->rkq_lock);
+ /**
+  * Helper method for purging queues when removing a toppar.
+--- src/rdkafka_partition.h
++++ src/rdkafka_partition.h
+@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t 
+                                     int64_t query_offset,
+                                     int backoff_ms);
++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t 
+ int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
+                                  int purge_flags,
+                                  rd_bool_t include_xmit_msgq);

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to