Module: kamailio
Branch: master
Commit: 741bb148ddf4311679cfa6e379fa8bdbc8fac5e5
URL: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5

Author: Stefan Mititelu <stefan.mitit...@net2phone.com>
Committer: Daniel-Constantin Mierla <mico...@gmail.com>
Date: 2024-09-18T10:01:10+02:00

kafka: add modparam init_without_kafka

---

Modified: src/modules/kafka/kafka_mod.c
Modified: src/modules/kafka/kfk.c

---

Diff:  https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.diff
Patch: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.patch

---

diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c
index 26369d6ca31..e88bd37384b 100644
--- a/src/modules/kafka/kafka_mod.c
+++ b/src/modules/kafka/kafka_mod.c
@@ -64,6 +64,8 @@ static int w_kafka_send_key(
 /*
  * Variables and functions to deal with module parameters.
  */
+int child_init_ok = 0;
+int init_without_kafka = 0;
 char *brokers_param = NULL; /**< List of brokers. */
 static int kafka_conf_param(modparam_t type, void *val);
 static int kafka_topic_param(modparam_t type, void *val);
@@ -84,7 +86,7 @@ static param_export_t params[] = {{"brokers", PARAM_STRING, &brokers_param},
                {"configuration", PARAM_STRING | USE_FUNC_PARAM,
                                (void *)kafka_conf_param},
                {"topic", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_topic_param},
-               {0, 0, 0}};
+               {"init_without_kafka", PARAM_INT, &init_without_kafka}, {0, 0, 0}};
 
 /**
  * \brief Kafka :: Module interface
@@ -125,9 +127,15 @@ static int child_init(int rank)
        if(rank == PROC_INIT || rank == PROC_TCP_MAIN)
                return 0;
 
+       child_init_ok = 1;
        if(kfk_init(brokers_param)) {
-               LM_ERR("Failed to initialize Kafka\n");
-               return -1;
+               child_init_ok = 0;
+               if(init_without_kafka) {
+                       LM_ERR("Failed to initialize Kafka - continue\n");
+               } else {
+                       LM_ERR("Failed to initialize Kafka\n");
+                       return -1;
+               }
        }
        return 0;
 }
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c
index c6dc8398da2..76a5fea75d9 100644
--- a/src/modules/kafka/kfk.c
+++ b/src/modules/kafka/kfk.c
@@ -36,6 +36,9 @@
 #include "../../core/mem/shm_mem.h"
 #include "../../core/locking.h"
 
+extern int child_init_ok;
+extern int init_without_kafka;
+
 /**
  * \brief data type for a configuration property.
  */
@@ -587,7 +590,9 @@ static int kfk_topic_configure(kfk_topic_t *ktopic)
        }
 
        int topic_found = kfk_topic_exist(ktopic->topic_name);
-       if(topic_found == -1) {
+       if(init_without_kafka) {
+               ;
+       } else if(topic_found == -1) {
                LM_ERR("Failed to search for topic %.*s in cluster\n",
                                ktopic->topic_name->len, ktopic->topic_name->s);
                goto error;
@@ -828,6 +833,12 @@ int kfk_message_send(str *topic_name, str *message, str *key)
        /* Get topic from name. */
        rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
 
+       if(!child_init_ok) {
+               LM_ERR("kafka module is unusable: child init NOT ok! Skip sending "
+                          "message, message lost!");
+               return -1;
+       }
+
        if(!rkt) {
                LM_ERR("Topic not found: %.*s\n", topic_name->len, topic_name->s);
                return -1;

_______________________________________________
Kamailio (SER) - Development Mailing List
To unsubscribe send an email to sr-dev-le...@lists.kamailio.org

Reply via email to