djanderson commented on code in PR #14286: URL: https://github.com/apache/datafusion/pull/14286#discussion_r1968617896
########## datafusion-examples/examples/thread_pools_lib/dedicated_executor.rs: ########## @@ -0,0 +1,1778 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime. 
+ +use crate::SendableRecordBatchStream; +use async_trait::async_trait; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion_common::error::GenericError; +use datafusion_common::DataFusionError; +use futures::stream::BoxStream; +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, Stream, StreamExt, TryFutureExt, +}; +use log::{info, warn}; +use object_store::path::Path; +use object_store::{ + GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, UploadPart, +}; +use std::cell::RefCell; +use std::pin::Pin; +use std::sync::RwLock; +use std::task::{Context, Poll}; +use std::{fmt::Display, sync::Arc, time::Duration}; +use tokio::runtime::Builder; +use tokio::task::JoinHandle; +use tokio::{ + runtime::Handle, + sync::{oneshot::error::RecvError, Notify}, + task::JoinSet, +}; +use tokio_stream::wrappers::ReceiverStream; + +/// Create a [`DedicatedExecutorBuilder`] from a tokio [`Builder`] +impl From<Builder> for DedicatedExecutorBuilder { + fn from(value: Builder) -> Self { + Self::new_from_builder(value) + } +} + +/// Manages a separate tokio [`Runtime`] (thread pool) for executing CPU bound +/// tasks such as DataFusion `ExecutionPlans`. +/// +/// See [`DedicatedExecutorBuilder`] for creating a new instance. +/// +/// A `DedicatedExecutor` helps avoid issues when running IO and CPU bound tasks on the +/// same thread pool by running futures (and any `tasks` that are +/// `tokio::task::spawned` by them) on a separate tokio [`Executor`]. +/// +/// `DedicatedExecutor`s can be `clone`ed and all clones share the same thread pool. +/// +/// Since the primary use for a `DedicatedExecutor` is offloading CPU bound +/// work, IO work can not be performed on tasks launched in the Executor. 
+/// +/// To perform IO, see: +/// - [`Self::spawn_io`] +/// - [`Self::wrap_object_store`] +/// +/// When [`DedicatedExecutorBuilder::build`] is called, a reference to the +/// "current" tokio runtime will be stored and used, via [`register_io_runtime`] by all +/// threads spawned by the executor. Any I/O done by threads in this +/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the +/// I/O runtime. +/// +/// # TODO examples +/// +/// # Background +/// +/// Tokio has the notion of the "current" runtime, which runs the current future +/// and any tasks spawned by it. Typically, this is the runtime created by +/// `tokio::main` and is used for the main application logic and I/O handling +/// +/// For CPU bound work, such as DataFusion plan execution, it is important to +/// run on a separate thread pool to avoid blocking the I/O handling for extended +/// periods of time in order to avoid long poll latencies (which decreases the +/// throughput of small requests under concurrent load). +/// +/// # IO Scheduling +/// +/// I/O, such as network calls, should not be performed on the runtime managed +/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running +/// CPU tasks will not be preempted and can therefore starve servicing of other +/// tasks. This manifests in long poll-latencies, where a task is ready to run +/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as +/// there is no external party waiting on a response, however, for I/O tasks, +/// long poll latencies can prevent timely servicing of IO, which can have a +/// significant detrimental effect. 
+/// +/// # Details +/// +/// The worker thread priority is set to low so that such tasks do +/// not starve other more important tasks (such as answering health checks) +/// +/// Follows the example from stack overflow and spawns a new +/// thread to install a Tokio runtime "context" +/// <https://stackoverflow.com/questions/62536566> +/// +/// # Trouble Shooting: +/// +/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!" +/// +/// This means that IO was attempted on a tokio runtime that was not registered +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn]. +/// +/// ## "Cannot drop a runtime in a context where blocking is not allowed" +/// +/// If you try to use this structure from an async context you see something like +/// thread 'test_builder_plan' panicked at 'Cannot +/// drop a runtime in a context where blocking is not allowed. This +/// happens when a runtime is dropped from within an asynchronous +/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 +/// +/// # Notes +/// This code is derived from code originally written for [InfluxDB 3.0] +/// +/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +#[derive(Clone, Debug)] +pub struct DedicatedExecutor { + /// State for managing Tokio Runtime Handle for CPU tasks + state: Arc<RwLock<State>>, +} + +impl DedicatedExecutor { + /// Create a new builder to create a [`DedicatedExecutor`] + pub fn builder() -> DedicatedExecutorBuilder { + DedicatedExecutorBuilder::new() + } + + /// Runs the specified [`Future`] (and any tasks it spawns) on the thread + /// pool managed by this `DedicatedExecutor`. + /// + /// See the struct documentation for more details + /// + /// The specified task is added to the tokio executor immediately and + /// competes for the thread pool's resources. 
+ /// + /// # Behavior on `Drop` + /// + /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when + /// it is dropped. Thus, you need to ensure the returned future lives until it + /// completes (call `await`) or you wish to cancel it. + pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>> Review Comment: ```suggestion pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>> + use<T> ``` Use the new [RPIT lifetime precise capturing](https://doc.rust-lang.org/nightly/edition-guide/rust-2024/rpit-lifetime-capture.html) syntax for compatibility with the newly released Rust 2024 edition. Otherwise this example fails to compile. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org