alamb commented on code in PR #13690:
URL: https://github.com/apache/datafusion/pull/13690#discussion_r1874949626


##########
datafusion-examples/examples/thread_pools.rs:
##########
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use a separate thread pool (tokio [`Runtime`]) 
to
+//! run the CPU intensive parts of DataFusion plans.
+//!
+//! Running DataFusion plans that perform I/O, such as reading parquet files
+//! directly from remote object storage (e.g. AWS S3) without care will result
+//! in running CPU intensive jobs on the same thread pool, which can lead to 
the
+//! issues described in  the [Architecture section] such as throttled bandwidth
+//! due to congestion control and increased latencies for processing network
+//! messages.
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::error::Result;
+use datafusion::execution::dedicated_executor::DedicatedExecutor;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::http::HttpBuilder;
+use object_store::ObjectStore;
+use std::sync::Arc;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio runtime,
+/// but for this example it is important to understand how the [`Runtime`]s 
work.
+///
+/// There is a "current" runtime that is installed in a thread local variable
+/// that is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it 
as
+/// the "current" runtime (on which any `async` futures, streams and tasks
+/// are run).
+#[tokio::main]
+async fn main() -> Result<()> {
+    // The first two examples only do local file IO. Enable the  URL table so 
we
+    // can select directly from filenames in SQL.
+    let sql = format!(
+        "SELECT * FROM '{}/alltypes_plain.parquet'",
+        datafusion::test_util::parquet_test_data()
+    );
+
+    // Run the same query on the same runtime. Note that calling `await` here
+    // will effectively run the future (in this case the `async` function) on
+    // the current runtime
+    same_runtime(&sql).await?;
+
+    // Run the same query on a different runtime.
+    // Run the same query on a different runtime including remote IO
+    different_runtime_advanced().await?;
+
+    Ok(())
+}
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written, and it works well for
+/// development and local query processing.
+async fn same_runtime(sql: &str) -> Result<()> {
+    let ctx = SessionContext::new().enable_url_table();
+
+    // Calling .sql is an async function as it may also do network
+    // I/O, for example to contact a remote catalog or do an object store LIST
+    let df = ctx.sql(sql).await?;
+
+    // While many examples call `collect` or `show()`, those methods buffer 
the
+    // results. Internally, DataFusion generates output one RecordBatch at a time
+
+    // Calling `execute_stream` on a DataFrame returns a
+    // `SendableRecordBatchStream`. Depending on the plan, this may also do
+    // network I/O, for example to begin reading a parquet file from a remote
+    // object store as well. It is also possible that this function call spawns
+    // tasks that begin doing CPU intensive work as well
+    let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+    // Calling `next()` drives the plan, producing new `RecordBatch`es using 
the
+    // current runtime (and typically also the current thread).
+    //
+    // Perhaps somewhat non obvious, calling the `next()` function often will
+    // result in other tasks being spawned on the current runtime (e.g. for
+    // `RepartitionExec` to read data from each of its input partitions in
+    // parallel).
+    //
+    // Executing the plan like this results in all CPU intensive work
+    // running on same (default) Runtime.
+    while let Some(batch) = stream.next().await {
+        println!("{}", pretty_format_batches(&[batch?]).unwrap());
+    }
+    Ok(())
+}
+
+/// Demonstrates how to run queries on a **different** runtime than the 
current one
+///
+async fn different_runtime_advanced() -> Result<()> {
+    // In this example, we will configure access to a remote object store
+    // over the network during the plan
+
+    let dedicated_executor = DedicatedExecutor::builder().build();
+
+    // setup http object store
+    let base_url = Url::parse("https://github.com").unwrap();
+    let http_store: Arc<dyn ObjectStore> =
+        Arc::new(HttpBuilder::new().with_url(base_url.clone()).build()?);
+
+    // By default, the object store will use the "current runtime" for IO 
operations
+    // if we use a dedicated executor to run the plan, the eventual object 
store requests will also use the
+    // dedicated executor's runtime
+    //
+    // To avoid this, we can wrap the object store to run on the "IO" runtime
+    //
+    // (if we don't do this the example fails with an error like
+    //
+    // ctx.register_object_store(&base_url, http_store);
+    // A Tokio 1.x context was found, but timers are disabled. Call 
`enable_time` on the runtime builder to enable timers.
+
+    //let http_store = dedicated_executor.wrap_object_store(http_store);
+
+    // we must also register the dedicated executor with the runtime
+    let runtime_env = RuntimeEnvBuilder::new()

Review Comment:
   In this example the DedicatedExecutor is registered directly with the 
`RuntimeEnv`
   
   I actually think this is a much nicer API than having to wrap various calls 
with dedicated executors



##########
datafusion/execution/src/dedicated_executor.rs:
##########
@@ -0,0 +1,1004 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio 
runtime.
+//!
+//! Originally from [InfluxDB 3.0]
+//!
+//! [InfluxDB 3.0]: 
https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
+use crate::cross_rt_stream::CrossRtStream;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::SendableRecordBatchStream;
+use datafusion_common::DataFusionError;
+use futures::{
+    future::{BoxFuture, Shared},
+    Future, FutureExt, Stream, TryFutureExt,
+};
+use log::{info, warn};
+use parking_lot::RwLock;
+use std::cell::RefCell;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{fmt::Display, sync::Arc, time::Duration};
+use tokio::runtime::Builder;
+use tokio::task::JoinHandle;
+use tokio::{
+    runtime::Handle,
+    sync::{oneshot::error::RecvError, Notify},
+    task::JoinSet,
+};
+
+impl From<Builder> for DedicatedExecutorBuilder {
+    fn from(value: Builder) -> Self {
+        Self::new_from_builder(value)
+    }
+}
+
+/// Manages a separate tokio [`Runtime`] (thread pool) for executing tasks such
+/// as DataFusion `ExecutionPlans`.
+///
+/// See [`DedicatedExecutorBuilder`] for creating a new instance.
+///
+/// A `DedicatedExecutor` makes it easier to avoid running IO and CPU bound
+/// tasks on the same threadpool by running futures (and any `tasks` that are
+/// `tokio::task::spawned` by them) on a separate tokio [`Executor`].
+///
+/// DedicatedExecutor can be `clone`ed and all clones share the same 
threadpool.
+///
+/// TODO add note about `io_thread`
+///
+/// TODO: things we use in InfluxData
+/// 1. Testing mode (so we can make a bunch of DedicatedExecutors) -- maybe we 
can wrap DedicatedExectors like IOxDedicatedExecutors
+/// 2. Some sort of hook to install tokio metrics
+///
+/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio
+/// runtime will be marked for I/O, via [`register_io_runtime`] by all threads
+/// spawned by the executor. Any I/O done by threads in this
+/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the 
I/O
+/// runtime.
+///
+/// ## TODO examples
+///
+/// # Background
+///
+/// Tokio has the notion of the "current" runtime, which runs the current 
future
+/// and any tasks spawned by it. Typically, this is the runtime created by
+/// `tokio::main` and is used for the main application logic and I/O handling
+///
+/// For CPU bound work, such as DataFusion plan execution, it is important to
+/// run on a separate thread pool to avoid blocking the I/O handling for 
extended
+/// periods of time in order to avoid long poll latencies (which decreases the
+/// throughput of small requests under concurrent load).
+///
+/// # IO Scheduling
+///
+/// I/O, such as network calls, should not be performed on the runtime managed
+/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running
+/// CPU tasks will not be preempted and can therefore starve servicing of other
+/// tasks. This manifests in long poll-latencies, where a task is ready to run
+/// but isn't being scheduled to run. For CPU-bound work this isn't a problem 
as
+/// there is no external party waiting on a response, however, for I/O tasks,
+/// long poll latencies can prevent timely servicing of IO, which can have a
+/// significant detrimental effect.
+///
+/// # Details
+///
+/// The worker thread priority is set to low so that such tasks do
+/// not starve other more important tasks (such as answering health checks)
+///
+/// Follows the example from stack overflow and spawns a new
+/// thread to install a Tokio runtime "context"
+/// <https://stackoverflow.com/questions/62536566>
+///
+/// # Trouble Shooting:
+///
+/// ## "No IO runtime registered. Call 
`register_io_runtime`/`register_current_runtime_for_io` in current thread!
+///
+/// This means that IO was attempted on a tokio runtime that was not registered
+/// for IO. One solution is to run the task using 
[DedicatedExecutor::spawn_cpu].
+///
+/// ## "Cannot drop a runtime in a context where blocking is not allowed"`
+///
+/// If you try to use this structure from an async context you see something 
like
+/// thread 'test_builder_plan' panicked at 'Cannot
+/// drop a runtime in a context where blocking is not allowed it means  This
+/// happens when a runtime is dropped from within an asynchronous
+/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
+///
+/// TODO: make this an Arc<..> rather than a cloneable thing (to follow the 
same
+/// pattern as the rest of the system)
+#[derive(Clone, Debug)]
+pub struct DedicatedExecutor {
+    state: Arc<RwLock<State>>,
+}
+
+impl DedicatedExecutor {
+    /// Create a new builder to create a [`DedicatedExecutor`]
+    pub fn builder() -> DedicatedExecutorBuilder {
+        DedicatedExecutorBuilder::new()
+    }
+
+    /// Runs the specified [`Future`] (and any tasks it spawns) on the thread
+    /// pool managed by this `DedicatedExecutor`.
+    ///
+    /// # TODO: make this wait (aka so the API doesn't start a new background 
task or whatever)
+    ///
+    /// # Notes
+    ///
+    /// This task is run on a dedicated Tokio runtime that purposely does not 
have
+    /// IO enabled. If your future makes any IO calls, you have to
+    /// explicitly run them on DedicatedExecutor::spawn_io.
+    ///
+    /// If you see a message like this
+    ///
+    /// (Panic { msg: "A Tokio 1.x context was found, but timers are disabled. 
Call `enable_time` on the runtime builder to enable timers."
+    ///
+    /// It means some work that was meant to be done on the IO runtime was done
+    /// on the CPU runtime.
+    ///
+    /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** 
when
+    /// it is dropped. Thus, you need to ensure the returned future lives until it
+    /// completes (call `await`) or you wish to cancel it.
+    ///
+    /// All spawned tasks are added to the tokio executor immediately and
+    /// compete for the threadpool's resources.
+    pub fn spawn_cpu<T>(
+        &self,
+        task: T,
+    ) -> impl Future<Output = Result<T::Output, JobError>>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        let handle = {
+            let state = self.state.read();
+            state.handle.clone()
+        };
+
+        let Some(handle) = handle else {
+            return futures::future::err(JobError::WorkerGone).boxed();
+        };
+
+        // use JoinSet to implement "cancel on drop"
+        let mut join_set = JoinSet::new();
+        join_set.spawn_on(task, &handle);
+        async move {
+            join_set
+                .join_next()
+                .await
+                .expect("just spawned task")
+                .map_err(|e| match e.try_into_panic() {
+                    Ok(e) => {
+                        let s = if let Some(s) = e.downcast_ref::<String>() {
+                            s.clone()
+                        } else if let Some(s) = e.downcast_ref::<&str>() {
+                            s.to_string()
+                        } else {
+                            "unknown internal error".to_string()
+                        };
+
+                        JobError::Panic { msg: s }
+                    }
+                    Err(_) => JobError::WorkerGone,
+                })
+        }
+        .boxed()
+    }
+
+    /// Runs the specified work on the dedicated executor and returns the 
result
+    ///
+    /// Note the future is not 'static (aka it can have internal references)
+    pub fn spawn_cpu2<'a, T>(&self, task: T) -> impl Future<Output = T::Output>
+    where
+        T: Future + Send + 'a,
+        T::Output: Send,
+    {
+        // If we can figure out how to make this work, then
+        // we could integrate it nicely into DataFusion
+        async { todo!() }
+    }

