djanderson commented on code in PR #14286:
URL: https://github.com/apache/datafusion/pull/14286#discussion_r1968617896


##########
datafusion-examples/examples/thread_pools_lib/dedicated_executor.rs:
##########
@@ -0,0 +1,1778 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime.
+
+use crate::SendableRecordBatchStream;
+use async_trait::async_trait;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion_common::error::GenericError;
+use datafusion_common::DataFusionError;
+use futures::stream::BoxStream;
+use futures::{
+    future::{BoxFuture, Shared},
+    Future, FutureExt, Stream, StreamExt, TryFutureExt,
+};
+use log::{info, warn};
+use object_store::path::Path;
+use object_store::{
+    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
+    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, UploadPart,
+};
+use std::cell::RefCell;
+use std::pin::Pin;
+use std::sync::RwLock;
+use std::task::{Context, Poll};
+use std::{fmt::Display, sync::Arc, time::Duration};
+use tokio::runtime::Builder;
+use tokio::task::JoinHandle;
+use tokio::{
+    runtime::Handle,
+    sync::{oneshot::error::RecvError, Notify},
+    task::JoinSet,
+};
+use tokio_stream::wrappers::ReceiverStream;
+
+/// Create a [`DedicatedExecutorBuilder`] from a tokio [`Builder`]
+impl From<Builder> for DedicatedExecutorBuilder {
+    fn from(value: Builder) -> Self {
+        Self::new_from_builder(value)
+    }
+}
+
+/// Manages a separate tokio [`Runtime`] (thread pool) for executing CPU bound
+/// tasks such as DataFusion `ExecutionPlans`.
+///
+/// See [`DedicatedExecutorBuilder`] for creating a new instance.
+///
+/// A `DedicatedExecutor` helps avoid issues when running IO and CPU bound tasks on the
+/// same thread pool by running futures (and any `tasks` that are
+/// `tokio::task::spawned` by them) on a separate tokio [`Executor`].
+///
+/// `DedicatedExecutor`s can be `clone`ed and all clones share the same thread pool.
+///
+/// Since the primary use for a `DedicatedExecutor` is offloading CPU bound
+/// work, IO work can not be performed on tasks launched in the Executor.
+///
+/// To perform IO, see:
+/// - [`Self::spawn_io`]
+/// - [`Self::wrap_object_store`]
+///
+/// When [`DedicatedExecutorBuilder::build`] is called, a reference to the
+/// "current" tokio runtime will be stored and used, via [`register_io_runtime`] by all
+/// threads spawned by the executor. Any I/O done by threads in this
+/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the
+/// I/O runtime.
+///
+/// # TODO examples
+///
+/// # Background
+///
+/// Tokio has the notion of the "current" runtime, which runs the current future
+/// and any tasks spawned by it. Typically, this is the runtime created by
+/// `tokio::main` and is used for the main application logic and I/O handling
+///
+/// For CPU bound work, such as DataFusion plan execution, it is important to
+/// run on a separate thread pool to avoid blocking the I/O handling for extended
+/// periods of time in order to avoid long poll latencies (which decreases the
+/// throughput of small requests under concurrent load).
+///
+/// # IO Scheduling
+///
+/// I/O, such as network calls, should not be performed on the runtime managed
+/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running
+/// CPU tasks will not be preempted and can therefore starve servicing of other
+/// tasks. This manifests in long poll-latencies, where a task is ready to run
+/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as
+/// there is no external party waiting on a response, however, for I/O tasks,
+/// long poll latencies can prevent timely servicing of IO, which can have a
+/// significant detrimental effect.
+///
+/// # Details
+///
+/// The worker thread priority is set to low so that such tasks do
+/// not starve other more important tasks (such as answering health checks)
+///
+/// Follows the example from stack overflow and spawns a new
+/// thread to install a Tokio runtime "context"
+/// <https://stackoverflow.com/questions/62536566>
+///
+/// # Troubleshooting
+///
+/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!"
+///
+/// This means that IO was attempted on a tokio runtime that was not registered
+/// for IO. One solution is to run the task using [DedicatedExecutor::spawn].
+///
+/// ## "Cannot drop a runtime in a context where blocking is not allowed"
+///
+/// If you try to use this structure from an async context you see something like
+/// thread 'test_builder_plan' panicked at 'Cannot
+/// drop a runtime in a context where blocking is not allowed. This
+/// happens when a runtime is dropped from within an asynchronous
+/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
+///
+/// # Notes
+/// This code is derived from code originally written for [InfluxDB 3.0]
+///
+/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
+#[derive(Clone, Debug)]
+pub struct DedicatedExecutor {
+    /// State for managing Tokio Runtime Handle for CPU tasks
+    state: Arc<RwLock<State>>,
+}
+
+impl DedicatedExecutor {
+    /// Create a new builder to create a [`DedicatedExecutor`]
+    pub fn builder() -> DedicatedExecutorBuilder {
+        DedicatedExecutorBuilder::new()
+    }
+
+    /// Runs the specified [`Future`] (and any tasks it spawns) on the thread
+    /// pool managed by this `DedicatedExecutor`.
+    ///
+    /// See the struct documentation for more details.
+    ///
+    /// The specified task is added to the tokio executor immediately and
+    /// competes for the thread pool's resources.
+    ///
+    /// # Behavior on `Drop`
+    ///
+    /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when
+    /// it is dropped. Thus, you need to ensure the returned future lives until it
+    /// completes (call `await`) or you wish to cancel it.
+    pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>>

Review Comment:
   ```suggestion
       pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>> + use<T>
   ```
   
   Use the new [RPIT lifetime precise capturing](https://doc.rust-lang.org/nightly/edition-guide/rust-2024/rpit-lifetime-capture.html) syntax for compatibility with the newly released Rust 2024 edition. Otherwise this example fails to compile.
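
To illustrate what `+ use<T>` changes, here is a minimal sketch using only the standard library. The `Executor` type, the `block_on` helper, and `run_demo` are hypothetical stand-ins and not the code in this PR; it assumes Rust 1.82 or later, where the `use<..>` syntax is stable. The bound says the returned future captures only the type parameter `T`, so it does not borrow from `&self`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `DedicatedExecutor`; it owns no real thread pool.
struct Executor {
    name: String,
}

impl Executor {
    /// `+ use<T>` declares that the returned future captures only `T`,
    /// not the anonymous lifetime of `&self`.
    fn spawn<T>(&self, task: T) -> impl Future<Output = T::Output> + use<T>
    where
        T: Future + Send + 'static,
    {
        // `self` is read here, before the future is built, so the future
        // itself holds nothing borrowed from the executor.
        let _ = self.name.len();
        async move { task.await }
    }
}

/// Waker that does nothing; enough for futures that are immediately ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future to completion on the current thread (busy loop, demo only).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn run_demo() -> u32 {
    let exec = Executor { name: "cpu".to_string() };
    let fut = exec.spawn(async { 40 + 2 });
    // Under the 2024 edition's capture rules, without `+ use<T>` the returned
    // future would also capture the lifetime of `&self`, and this `drop`
    // would be rejected as a move out of a borrowed value.
    drop(exec);
    block_on(fut)
}
```

Calling `run_demo()` from a `main` function returns 42. Whether the example in this PR fails for exactly this reason is not shown here; the sketch only demonstrates the capture rule the suggestion relies on.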



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

