Module: kamailio
Branch: master
Commit: 5b852254e75365c2886baf1e4be8829ac10b9917
URL: 
https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829ac10b9917

Author: Daniel-Constantin Mierla <mico...@gmail.com>
Committer: Daniel-Constantin Mierla <mico...@gmail.com>
Date: 2024-11-22T13:28:20+01:00

core: framework to execute async event route with type-key-value

- execute event_route[core:tkv] in async fashion, with data passed in
  the form of [type, key, value]
- the events have to be emitted from parts of the code and the execution
  of the event route should not block the emitting process

---

Modified: src/core/async_task.c
Modified: src/core/async_task.h

---

Diff:  
https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829ac10b9917.diff
Patch: 
https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829ac10b9917.patch

---

diff --git a/src/core/async_task.c b/src/core/async_task.c
index bce7ed51faa..17c385a51fd 100644
--- a/src/core/async_task.c
+++ b/src/core/async_task.c
@@ -34,11 +34,15 @@
 #include <arpa/inet.h>
 #include <fcntl.h>
 #include <errno.h>
+#include <stdarg.h>
 
 #include "dprint.h"
 #include "sr_module.h"
 #include "ut.h"
 #include "pt.h"
+#include "kemi.h"
+#include "fmsg.h"
+#include "receive.h"
 #include "cfg/cfg_struct.h"
 #include "parser/parse_param.h"
 
@@ -525,3 +529,126 @@ int async_task_run(async_wgroup_t *awg, int idx)
 
        return 0;
 }
