Module: kamailio
Branch: master
Commit: 741bb148ddf4311679cfa6e379fa8bdbc8fac5e5
URL: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5
Author: Stefan Mititelu <stefan.mitit...@net2phone.com>
Committer: Daniel-Constantin Mierla <mico...@gmail.com>
Date: 2024-09-18T10:01:10+02:00

kafka: add modparam init_without_kafka

---

Modified: src/modules/kafka/kafka_mod.c
Modified: src/modules/kafka/kfk.c

---

Diff: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.diff
Patch: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.patch

---

diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c
index 26369d6ca31..e88bd37384b 100644
--- a/src/modules/kafka/kafka_mod.c
+++ b/src/modules/kafka/kafka_mod.c
@@ -64,6 +64,8 @@ static int w_kafka_send_key(
 /*
  * Variables and functions to deal with module parameters.
  */
+int child_init_ok = 0;
+int init_without_kafka = 0;
 char *brokers_param = NULL; /**< List of brokers. */
 static int kafka_conf_param(modparam_t type, void *val);
 static int kafka_topic_param(modparam_t type, void *val);
@@ -84,7 +86,7 @@ static param_export_t params[] = {{"brokers", PARAM_STRING, &brokers_param},
 		{"configuration", PARAM_STRING | USE_FUNC_PARAM,
 				(void *)kafka_conf_param},
 		{"topic", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_topic_param},
-		{0, 0, 0}};
+		{"init_without_kafka", PARAM_INT, &init_without_kafka}, {0, 0, 0}};
 
 /**
  * \brief Kafka :: Module interface
@@ -125,9 +127,15 @@ static int child_init(int rank)
 	if(rank == PROC_INIT || rank == PROC_TCP_MAIN)
 		return 0;
 
+	child_init_ok = 1;
 	if(kfk_init(brokers_param)) {
-		LM_ERR("Failed to initialize Kafka\n");
-		return -1;
+		child_init_ok = 0;
+		if(init_without_kafka) {
+			LM_ERR("Failed to initialize Kafka - continue\n");
+		} else {
+			LM_ERR("Failed to initialize Kafka\n");
+			return -1;
+		}
 	}
 	return 0;
 }
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c
index c6dc8398da2..76a5fea75d9 100644
--- a/src/modules/kafka/kfk.c
+++ b/src/modules/kafka/kfk.c
@@ -36,6 +36,9 @@
 #include "../../core/mem/shm_mem.h"
 #include "../../core/locking.h"
 
+extern int child_init_ok;
+extern int init_without_kafka;
+
 /**
  * \brief data type for a configuration property.
  */
@@ -587,7 +590,9 @@ static int kfk_topic_configure(kfk_topic_t *ktopic)
 	}
 
 	int topic_found = kfk_topic_exist(ktopic->topic_name);
-	if(topic_found == -1) {
+	if(init_without_kafka) {
+		;
+	} else if(topic_found == -1) {
 		LM_ERR("Failed to search for topic %.*s in cluster\n",
 				ktopic->topic_name->len, ktopic->topic_name->s);
 		goto error;
@@ -828,6 +833,12 @@ int kfk_message_send(str *topic_name, str *message, str *key)
 	/* Get topic from name. */
 	rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
 
+	if(!child_init_ok) {
+		LM_ERR("kafka module is unusable: child init NOT ok! Skip sending "
+			   "message, message lost!");
+		return -1;
+	}
+
 	if(!rkt) {
 		LM_ERR("Topic not found: %.*s\n", topic_name->len, topic_name->s);
 		return -1;

_______________________________________________
Kamailio (SER) - Development Mailing List
To unsubscribe send an email to sr-dev-le...@lists.kamailio.org