github-actions[bot] commented on code in PR #64039:
URL: https://github.com/apache/doris/pull/64039#discussion_r3362415258


##########
be/src/udf/python/python_server.cpp:
##########
@@ -99,130 +132,160 @@ Status PythonServerManager::get_client(const 
PythonUDFMeta& func_meta, const Pyt
 
 Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
 PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
-    auto versioned_pool = _get_or_create_process_pool(version);
-    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
-
-    // Check if already initialized
-    if (versioned_pool->initialized) return versioned_pool;
-
-    // 0 means use CPU core count as default, otherwise use the specified value
-    int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
-                                                           : 
CpuInfo::num_cores();
-
-    LOG(INFO) << "Initializing Python process pool for version " << 
version.to_string() << " with "
-              << max_pool_size
-              << " processes (config::max_python_process_num=" << 
config::max_python_process_num
-              << ", CPU cores=" << CpuInfo::num_cores() << ")";
-
-    std::vector<std::future<Status>> futures;
-    std::vector<ProcessPtr> temp_processes(max_pool_size);
+    auto versioned_pool_result = _get_or_create_process_pool(version);
+    if (!versioned_pool_result.has_value()) {
+        return ResultError(versioned_pool_result.error());
+    }
+    auto versioned_pool = versioned_pool_result.value();
+    const int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
+                                                                 : 
CpuInfo::num_cores();
 
-    for (int i = 0; i < max_pool_size; i++) {
-        futures.push_back(std::async(std::launch::async, [this, &version, i, 
&temp_processes]() {
-            ProcessPtr process;
-            Status s = fork(version, &process);
-            if (s.ok()) {
-                temp_processes[i] = std::move(process);
+    bool start_health_check = false;
+    {
+        std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+        if (versioned_pool->initialized) {
+            if (versioned_pool->has_available_process) {
+                lock.unlock();
+                _start_health_check_thread();
+            }
+            return versioned_pool;
+        } else {
+            if (versioned_pool->processes.empty()) {
+                versioned_pool->has_available_process = false;
+                versioned_pool->stopped = false;
+                versioned_pool->processes.resize(max_pool_size);
+
+                LOG(INFO) << "Initializing Python process pool for version " 
<< version.to_string()
+                          << " with " << max_pool_size
+                          << " processes (config::max_python_process_num="
+                          << config::max_python_process_num
+                          << ", CPU cores=" << CpuInfo::num_cores() << ")";
+
+                for (int i = 0; i < max_pool_size; ++i) {
+                    std::thread([version, versioned_pool, i, max_pool_size]() {
+                        SCOPED_INIT_THREAD_CONTEXT();
+                        ProcessPtr process;
+                        Status status = PythonServerManager::fork(version, 
&process);
+                        const bool ok = status.ok() && process;
+                        ProcessPtr process_to_shutdown;
+                        {
+                            std::lock_guard<std::mutex> 
lock(versioned_pool->mutex);
+                            // shutdown() and repair can race with detached 
init workers after timeout.
+                            // Late successful forks only fill slots that are 
still empty or dead.
+                            if (ok && !versioned_pool->stopped &&
+                                i < versioned_pool->processes.size() &&
+                                (!versioned_pool->processes[i] ||
+                                 !versioned_pool->processes[i]->is_alive())) {
+                                versioned_pool->processes[i] = 
std::move(process);
+                                versioned_pool->has_available_process = true;
+                            } else if (ok) {
+                                process_to_shutdown = std::move(process);
+                            } else [[unlikely]] {
+                                LOG(WARNING) << "Failed to create Python 
process " << (i + 1) << "/"
+                                             << max_pool_size << " for version 
"
+                                             << version.to_string() << ": " << 
status.to_string();
+                            }
+                        }
+                        versioned_pool->cv.notify_all();
+                        if (process_to_shutdown) {
+                            process_to_shutdown->shutdown();
+                        }
+                    }).detach();
+                }
             }
-            return s;
-        }));
-    }
 
-    int success_count = 0;
-    int failure_count = 0;
-    const auto init_start_time = std::chrono::steady_clock::now();
-#ifdef BE_TEST
-    constexpr auto progress_log_interval = std::chrono::milliseconds(50);
-#else
-    constexpr auto progress_log_interval = std::chrono::seconds(20);
-#endif
-    for (int i = 0; i < max_pool_size; i++) {
-        // Print init log every 20s until the current slot is ready.
-        while (futures[i].wait_for(progress_log_interval) != 
std::future_status::ready) {
-            const auto now = std::chrono::steady_clock::now();
-            const auto total_elapsed_ms =
-                    std::chrono::duration_cast<std::chrono::milliseconds>(now 
- init_start_time)
-                            .count();
-            LOG(INFO) << "Python process pool initialization progress for 
version "
-                      << version.to_string() << ": waiting_slot=" << (i + 1) 
<< "/" << max_pool_size
-                      << ", success=" << success_count << ", failed=" << 
failure_count
-                      << ", elapsed_ms=" << total_elapsed_ms;
+            // Wait only for the first usable process. The rest of the pool is 
best-effort and will be
+            // filled by health check, so partial init failure is logged but 
not exposed to users.
+            versioned_pool->cv.wait_for(lock, PROCESS_POOL_INIT_TIMEOUT, 
[&versioned_pool]() {
+                return versioned_pool->has_available_process || 
versioned_pool->stopped;

Review Comment:
   This wait only exits on `has_available_process` or `stopped`, so if every 
detached fork worker fails quickly the caller still waits the full 
`PROCESS_POOL_INIT_TIMEOUT` before returning. A concrete path is an invalid 
`PythonVersion` executable: each worker logs failure and notifies, but no state 
records that all `max_pool_size` attempts are done, the predicate remains 
false, and production `get_client()` stalls for 20s before returning 
`SERVICE_UNAVAILABLE`. Please track completed/failed init workers under 
`versioned_pool->mutex` and include an `all_workers_finished && 
!has_available_process` condition so all-failed initialization returns as soon 
as the last attempt finishes.



##########
be/src/udf/python/python_udf_runtime.cpp:
##########
@@ -18,15 +18,183 @@
 #include "udf/python/python_udf_runtime.h"
 
 #include <butil/fd_utility.h>
+#include <signal.h>
+#include <string.h>
 #include <sys/wait.h>
 #include <unistd.h>
 
+#include <algorithm>
 #include <boost/process.hpp>
+#include <cerrno>
+#include <chrono>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
+#ifdef BE_TEST
+#include <atomic>
+#endif
 
 #include "common/logging.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
+#ifdef BE_TEST
+static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {100};
+#else
+static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {1000};
+#endif
+static constexpr size_t MAX_BACKGROUND_REAP_PIDS = 1024;
+static constexpr std::chrono::milliseconds BACKGROUND_REAP_INTERVAL {1000};
+
+#ifdef BE_TEST
+static std::atomic<int> FORCED_CHILD_EXIT_TIMEOUTS {0};
+
+static bool consume_forced_child_exit_timeout() {
+    int remaining = FORCED_CHILD_EXIT_TIMEOUTS.load(std::memory_order_relaxed);
+    while (remaining > 0) {
+        if (FORCED_CHILD_EXIT_TIMEOUTS.compare_exchange_weak(remaining, 
remaining - 1,
+                                                             
std::memory_order_relaxed)) {
+            return true;
+        }
+    }
+    return false;
+}
+#endif
+
+struct BackgroundChildReaper {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::deque<pid_t> pids;
+#ifdef BE_TEST
+    std::deque<pid_t> reaped_pids;
+#endif
+    std::thread thread;
+};
+
+static BackgroundChildReaper& background_child_reaper() {
+    static auto* reaper = new BackgroundChildReaper();
+    return *reaper;
+}
+
+void PythonUDFProcess::enqueue_child_for_reap(pid_t pid) {
+    if (pid <= 0) [[unlikely]] {
+        return;
+    }
+
+    auto& reaper = background_child_reaper();
+    {
+        std::lock_guard<std::mutex> lock(reaper.mutex);
+        if (std::find(reaper.pids.begin(), reaper.pids.end(), pid) != 
reaper.pids.end()) {
+            return;
+        }
+        if (reaper.pids.size() >= MAX_BACKGROUND_REAP_PIDS) {
+            LOG(WARNING) << "Skip background reap for Python process pid=" << 
pid

Review Comment:
   This capacity check only counts `reaper.pids`, but the worker immediately 
moves queued pids into its local `pending_pids` list while stuck children are 
still unreaped. After that move the shared queue is empty, so another 1024 
stuck pids can be accepted, then another batch, and the total pending reap set 
grows without the intended `MAX_BACKGROUND_REAP_PIDS` bound. Please keep the 
in-flight pending count in the shared `BackgroundChildReaper` state, or 
otherwise enforce the limit against both queued and worker-local pending pids.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to