doleyzi commented on code in PR #10812:
URL: https://github.com/apache/inlong/pull/10812#discussion_r1721256553


##########
inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp:
##########
@@ -18,44 +18,62 @@
  */
 
 #include <pybind11/pybind11.h>
+#include <pybind11/functional.h>
 #include <pybind11/stl.h>
 #include <inlong_api.h>
+#include <atomic>
+#include <thread>
+#include <iostream>
 
 namespace py = pybind11;
-using namespace inlong;
 
-class PyInLongApi : public InLongApi {
-public:
-    int32_t PySend(const char *inlong_group_id, const char *inlong_stream_id, 
const char *msg, int32_t msg_len, py::function callback_func) {
-        py_callback = callback_func;
-        return Send(inlong_group_id, inlong_stream_id, msg, msg_len, 
&PyInLongApi::CallbackFunc);
-    }
-
-private:
-    static py::function py_callback;
+std::map<inlong::UserCallBack, py::function> g_py_callbacks;
+// The number of unfinished callbacks
+std::atomic<int> unfinished_callbacks(0);
 
-    static int CallbackFunc(const char *a, const char *b, const char *c, 
int32_t d, const int64_t e, const char *f) {
-        if (py_callback) {
-            try {
-                return py_callback(a, b, c, d, e, f).cast<int>();
-            } catch (const py::error_already_set &e) {
-                // Handle Python exception
-                return -1;
-            }
-        }
-        return 0;
+int UserCallBackBridge(const char *a, const char *b, const char *c, int32_t d, 
const int64_t e, const char *f) {
+    auto it = g_py_callbacks.find(UserCallBackBridge);
+    if (it != g_py_callbacks.end()) {
+        // Before callback is called, acquire GIL
+        py::gil_scoped_acquire acquire;
+        int result = it->second(a, b, c, d, e, f).cast<int>();
+        unfinished_callbacks--;
+        return result;
     }
-};
-
-py::function PyInLongApi::py_callback;
+    return -1;
+}
 
 PYBIND11_MODULE(inlong_dataproxy, m) {
-    m.doc() = "This module provides InLong dataproxy api to send message to 
InLong dataproxy.";
-
-    py::class_<PyInLongApi>(m, "InLongApi")
+    m.doc() = "Python bindings for InLong SDK API";
+    py::class_<inlong::InLongApi>(m, "InLongApi")
         .def(py::init<>())
-        .def("init_api", &PyInLongApi::InitApi, py::arg("config_path"))
-        .def("add_bid", &PyInLongApi::AddBid, py::arg("group_ids"))
-        .def("send", &PyInLongApi::PySend, py::arg("inlong_group_id"), 
py::arg("inlong_stream_id"), py::arg("msg"), py::arg("msg_len"), 
py::arg("callback_func") = nullptr)
-        .def("close_api", &PyInLongApi::CloseApi, py::arg("max_waitms"));
-}
+        .def("init_api", &inlong::InLongApi::InitApi)
+        .def("add_bid", &inlong::InLongApi::AddBid)
+        .def("send", [](inlong::InLongApi& self, const char* inlong_group_id, 
const char* inlong_stream_id, const char* msg, int32_t msg_len, py::object 
pyCallback = py::none()) {
+            if (!pyCallback.is(py::none())) {
+                g_py_callbacks[UserCallBackBridge] = 
pyCallback.cast<py::function>();
+                unfinished_callbacks++;
+                int result = self.Send(inlong_group_id, inlong_stream_id, msg, 
msg_len, UserCallBackBridge);
+                // When callback is called, release GIL
+                py::gil_scoped_release release;
+                return result;
+            } else {
+                int result = self.Send(inlong_group_id, inlong_stream_id, msg, 
msg_len, nullptr);
+                return result;
+            }
+        })
+        .def("close_api", [](inlong::InLongApi& self, int32_t max_waitms) {
+            auto start = std::chrono::high_resolution_clock::now();
+            while (unfinished_callbacks > 0) {
+                auto now = std::chrono::high_resolution_clock::now();
+                auto elapsed_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
+                if (elapsed_ms.count() >= max_waitms) {
+                    std::cout << "Maximum wait time reached" << std::endl;
+                    break;
+                }
+                std::this_thread::sleep_for(std::chrono::milliseconds(100));
+            }
+            py::gil_scoped_release release;
+            return self.CloseApi(10000);

Review Comment:
   Why is the timeout hard-coded to 10000 here instead of passing the caller's `max_waitms` through to `CloseApi`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to