alamb commented on code in PR #13690: URL: https://github.com/apache/datafusion/pull/13690#discussion_r1874949626
########## datafusion-examples/examples/thread_pools.rs: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example shows how to use a separate thread pool (tokio [`Runtime`])) to +//! run the CPU intensive parts of DataFusion plans. +//! +//! Running DataFusion plans that perform I/O, such as reading parquet files +//! directly from remote object storage (e.g. AWS S3) without care will result +//! in running CPU intensive jobs on the same thread pool, which can lead to the +//! issues described in the [Architecture section] such as throttled bandwidth +//! due to congestion control and increased latencies for processing network +//! messages. 
+use arrow::util::pretty::pretty_format_batches; +use datafusion::error::Result; +use datafusion::execution::dedicated_executor::DedicatedExecutor; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder}; +use datafusion::prelude::*; +use futures::stream::StreamExt; +use object_store::http::HttpBuilder; +use object_store::ObjectStore; +use std::sync::Arc; +use url::Url; + +/// Normally, you don't need to worry about the details of the tokio runtime, +/// but for this example it is important to understand how the [`Runtime`]s work. +/// +/// There is a "current" runtime that is installed in a thread local variable +/// that is used by the `tokio::spawn` function. +/// +/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it as +/// as the "current" runtime (on which any `async` futures, streams and tasks +/// are run). +#[tokio::main] +async fn main() -> Result<()> { + // The first two examples only do local file IO. Enable the URL table so we + // can select directly from filenames in SQL. + let sql = format!( + "SELECT * FROM '{}/alltypes_plain.parquet'", + datafusion::test_util::parquet_test_data() + ); + + // Run the same query on the same runtime. Note that calling `await` here + // will effectively run the future (in this case the `async` function) on + // the current runtime + same_runtime(&sql).await?; + + // Run the same query on a different runtime. + // Run the same query on a different runtime including remote IO + different_runtime_advanced().await?; + + Ok(()) +} + +/// Run queries directly on the current tokio `Runtime` +/// +/// This is now most examples in DataFusion are written and works well for +/// development and local query processing. 
+async fn same_runtime(sql: &str) -> Result<()> { + let ctx = SessionContext::new().enable_url_table(); + + // Calling .sql is an async function as it may also do network + // I/O, for example to contact a remote catalog or do an object store LIST + let df = ctx.sql(sql).await?; + + // While many examples call `collect` or `show()`, those methods buffers the + // results. internally DataFusion generates output a RecordBatch at a time + + // Calling `execute_stream` on a DataFrame returns a + // `SendableRecordBatchStream`. Depending on the plan, this may also do + // network I/O, for example to begin reading a parquet file from a remote + // object store as well. It is also possible that this function call spawns + // tasks that begin doing CPU intensive work as well + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Calling `next()` drives the plan, producing new `RecordBatch`es using the + // current runtime (and typically also the current thread). + // + // Perhaps somewhat non obvious, calling the `next()` function often will + // result in other tasks being spawned on the current runtime (e.g. for + // `RepartitionExec` to read data from each of its input partitions in + // parallel). + // + // Executing the plan like this results in all CPU intensive work + // running on same (default) Runtime. 
+ while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + Ok(()) +} + +/// Demonstrates how to run queries on a **different** runtime than the current one +/// +async fn different_runtime_advanced() -> Result<()> { + // In this example, we will configure access to a remote object store + // over the network during the plan + + let dedicated_executor = DedicatedExecutor::builder().build(); + + // setup http object store + let base_url = Url::parse("https://github.com").unwrap(); + let http_store: Arc<dyn ObjectStore> = + Arc::new(HttpBuilder::new().with_url(base_url.clone()).build()?); + + // By default, the object store will use the "current runtime" for IO operations + // if we use a dedicated executor to run the plan, the eventual object store requests will also use the + // dedicated executor's runtime + // + // To avoid this, we can wrap the object store to run on the "IO" runtime + // + // (if we don't do this the example fails with an error like + // + // ctx.register_object_store(&base_url, http_store); + // A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers. + + //let http_store = dedicated_executor.wrap_object_store(http_store); + + // we must also register the dedicated executor with the runtime + let runtime_env = RuntimeEnvBuilder::new() Review Comment: In this example the DedicatedExecutor is registered directly with the `RuntimeEnv` I actually think this is a much nicer API than having to wrap various calls with dedicated executors ########## datafusion/execution/src/dedicated_executor.rs: ########## @@ -0,0 +1,1004 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime. +//! +//! Originally from [InfluxDB 3.0] +//! +//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +use crate::cross_rt_stream::CrossRtStream; +use crate::stream::RecordBatchStreamAdapter; +use crate::SendableRecordBatchStream; +use datafusion_common::DataFusionError; +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, Stream, TryFutureExt, +}; +use log::{info, warn}; +use parking_lot::RwLock; +use std::cell::RefCell; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt::Display, sync::Arc, time::Duration}; +use tokio::runtime::Builder; +use tokio::task::JoinHandle; +use tokio::{ + runtime::Handle, + sync::{oneshot::error::RecvError, Notify}, + task::JoinSet, +}; + +impl From<Builder> for DedicatedExecutorBuilder { + fn from(value: Builder) -> Self { + Self::new_from_builder(value) + } +} + +/// Manages a separate tokio [`Runtime`] (thread pool) for executing tasks such +/// as DataFusion `ExecutionPlans`. +/// +/// See [`DedicatedExecutorBuilder`] for creating a new instance. +/// +/// A `DedicatedExecutor` makes it easier to avoid running IO and CPU bound +/// tasks on the same threadpool by running futures (and any `tasks` that are +/// `tokio::task::spawned` by them) on a separate tokio [`Executor`]. 
+/// +/// DedicatedExecutor can be `clone`ed and all clones share the same threadpool. +/// +/// TODO add note about `io_thread` +/// +/// TODO: things we use in InfluxData +/// 1. Testing mode (so we can make a bunch of DedicatedExecutors) -- maybe we can wrap DedicatedExectors like IOxDedicatedExecutors +/// 2. Some sort of hook to install tokio metrics +/// +/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio +/// runtime will be maked for io, via [`register_io_runtime`] by all threads +/// spawned by the executor. Any I/O done by threads in this +/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the I/O +/// runtime. +/// +/// ## TODO examples +/// +/// # Background +/// +/// Tokio has the notion of the "current" runtime, which runs the current future +/// and any tasks spawned by it. Typically, this is the runtime created by +/// `tokio::main` and is used for the main application logic and I/O handling +/// +/// For CPU bound work, such as DataFusion plan execution, it is important to +/// run on a separate thread pool to avoid blocking the I/O handling for extended +/// periods of time in order to avoid long poll latencies (which decreases the +/// throughput of small requests under concurrent load). +/// +/// # IO Scheduling +/// +/// I/O, such as network calls, should not be performed on the runtime managed +/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running +/// CPU tasks will not be preempted and can therefore starve servicing of other +/// tasks. This manifests in long poll-latencies, where a task is ready to run +/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as +/// there is no external party waiting on a response, however, for I/O tasks, +/// long poll latencies can prevent timely servicing of IO, which can have a +/// significant detrimental effect. 
+/// +/// # Details +/// +/// The worker thread priority is set to low so that such tasks do +/// not starve other more important tasks (such as answering health checks) +/// +/// Follows the example from stack overflow and spawns a new +/// thread to install a Tokio runtime "context" +/// <https://stackoverflow.com/questions/62536566> +/// +/// # Trouble Shooting: +/// +/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread! +/// +/// This means that IO was attempted on a tokio runtime that was not registered +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn_cpu]. +/// +/// ## "Cannot drop a runtime in a context where blocking is not allowed"` +/// +/// If you try to use this structure from an async context you see something like +/// thread 'test_builder_plan' panicked at 'Cannot +/// drop a runtime in a context where blocking is not allowed it means This +/// happens when a runtime is dropped from within an asynchronous +/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 +/// +/// TODO: make this an Arc<..> rather than an cloneable thing (to follow the smae +/// pattern as the rest of the system) +#[derive(Clone, Debug)] +pub struct DedicatedExecutor { + state: Arc<RwLock<State>>, +} + +impl DedicatedExecutor { + /// Create a new builder to crate a [`DedicatedExecutor`] + pub fn builder() -> DedicatedExecutorBuilder { + DedicatedExecutorBuilder::new() + } + + /// Runs the specified [`Future`] (and any tasks it spawns) on the thread + /// pool managed by this `DedicatedExecutor`. + /// + /// # TODO: make this wait (aka so the API doesn't start a new background task or whatever) + /// + /// # Notes + /// + /// This task is run on a dedicated Tokio runtime that purposely does not have + /// IO enabled. If your future makes any IO calls, you have to + /// explicitly run them on DedicatedExecutor::spawn_io. 
+ /// + /// If you see a message like this + /// + /// (Panic { msg: "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers." + /// + /// It means some work that was meant to be done on the IO runtime was done + /// on the CPU runtime. + /// + /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when + /// it is dropped. Thus, you need ensure the returned future lives until it + /// completes (call `await`) or you wish to cancel it. + /// + /// All spawned tasks are added to the tokio executor immediately and + /// compete for the threadpool's resources. + pub fn spawn_cpu<T>( + &self, + task: T, + ) -> impl Future<Output = Result<T::Output, JobError>> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let handle = { + let state = self.state.read(); + state.handle.clone() + }; + + let Some(handle) = handle else { + return futures::future::err(JobError::WorkerGone).boxed(); + }; + + // use JoinSet implement "cancel on drop" + let mut join_set = JoinSet::new(); + join_set.spawn_on(task, &handle); + async move { + join_set + .join_next() + .await + .expect("just spawned task") + .map_err(|e| match e.try_into_panic() { + Ok(e) => { + let s = if let Some(s) = e.downcast_ref::<String>() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else { + "unknown internal error".to_string() + }; + + JobError::Panic { msg: s } + } + Err(_) => JobError::WorkerGone, + }) + } + .boxed() + } + + /// Runs the specified work on the dedicated executor and returns the result + /// + /// Note the future is not 'static (aka it can have internal references) + pub fn spawn_cpu2<'a, T>(&self, task: T) -> impl Future<Output = T::Output> + where + T: Future + Send + 'a, + T::Output: Send, + { + // If we can figure out how to make this work, then + // we could integrate it nicely into DataFusion + async { todo!() } + } Review Comment: If we can figure out way 
to implement this function, then the "annotate all call sites" approach is plausible. The key difference is: ```diff - T: Future + Send + 'static, + T: Future + Send + 'a, ``` The idea is that there are a bunch of async function calls like this: ```rust self .list_files_for_scan( &session_state, &partition_filters, limit, ) .await ``` When such a future is simply `.await`ed in place (and thus runs on the current runtime), Rust can verify that the references it borrows remain valid for long enough, so it compiles. However, when I tried to run this kind of function on a different runtime (the whole point of this PR), Rust can't figure out that the future body doesn't escape the current function, and thus I can't get it to compile. The only way to pass the future to a different runtime is if it is `'static`. I believe the root cause is that all the functions for spawning / running tasks on a tokio runtime require the futures to be `'static` -- that is, they must not hold any non-`'static` borrowed references internally. This is because a spawned task may keep running after the calling function returns or its future is dropped, so the compiler cannot prove that the borrowed data outlives the task. For example: - https://docs.rs/tokio/latest/tokio/task/fn.spawn.html The only way I could make it work was to clone all the arguments (we could likely avoid this by restructuring the code to be much more careful about which arguments are passed and which need to be cloned, etc.), but doing so would be a pretty large undertaking and would likely require significantly more copying, like: ```rust let self_captured = self.clone(); // clone so we can pass the future to the other pool let (mut partitioned_file_lists, statistics) = state .runtime_env() .spawn_io(async move { self_captured .list_files_for_scan( &session_state_captured, &partition_filters_captured, limit, ) .await }) .await?; ```
plan_datafusion_err!("get current SessionStore error"))?; - match ListingTableConfig::new(table_url.clone()) - .infer_options(state) + let runtime_env = Arc::clone(&session.read().runtime_env()); + + let Some(state) = session + .read() + .as_any() + .downcast_ref::<SessionState>() + .cloned() + else { + return internal_err!("Expected SessionState, got something else"); + }; + + // Do remove catalog operations on a different runtime + runtime_env Review Comment: This shows what annotating the I/O call sites in DataFusion would look like (calling `spawn_io` internally). ########## datafusion/core/src/execution/context/mod.rs: ########## @@ -608,10 +608,14 @@ impl SessionContext { sql: &str, options: SQLOptions, ) -> Result<DataFrame> { - let plan = self.state().create_logical_plan(sql).await?; - options.verify_plan(&plan)?; + self.runtime_env() Review Comment: And here is an example of how I would like to be able to call high-level DataFusion APIs to run CPU-bound work, but I can't get the lifetimes to work out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org