alamb commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2135821535


##########
datafusion-examples/examples/thread_pools.rs:
##########
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use separate thread pools (tokio [`Runtime`]))s 
to
+//! run the IO and CPU intensive parts of DataFusion plans.
+//!
+//! # Background
+//!
+//! DataFusion, by default, plans and executes all operations (both CPU and IO)
+//! on the same thread pool. This makes it fast and easy to get started, but
+//! can cause issues when running at scale, especially when fetching and 
operating
+//! on data directly from remote sources.
+//!
+//! Specifically, without configuration such as in this example, DataFusion
+//! plans and executes everything the same thread pool (Tokio Runtime), 
including
+//! any I/O, such as reading Parquet files from remote object storage
+//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
+//! workload can lead to issues described in the [Architecture section] such as
+//! throttled network bandwidth (due to congestion control) and increased
+//! latencies or timeouts while processing network messages.
+//!
+//! [Architecture section]: 
https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes
+
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::common::runtime::JoinSet;
+use datafusion::error::Result;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::client::SpawnedReqwestConnector;
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::sync::Notify;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio
+/// [`Runtime`], but for this example it is important to understand how the
+/// [`Runtime`]s work.
+///
+/// Each thread has "current" runtime that is installed in a thread local
+/// variable which is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro  creates a [`Runtime`] and installs it as
+/// as the "current" runtime in a thread local variable, on which any `async`
+/// [`Future`], [`Stream]`s and [`Task]`s are run.
+///
+/// This example uses the runtime created by [`tokio::main`] to do I/O
+#[tokio::main]
+async fn main() -> Result<()> {
+    // The first two examples read local files. Enabling the URL table feature
+    // lets us treat filenames as tables in SQL.
+    let ctx = SessionContext::new().enable_url_table();
+    let sql = format!(
+        "SELECT * FROM '{}/alltypes_plain.parquet'",
+        datafusion::test_util::parquet_test_data()
+    );
+
+    // Run a query on the current runtime. Calling `await` means the future
+    // (in this case the `async` function and all spawned work in DataFusion
+    // plans) on the current runtime.
+    same_runtime(&ctx, &sql).await?;
+
+    // Run the same query but this time on a different runtime.
+    //
+    // Since we call `await` here, the `async` function itself runs on the
+    // current runtime, but internally `different_runtime_basic` executes the
+    // DataFusion plan on a different Runtime.
+    different_runtime_basic(ctx, sql).await?;
+
+    // Run the same query on a different runtime, including remote IO.
+    //
+    // NOTE: This is best practice for production systems
+    different_runtime_advanced().await?;
+
+    Ok(())
+}
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written and works well for
+/// development, local query processing, and non latency sensitive workloads.
+async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
+    // Calling .sql is an async function as it may also do network
+    // I/O, for example to contact a remote catalog or do an object store LIST
+    let df = ctx.sql(sql).await?;
+
+    // While many examples call `collect` or `show()`, those methods buffers 
the
+    // results. Internally DataFusion generates output a RecordBatch at a time
+
+    // Calling `execute_stream` return a `SendableRecordBatchStream`. Depending
+    // on the plan, this may also do network I/O, for example to begin reading 
a
+    // parquet file from a remote object store.
+    let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+    // `next()` drives the plan, incrementally producing new `RecordBatch`es
+    // using the current runtime.
+    //
+    // Perhaps somewhat non obviously, calling `next()` can also result in 
other
+    // tasks being spawned on the current runtime (e.g. for `RepartitionExec` 
to
+    // read data from each of its input partitions in parallel).
+    //
+    // Executing the plan using this pattern intermixes any IO and CPU 
intensive
+    // work on same Runtime
+    while let Some(batch) = stream.next().await {
+        println!("{}", pretty_format_batches(&[batch?]).unwrap());
+    }
+    Ok(())
+}
+
+/// Run queries on a **different** Runtime dedicated for CPU bound work
+///
+/// This example is suitable for running DataFusion plans against local data
+/// sources (e.g. files) and returning results to an async destination, as 
might
+/// be done to return query results to a remote client.
+///
+/// Production systems which also read data locally or require very low latency
+/// should follow the recommendations on [`different_runtime_advanced`] when
+/// processing data from a remote source such as object storage.
+async fn different_runtime_basic(ctx: SessionContext, sql: String) -> 
Result<()> {
+    // Since we are already in the context of runtime (installed by
+    // #[tokio::main]), we need a new Runtime (threadpool) for CPU bound tasks
+    let cpu_runtime = CpuRuntime::try_new()?;
+
+    // Prepare a task that runs the plan on cpu_runtime and sends
+    // the results back to the original runtime via a channel.
+    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
+    let driver_task = async move {
+        // Plan the query (which might require CPU work to evaluate statistics)
+        let df = ctx.sql(&sql).await?;
+        let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
+
+        // Calling `next()` to drive the plan in this task drives the
+        // execution from the cpu runtime the other thread pool
+        //
+        // NOTE any IO run by this plan (for example, reading from an
+        // `ObjectStore`) will be done on this new thread pool as well.
+        while let Some(batch) = stream.next().await {
+            if tx.send(batch).await.is_err() {
+                // error means dropped receiver, so nothing will get results 
anymore
+                return Ok(());
+            }
+        }
+        Ok(()) as Result<()>
+    };
+
+    // Run the driver task on the cpu runtime. Use a JoinSet to
+    // ensure the spawned task is canceled on error/drop
+    let mut join_set = JoinSet::new();
+    join_set.spawn_on(driver_task, cpu_runtime.handle());
+
+    // Retrieve the results in the original (IO) runtime. This requires only
+    // minimal work (pass pointers around).
+    while let Some(batch) = rx.recv().await {
+        println!("{}", pretty_format_batches(&[batch?])?);
+    }
+
+    // wait for completion of the driver task
+    drain_join_set(join_set).await;
+
+    Ok(())
+}
+
+/// Run CPU intensive work on a different runtime but do IO operations (object
+/// store access) on the current runtime.
+async fn different_runtime_advanced() -> Result<()> {
+    // In this example, we will query a file via https, reading
+    // the data directly from the plan
+
+    // The current runtime (created by tokio::main) is used for IO
+    //
+    // Note this handle should be used for *ALL* remote IO operations in your
+    // systems, including remote catalog access, which is not included in this
+    // example.
+    let cpu_runtime = CpuRuntime::try_new()?;
+    let io_handle = Handle::current();

Review Comment:
   Yeah. Another consideration is that most uses of tokio are not for CPU bound 
work, so it makes sense to just use the default pool.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to