+
+
+/**
+ * File-scope state for the async type-key-value (tkv) events:
+ * - _async_tkv_awg: result of looking up the "tkv" async workers group in
+ *   async_tkv_init(); async_tkv_emit() refuses to send while it is NULL
+ * - _ksr_async_tkv_param: parameter of the tkv task being executed, set
+ *   only for the duration of async_exec_tkv()
+ * - _ksr_async_tkv_ridx: index of event_route[core:tkv] in event_rt;
+ *   -1 until async_tkv_init() runs, -2 when no usable route was found
+ */
+static async_wgroup_t *_async_tkv_awg = NULL;
+static async_tkv_param_t *_ksr_async_tkv_param = NULL;
+static int _ksr_async_tkv_ridx = -1;
+
+/**
+ * Cache what is needed to emit and execute tkv events: the "tkv" async
+ * workers group and the index of event_route[core:tkv].
+ *
+ * A lookup result <= 0, or an index with no route actions attached, is
+ * recorded as -2 ("event route not defined").
+ */
+void async_tkv_init(void)
+{
+       str wg_name = str_init("tkv");
+       str rt_name = str_init("core:tkv");
+       int idx;
+
+       _async_tkv_awg = async_task_group_find(&wg_name);
+
+       idx = route_lookup(&event_rt, rt_name.s);
+       if(idx > 0 && event_rt.rlist[idx] != NULL) {
+               /* usable event route - remember where it is */
+               _ksr_async_tkv_ridx = idx;
+               return;
+       }
+
+       LM_DBG("event_route[%s] not defined - skipping\n", rt_name.s);
+       _ksr_async_tkv_ridx = -2;
+}
+
+/**
+ * Async task callback: run the tkv event route for one emitted event.
+ *
+ * @param param pointer to the async_tkv_param_t built by async_tkv_emit()
+ *
+ * The parameter is published in _ksr_async_tkv_param while the route runs.
+ * With a KEMI engine loaded, its "ksr_core_tkv" callback is executed;
+ * otherwise event_route[core:tkv] is run, when it was found at init time.
+ * The previous route type is restored before returning.
+ */
+void async_exec_tkv(void *param)
+{
+       str rt_name = str_init("core:tkv");
+       str kemi_cb = str_init("ksr_core_tkv");
+       sip_msg_t *msg = NULL;
+       sr_kemi_eng_t *eng = NULL;
+       int saved_rtype = 0;
+       int rc = 0;
+
+       msg = faked_msg_next();
+       saved_rtype = get_route_type();
+       _ksr_async_tkv_param = (async_tkv_param_t *)param;
+       set_route_type(REQUEST_ROUTE);
+
+       eng = sr_kemi_eng_get();
+       if(eng != NULL) {
+               rc = sr_kemi_route(eng, msg, EVENT_ROUTE, &kemi_cb, &rt_name);
+               if(rc < 0) {
+                       LM_ERR("error running event route kemi callback [%.*s]\n",
+                                       kemi_cb.len, kemi_cb.s);
+               }
+       } else if(_ksr_async_tkv_ridx >= 0) {
+               run_top_route(event_rt.rlist[_ksr_async_tkv_ridx], msg, 0);
+       }
+
+       ksr_msg_env_reset();
+       set_route_type(saved_rtype);
+       _ksr_async_tkv_param = NULL;
+       /* param is not freed here: it is released along with the async task
+        * structure in core */
+}
+
+/**
+ * Emit a type-key-value event to be handled asynchronously by the "tkv"
+ * async workers group.
+ *
+ * @param dtype type of the event, stored as-is in the task parameter
+ * @param pkey  zero-terminated key of the event (must not be NULL)
+ * @param fmt   printf-like format used to build the value (must not be
+ *              NULL), followed by its arguments
+ * @return 0 when the task was sent to the workers group, -1 otherwise
+ *         (group or event route not available, invalid parameters, value
+ *         not fitting in KSR_ASYNC_TKV_SIZE - 1 characters, no more shared
+ *         memory, or failure to send the task)
+ *
+ * The task, its parameter, the key and the value are stored in a single
+ * shared memory block, so releasing the task releases everything.
+ */
+int async_tkv_emit(int dtype, char *pkey, char *fmt, ...)
+{
+       async_task_t *at = NULL;
+       async_tkv_param_t *adp = NULL;
+       sr_kemi_eng_t *keng = NULL;
+       va_list va;
+       char buf[KSR_ASYNC_TKV_SIZE];
+       int dsize = 0;
+       int klen = 0;
+       int n = 0;
+
+       if(_async_tkv_awg == NULL) {
+               LM_DBG("the async group has not been set\n");
+               return -1;
+       }
+       keng = sr_kemi_eng_get();
+       if(keng == NULL && _ksr_async_tkv_ridx == -2) {
+               LM_DBG("the event_route[core:tkv] has not been defined\n");
+               return -1;
+       }
+       if(pkey == NULL || fmt == NULL) {
+               LM_ERR("invalid parameters (key: %p, format: %p)\n", (void *)pkey,
+                               (void *)fmt);
+               return -1;
+       }
+
+       va_start(va, fmt);
+       n = vsnprintf(buf, sizeof(buf), fmt, va);
+       va_end(va);
+
+       /* vsnprintf() returns the length the full output would have had,
+        * so n >= sizeof(buf) means the value was truncated */
+       if(n < 0 || n >= (int)sizeof(buf)) {
+               LM_ERR("failed to print the arguments for key: %s\n", pkey);
+               return -1;
+       }
+
+       klen = strlen(pkey);
+       /* layout: task | tkv param | key '\0' | value '\0' */
+       dsize = sizeof(async_task_t) + sizeof(async_tkv_param_t) + klen + 1 + n
+                       + 1;
+       at = (async_task_t *)shm_malloc(dsize);
+       if(at == NULL) {
+               SHM_MEM_ERROR;
+               return -1;
+       }
+       /* zeroing the block also provides the terminators of key and value */
+       memset(at, 0, dsize);
+       at->exec = async_exec_tkv;
+       at->param = (char *)at + sizeof(async_task_t);
+       adp = (async_tkv_param_t *)at->param;
+       adp->dtype = dtype;
+       adp->skey.s = (char *)adp + sizeof(async_tkv_param_t);
+       memcpy(adp->skey.s, pkey, klen);
+       adp->skey.len = klen;
+       adp->sval.s = adp->skey.s + klen + 1;
+       memcpy(adp->sval.s, buf, n);
+       adp->sval.len = n;
+
+       if(async_task_group_send(_async_tkv_awg, at) < 0) {
+               LM_ERR("failed to send task with key: %s\n", pkey);
+               shm_free(at);
+               return -1;
+       }
+
+       return 0;
+}
diff --git a/src/core/async_task.h b/src/core/async_task.h
index 58386bba8bd..b4276c2d27c 100644
--- a/src/core/async_task.h
+++ b/src/core/async_task.h
@@ -58,4 +58,15 @@ async_wgroup_t *async_task_group_find(str *gname);
 int async_task_group_push(str *gname, async_task_t *task);
 int async_task_group_send(async_wgroup_t *awg, async_task_t *task);
 
+/**
+ * Parameter of the async task created by async_tkv_emit() and executed
+ * by the tkv event route callback: the [type, key, value] of the event.
+ */
+typedef struct async_tkv_param
+{
+       /* type of the event, as given to async_tkv_emit() */
+       int dtype;
+       /* key of the event */
+       str skey;
+       /* value of the event, printed from the format and its arguments */
+       str sval;
+} async_tkv_param_t;
+
+/* size of the buffer in which async_tkv_emit() prints the value */
+#define KSR_ASYNC_TKV_SIZE 512
+/* look up the "tkv" async workers group and event_route[core:tkv] */
+void async_tkv_init(void);
+/* emit a [type, key, value] event; the value is built printf-like from fmt
+ * and the following arguments */
+int async_tkv_emit(int dtype, char *pkey, char *fmt, ...);
+
 #endif

_______________________________________________
Kamailio - Development Mailing List -- sr-dev@lists.kamailio.org
To unsubscribe send an email to sr-dev-le...@lists.kamailio.org
Important: keep the mailing list in the recipients, do not reply only to the 
sender!

Reply via email to