Review Comment:
   If we can figure out a way to implement this function, the "annotate all 
callsites" approach is plausible
   
   The key difference is:
   ```diff
   -        T: Future + Send + 'static,
   +        T: Future + Send + 'a,
   ```
   
   The idea is there are a bunch of async function calls like this:
   
   ```rust
                   self
                       .list_files_for_scan(
                           &session_state,
                           &partition_filters,
                           limit,
                       )
                       .await
   ```
   
   Rust can figure out somehow that if this future runs on the same runtime, 
the references are valid for long enough and will compile it.
   
   However when I tried to run this kind of function on a different runtime 
(the whole point of this PR), Rust can't figure out that the future body 
doesn't escape the current function and thus I can't get it to compile. The 
only way to pass the future to a different runtime is if the lifetime is 
`'static`. 
   
   I believe the root cause is that all the functions for spawning / running 
tasks on a tokio runtime require that the futures be `'static` -- that is, that 
they hold no borrowed (non-`'static`) references internally. For example:
   - https://docs.rs/tokio/latest/tokio/task/fn.spawn.html
   ![Screenshot 2024-12-08 at 12 03 52 
PM](https://github.com/user-attachments/assets/745ff9ca-5e23-4d1b-a4c2-96927bc8414e)
   
   
   The only way I could make it work was to clone all the arguments (we could 
likely avoid this by restructuring the code to be much more careful about what 
arguments are passed, need to be cloned, etc) but doing so would be a pretty 
large undertaking and likely require significantly more copying, like:
   
   
   ```rust
           let self_captured = self.clone(); // clone so we can pass future to 
the other pool
           let (mut partitioned_file_lists, statistics) = state
               .runtime_env()
               .spawn_io(async move {
                   self_captured
                       .list_files_for_scan(
                           &session_state_captured,
                           &partition_filters_captured,
                           limit,
                       )
                       .await
               })
               .await?;
   ```
   



##########
datafusion/core/src/datasource/dynamic_file.rs:
##########
@@ -55,33 +55,42 @@ impl UrlTableFactory for DynamicListTableFactory {
             return Ok(None);
         };
 
-        let state = &self
+        let session = self
             .session_store()
             .get_session()
             .upgrade()
-            .and_then(|session| {
-                session
-                    .read()
-                    .as_any()
-                    .downcast_ref::<SessionState>()
-                    .cloned()
-            })
             .ok_or_else(|| plan_datafusion_err!("get current SessionStore 
error"))?;
 
-        match ListingTableConfig::new(table_url.clone())
-            .infer_options(state)
+        let runtime_env = Arc::clone(&session.read().runtime_env());
+
+        let Some(state) = session
+            .read()
+            .as_any()
+            .downcast_ref::<SessionState>()
+            .cloned()
+        else {
+            return internal_err!("Expected SessionState, got something else");
+        };
+
+        // Do remote catalog operations on a different runtime
+        runtime_env

Review Comment:
   This shows what annotating the I/O call sites in DataFusion would look like 
(calling `spawn_io` internally)



##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -608,10 +608,14 @@ impl SessionContext {
         sql: &str,
         options: SQLOptions,
     ) -> Result<DataFrame> {
-        let plan = self.state().create_logical_plan(sql).await?;
-        options.verify_plan(&plan)?;
+        self.runtime_env()

Review Comment:
   And here is an example of how I would like to be able to call high level 
DataFusion APIs to run CPU-bound work, but I can't get the lifetimes to work 
out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to