jayzhan211 commented on code in PR #14616:
URL: https://github.com/apache/datafusion/pull/14616#discussion_r1952536901


##########
datafusion/catalog-listing/src/file_stream_part.rs:
##########
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A generic stream over file format readers that can be used by
+//! any file format that reads its files from start to end.
+//!
+//! Note: Most traits here need to be marked `Sync + Send` to be
+//! compliant with the `SendableRecordBatchStream` trait.
+
+use crate::file_meta::FileMeta;
+use datafusion_common::error::Result;
+use datafusion_physical_plan::metrics::{
+    Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
+};
+
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::instant::Instant;
+use datafusion_common::ScalarValue;
+
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A fallible future that resolves to a stream of [`RecordBatch`]
+pub type FileOpenFuture =
+    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
+
+/// Describes the behavior of the `FileStream` if file opening or scanning fails
+pub enum OnError {
+    /// Fail the entire stream and return the underlying error
+    Fail,
+    /// Continue scanning, ignoring the failed file
+    Skip,
+}
+
+impl Default for OnError {
+    fn default() -> Self {
+        Self::Fail
+    }
+}
+
+/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
+/// stream of [`RecordBatch`]
+///
+/// [`ObjectStore`]: object_store::ObjectStore
+pub trait FileOpener: Unpin + Send + Sync {
+    /// Asynchronously open the specified file and return a stream
+    /// of [`RecordBatch`]
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
+}
+
+/// Represents the state of the next `FileOpenFuture`. Since we need to poll
+/// this future while scanning the current file, we need to store the result if it
+/// is ready
+pub enum NextOpen {
+    Pending(FileOpenFuture),
+    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
+}
+
+pub enum FileStreamState {
+    /// The idle state, no file is currently being read
+    Idle,
+    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
+    /// for a given file
+    Open {
+        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
+        future: FileOpenFuture,
+        /// The partition values for this file
+        partition_values: Vec<ScalarValue>,
+    },
+    /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
+    /// returned by [`FileOpener::open`]
+    Scan {
+        /// Partitioning column values for the current batch_iter
+        partition_values: Vec<ScalarValue>,
+        /// The reader instance
+        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
+        /// A [`FileOpenFuture`] for the next file to be processed,
+        /// and its corresponding partition column values, if any.
+        /// This allows the next file to be opened in parallel while the
+        /// current file is read.
+        next: Option<(NextOpen, Vec<ScalarValue>)>,
+    },
+    /// Encountered an error
+    Error,
+    /// Reached the row limit
+    Limit,
+}
+
+/// A timer that can be started and stopped.
+pub struct StartableTime {
+    pub metrics: Time,
+    // Used to record the time cost of each part; it will eventually be added into 'metrics'.
+    pub start: Option<Instant>,
+}
+
+impl StartableTime {
+    pub fn start(&mut self) {
+        assert!(self.start.is_none());
+        self.start = Some(Instant::now());
+    }
+
+    pub fn stop(&mut self) {
+        if let Some(start) = self.start.take() {
+            self.metrics.add_elapsed(start);
+        }
+    }
+}
+
+#[allow(rustdoc::broken_intra_doc_links)]
+/// Metrics for [`FileStream`]
+///
+/// Note that all of these metrics are in terms of wall clock time
+/// (not cpu time) so they include time spent waiting on I/O as well
+/// as other operators.
+///
+/// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_stream.rs>
+pub struct FileStreamMetrics {

Review Comment:
   I prefer `file_stream`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to