HappenLee commented on code in PR #64039:
URL: https://github.com/apache/doris/pull/64039#discussion_r3361311510


##########
be/src/udf/python/python_server.cpp:
##########
@@ -99,130 +114,182 @@ Status PythonServerManager::get_client(const 
PythonUDFMeta& func_meta, const Pyt
 
 Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
 PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
-    auto versioned_pool = _get_or_create_process_pool(version);
-    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
-
-    // Check if already initialized
-    if (versioned_pool->initialized) return versioned_pool;
-
-    // 0 means use CPU core count as default, otherwise use the specified value
-    int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
-                                                           : 
CpuInfo::num_cores();
-
-    LOG(INFO) << "Initializing Python process pool for version " << 
version.to_string() << " with "
-              << max_pool_size
-              << " processes (config::max_python_process_num=" << 
config::max_python_process_num
-              << ", CPU cores=" << CpuInfo::num_cores() << ")";
-
-    std::vector<std::future<Status>> futures;
-    std::vector<ProcessPtr> temp_processes(max_pool_size);
+    auto versioned_pool_result = _get_or_create_process_pool(version);
+    if (!versioned_pool_result.has_value()) {
+        return ResultError(versioned_pool_result.error());
+    }
+    auto versioned_pool = versioned_pool_result.value();
+    const int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
+                                                                 : 
CpuInfo::num_cores();
 
-    for (int i = 0; i < max_pool_size; i++) {
-        futures.push_back(std::async(std::launch::async, [this, &version, i, 
&temp_processes]() {
-            ProcessPtr process;
-            Status s = fork(version, &process);
-            if (s.ok()) {
-                temp_processes[i] = std::move(process);
+    bool start_health_check = false;
+    {
+        std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+        if (versioned_pool->initialized) {
+            if (versioned_pool->has_available_process) {
+                lock.unlock();
+                _start_health_check_thread();
+            }
+            return versioned_pool;
+        } else {
+            if (versioned_pool->processes.empty()) {
+                versioned_pool->has_available_process = false;
+                versioned_pool->stopped = false;
+                versioned_pool->processes.resize(max_pool_size);
+
+                LOG(INFO) << "Initializing Python process pool for version " 
<< version.to_string()
+                          << " with " << max_pool_size
+                          << " processes (config::max_python_process_num="
+                          << config::max_python_process_num
+                          << ", CPU cores=" << CpuInfo::num_cores() << ")";
+
+                for (int i = 0; i < max_pool_size; ++i) {
+                    std::thread([version, versioned_pool, i, max_pool_size]() {
+                        SCOPED_INIT_THREAD_CONTEXT();
+                        ProcessPtr process;
+                        Status status = PythonServerManager::fork(version, 
&process);
+                        const bool ok = status.ok() && process;
+                        ProcessPtr process_to_shutdown;
+                        {
+                            std::lock_guard<std::mutex> 
lock(versioned_pool->mutex);
+                            // shutdown() and repair can race with detached 
init workers after timeout.
+                            // Late successful forks only fill slots that are 
still empty or dead.
+                            if (ok && !versioned_pool->stopped &&
+                                i < versioned_pool->processes.size() &&
+                                (!versioned_pool->processes[i] ||
+                                 !versioned_pool->processes[i]->is_alive())) {
+                                versioned_pool->processes[i] = 
std::move(process);
+                                versioned_pool->has_available_process = true;
+                            } else if (ok) {
+                                process_to_shutdown = std::move(process);
+                            } else [[unlikely]] {
+                                LOG(WARNING) << "Failed to create Python 
process " << (i + 1) << "/"
+                                             << max_pool_size << " for version 
"
+                                             << version.to_string() << ": " << 
status.to_string();
+                            }
+                        }
+                        versioned_pool->cv.notify_all();
+                        if (process_to_shutdown) {
+                            process_to_shutdown->shutdown();
+                        }
+                    }).detach();
+                }
             }
-            return s;
-        }));
-    }
 
-    int success_count = 0;
-    int failure_count = 0;
-    const auto init_start_time = std::chrono::steady_clock::now();
-#ifdef BE_TEST
-    constexpr auto progress_log_interval = std::chrono::milliseconds(50);
-#else
-    constexpr auto progress_log_interval = std::chrono::seconds(20);
-#endif
-    for (int i = 0; i < max_pool_size; i++) {
-        // Print init log every 20s until the current slot is ready.
-        while (futures[i].wait_for(progress_log_interval) != 
std::future_status::ready) {
-            const auto now = std::chrono::steady_clock::now();
-            const auto total_elapsed_ms =
-                    std::chrono::duration_cast<std::chrono::milliseconds>(now 
- init_start_time)
-                            .count();
-            LOG(INFO) << "Python process pool initialization progress for 
version "
-                      << version.to_string() << ": waiting_slot=" << (i + 1) 
<< "/" << max_pool_size
-                      << ", success=" << success_count << ", failed=" << 
failure_count
-                      << ", elapsed_ms=" << total_elapsed_ms;
+            // Wait only for the first usable process. The rest of the pool is 
best-effort and will be
+            // filled by health check, so partial init failure is logged but 
not exposed to users.
+            versioned_pool->cv.wait_for(lock, PROCESS_POOL_INIT_TIMEOUT, 
[&versioned_pool]() {
+                return versioned_pool->has_available_process || 
versioned_pool->stopped;
+            });
+            // Mark the first init round done even when no process is 
available. Health check and the
+            // next foreground _get_process() repair the initialized-but-empty 
pool without launching a
+            // second full detached init round.
+            versioned_pool->initialized = true;
+            versioned_pool->cv.notify_all();
         }
 
-        Status s = futures[i].get();
-        if (s.ok() && temp_processes[i]) {
-            
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
-            success_count++;
-        } else {
-            failure_count++;
-            LOG(WARNING) << "Failed to create Python process " << (i + 1) << 
"/" << max_pool_size
-                         << ": " << s.to_string();
+        if (versioned_pool->has_available_process) {
+            lock.unlock();
+            _start_health_check_thread();
+            return versioned_pool;
         }
+        start_health_check = !versioned_pool->stopped;
     }
 
-    if (versioned_pool->processes.empty()) {
-        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
-                "Failed to initialize Python process pool: all {} process 
creation attempts failed",
-                max_pool_size));
+    if (start_health_check) {
+        _start_health_check_thread();
     }
-
-    const auto total_elapsed_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(
-                                          std::chrono::steady_clock::now() - 
init_start_time)
-                                          .count();
-    LOG(INFO) << "Python process pool initialized for version " << 
version.to_string()
-              << ": created " << success_count << " processes"
-              << (failure_count > 0 ? fmt::format(" ({} failed)", 
failure_count) : "")
-              << ", elapsed_ms=" << total_elapsed_ms;
-
-    versioned_pool->initialized = true;
-    _start_health_check_thread();
-
-    return versioned_pool;
+    return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+            "Failed to initialize Python process pool for version {}: no 
process became available "
+            "within {} ms",
+            version.to_string(), PROCESS_POOL_INIT_TIMEOUT.count()));
 }
 
 Status PythonServerManager::_get_process(
         const PythonVersion& version, const 
std::shared_ptr<VersionedProcessPool>& versioned_pool,
         ProcessPtr* process) {
-    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
-    std::vector<ProcessPtr>& pool = versioned_pool->processes;
+    {
+        std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+        std::vector<ProcessPtr>& pool = versioned_pool->processes;
 
-    if (UNLIKELY(pool.empty())) {
-        return Status::InternalError("Python process pool is empty for version 
{}",
-                                     version.to_string());
-    }
+        if (versioned_pool->stopped) {
+            versioned_pool->has_available_process = false;
+            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                    "Python process pool has stopped for version {}", 
version.to_string());
+        }
 
-    // Prefer an already-alive process and only use load balancing inside that 
alive subset.
-    // keep dead entries stay in the pool for the background health checker
-    // unless there is no alive process left for the current request.
-    auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
-                                           [](const ProcessPtr& a, const 
ProcessPtr& b) {
-                                               const bool a_alive = a && 
a->is_alive();
-                                               const bool b_alive = b && 
b->is_alive();
-                                               if (a_alive != b_alive) {
-                                                   return a_alive > b_alive;
-                                               }
-                                               return a.use_count() < 
b.use_count();
-                                           });
-
-    if (min_alive_iter != pool.end() && *min_alive_iter && 
(*min_alive_iter)->is_alive()) {
-        *process = *min_alive_iter;
-        return Status::OK();
-    }
+        auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
+                                               [](const ProcessPtr& a, const 
ProcessPtr& b) {
+                                                   const bool a_alive = a && 
a->is_alive();
+                                                   const bool b_alive = b && 
b->is_alive();
+                                                   if (a_alive != b_alive) {
+                                                       return a_alive > 
b_alive;
+                                                   }
+                                                   return a.use_count() < 
b.use_count();
+                                               });
+
+        if (min_alive_iter != pool.end() && *min_alive_iter && 
(*min_alive_iter)->is_alive())
+                [[likely]] {
+            versioned_pool->has_available_process = true;
+            *process = *min_alive_iter;
+            return Status::OK();
+        }
+        versioned_pool->has_available_process = false;
+
+        if (!versioned_pool->repairing) {
+            versioned_pool->repairing = true;
+            // Repair is done in the background because fork can be slow. The 
current request still
+            // waits briefly below so a transient all-dead pool can recover 
without failing.
+            std::thread([version, versioned_pool]() {
+                SCOPED_INIT_THREAD_CONTEXT();
+                int recreated = 
PythonServerManager::_repair_process_pool(version, versioned_pool);
+                {
+                    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+                    versioned_pool->repairing = false;
+                }
+                versioned_pool->cv.notify_all();
+                if (recreated > 0) {
+                    LOG(INFO) << "Repaired Python process pool for version " 
<< version.to_string()
+                              << ": recreated=" << recreated;
+                }
+            }).detach();
+        }
 
-    // Only reach here when the pool has no alive process at all. Try one 
foreground
-    // recovery so the caller has a chance to proceed; leave batch repair to 
health check.
-    auto& candidate = pool.front();
-    ProcessPtr replacement;
-    Status status = fork(version, &replacement);
-    if (!status.ok()) {
-        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
-                "Python process pool has no available process for version {}, 
reason: {}",
-                version.to_string(), status.to_string());
+        // Keep the request recoverable in the common case where the Python 
runtime can fork
+        // normally and only the existing pool entries died. The wait is short 
so a wedged fork path
+        // still returns SERVICE_UNAVAILABLE promptly.
+        versioned_pool->cv.wait_for(lock, PROCESS_REPAIR_WAIT_TIMEOUT, 
[&versioned_pool]() {
+            return std::any_of(versioned_pool->processes.begin(), 
versioned_pool->processes.end(),
+                               [](const ProcessPtr& p) { return p && 
p->is_alive(); }) ||
+                   versioned_pool->stopped;
+        });
+        if (versioned_pool->stopped) {
+            versioned_pool->has_available_process = false;
+            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                    "Python process pool has stopped for version {}", 
version.to_string());
+        }
+
+        auto repaired_iter = std::min_element(pool.begin(), pool.end(),

Review Comment:
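   这个函数和min_alive_iter没什么区别,不能统一一下吗?
   (English: this `std::min_element` lookup is essentially the same as the one that computes `min_alive_iter` above — can't the two be unified?)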



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to