github-actions[bot] commented on code in PR #64039:
URL: https://github.com/apache/doris/pull/64039#discussion_r3344850108


##########
be/src/udf/python/python_server.cpp:
##########
@@ -359,16 +525,24 @@ void PythonServerManager::shutdown() {
     }
 
     // Shutdown all processes
+    std::vector<ProcessPtr> processes_to_shutdown;
     for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
         std::lock_guard<std::mutex> lock(versioned_pool->mutex);
         auto& pool = versioned_pool->processes;
         for (auto& process : pool) {
             if (process) {
-                process->shutdown();
+                processes_to_shutdown.emplace_back(std::move(process));
             }

Review Comment:
   `stopped` is only set on the pools captured by this snapshot. A concurrent 
`_ensure_pool_initialized()` can call `_get_or_create_process_pool()` after 
`_snapshot_process_pools()` has already run, create a new 
`VersionedProcessPool`, and launch the detached init workers added above. That 
pool is never marked `stopped` by this shutdown call. A late worker can therefore 
publish a live Python process after shutdown, and that process will be in neither 
`processes_to_shutdown` nor `_process_pools`. Please serialize shutdown with pool 
creation/initialization. For example, guard a manager-level stopping flag with 
`_pools_mutex`, and reject any pool requested after shutdown has started (or create 
it already marked `stopped`).



##########
be/src/udf/python/python_server.cpp:
##########
@@ -100,129 +109,172 @@ Status PythonServerManager::get_client(const 
PythonUDFMeta& func_meta, const Pyt
 Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
 PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
     auto versioned_pool = _get_or_create_process_pool(version);
-    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
-
-    // Check if already initialized
-    if (versioned_pool->initialized) return versioned_pool;
-
-    // 0 means use CPU core count as default, otherwise use the specified value
-    int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
-                                                           : 
CpuInfo::num_cores();
-
-    LOG(INFO) << "Initializing Python process pool for version " << 
version.to_string() << " with "
-              << max_pool_size
-              << " processes (config::max_python_process_num=" << 
config::max_python_process_num
-              << ", CPU cores=" << CpuInfo::num_cores() << ")";
+    const int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
+                                                                 : 
CpuInfo::num_cores();
 
-    std::vector<std::future<Status>> futures;
-    std::vector<ProcessPtr> temp_processes(max_pool_size);
-
-    for (int i = 0; i < max_pool_size; i++) {
-        futures.push_back(std::async(std::launch::async, [this, &version, i, 
&temp_processes]() {
-            ProcessPtr process;
-            Status s = fork(version, &process);
-            if (s.ok()) {
-                temp_processes[i] = std::move(process);
+    bool start_health_check = false;
+    {
+        std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+        if (versioned_pool->initialized) {
+            if (versioned_pool->has_available_process) {
+                lock.unlock();
+                _start_health_check_thread();
+            }
+            return versioned_pool;
+        } else {
+            if (versioned_pool->processes.empty()) {
+                versioned_pool->has_available_process = false;
+                versioned_pool->stopped = false;
+                versioned_pool->processes.resize(max_pool_size);
+
+                LOG(INFO) << "Initializing Python process pool for version " 
<< version.to_string()
+                          << " with " << max_pool_size
+                          << " processes (config::max_python_process_num="
+                          << config::max_python_process_num
+                          << ", CPU cores=" << CpuInfo::num_cores() << ")";
+
+                for (int i = 0; i < max_pool_size; ++i) {
+                    std::thread([version, versioned_pool, i, max_pool_size]() {
+                        SCOPED_INIT_THREAD_CONTEXT();
+                        ProcessPtr process;
+                        Status status = PythonServerManager::fork(version, 
&process);
+                        const bool ok = status.ok() && process;
+                        ProcessPtr process_to_shutdown;
+                        {
+                            std::lock_guard<std::mutex> 
lock(versioned_pool->mutex);
+                            // shutdown() and repair can race with detached 
init workers after timeout.
+                            // Late successful forks only fill slots that are 
still empty or dead.
+                            if (ok && !versioned_pool->stopped &&
+                                i < versioned_pool->processes.size() &&
+                                (!versioned_pool->processes[i] ||
+                                 !versioned_pool->processes[i]->is_alive())) {
+                                versioned_pool->processes[i] = 
std::move(process);
+                                versioned_pool->has_available_process = true;
+                            } else if (ok) {
+                                process_to_shutdown = std::move(process);
+                            } else [[unlikely]] {
+                                LOG(WARNING) << "Failed to create Python 
process " << (i + 1) << "/"
+                                             << max_pool_size << " for version 
"
+                                             << version.to_string() << ": " << 
status.to_string();
+                            }
+                        }
+                        versioned_pool->cv.notify_all();
+                        if (process_to_shutdown) {
+                            process_to_shutdown->shutdown();
+                        }
+                    }).detach();
+                }
             }
-            return s;
-        }));
-    }
 
-    int success_count = 0;
-    int failure_count = 0;
-    const auto init_start_time = std::chrono::steady_clock::now();
-#ifdef BE_TEST
-    constexpr auto progress_log_interval = std::chrono::milliseconds(50);
-#else
-    constexpr auto progress_log_interval = std::chrono::seconds(20);
-#endif
-    for (int i = 0; i < max_pool_size; i++) {
-        // Print init log every 20s until the current slot is ready.
-        while (futures[i].wait_for(progress_log_interval) != 
std::future_status::ready) {
-            const auto now = std::chrono::steady_clock::now();
-            const auto total_elapsed_ms =
-                    std::chrono::duration_cast<std::chrono::milliseconds>(now 
- init_start_time)
-                            .count();
-            LOG(INFO) << "Python process pool initialization progress for 
version "
-                      << version.to_string() << ": waiting_slot=" << (i + 1) 
<< "/" << max_pool_size
-                      << ", success=" << success_count << ", failed=" << 
failure_count
-                      << ", elapsed_ms=" << total_elapsed_ms;
+            // Wait only for the first usable process. The rest of the pool is 
best-effort and will be
+            // filled by health check, so partial init failure is logged but 
not exposed to users.
+            versioned_pool->cv.wait_for(lock, PROCESS_POOL_INIT_TIMEOUT, 
[&versioned_pool]() {
+                return versioned_pool->has_available_process || 
versioned_pool->stopped;
+            });
+            // Mark the first init round done even when no process is 
available. Health check and the
+            // next foreground _get_process() repair the initialized-but-empty 
pool without launching a
+            // second full detached init round.
+            versioned_pool->initialized = true;
+            versioned_pool->cv.notify_all();
         }
 
-        Status s = futures[i].get();
-        if (s.ok() && temp_processes[i]) {
-            
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
-            success_count++;
-        } else {
-            failure_count++;
-            LOG(WARNING) << "Failed to create Python process " << (i + 1) << 
"/" << max_pool_size
-                         << ": " << s.to_string();
+        if (versioned_pool->has_available_process) {
+            lock.unlock();
+            _start_health_check_thread();
+            return versioned_pool;
         }
+        start_health_check = !versioned_pool->stopped;
     }
 
-    if (versioned_pool->processes.empty()) {
-        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
-                "Failed to initialize Python process pool: all {} process 
creation attempts failed",
-                max_pool_size));

Review Comment:
   This new failure path can start the health-check thread while another thread 
is inside `shutdown()`. `_start_health_check_thread()` guards 
`_health_check_thread` with `_health_check_mutex`. However, `shutdown()` reads, 
joins, and resets the same `unique_ptr` without holding that mutex, so concurrent 
init and shutdown can race on the thread object. Please take the same mutex in 
`shutdown()`, or otherwise serialize health-check startup with shutdown, before 
this path can reach `_start_health_check_thread()`. For example, move the thread 
out of `_health_check_thread` under the mutex and join it after releasing the lock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to