This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cbb750  Add End-to-end test for parquet pruning + metrics for 
ParquetExec (#657)
8cbb750 is described below

commit 8cbb750faab3189813e95681bc2af53f20c9f0c7
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Jul 6 12:41:28 2021 -0400

    Add End-to-end test for parquet pruning + metrics for ParquetExec (#657)
    
    * End to end tests for parquet pruning
    
    * remove unused dep
    
    * Make the separation of per-partition and per-exec metrics clearer
    
    * Account for statistics once rather than per row group
    
    * Fix timestamps to use UTC time
---
 datafusion/src/physical_optimizer/repartition.rs |  22 +-
 datafusion/src/physical_plan/mod.rs              |  14 +
 datafusion/src/physical_plan/parquet.rs          | 156 +++++++++--
 datafusion/src/test/mod.rs                       |  10 +-
 datafusion/tests/parquet_pruning.rs              | 343 +++++++++++++++++++++++
 5 files changed, 508 insertions(+), 37 deletions(-)

diff --git a/datafusion/src/physical_optimizer/repartition.rs 
b/datafusion/src/physical_optimizer/repartition.rs
index 011db64..4504c81 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -110,7 +110,9 @@ mod tests {
 
     use super::*;
     use crate::datasource::datasource::Statistics;
-    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
+    use crate::physical_plan::parquet::{
+        ParquetExec, ParquetExecMetrics, ParquetPartition,
+    };
     use crate::physical_plan::projection::ProjectionExec;
 
     #[test]
@@ -119,12 +121,13 @@ mod tests {
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
-                vec![ParquetPartition {
-                    filenames: vec!["x".to_string()],
-                    statistics: Statistics::default(),
-                }],
+                vec![ParquetPartition::new(
+                    vec!["x".to_string()],
+                    Statistics::default(),
+                )],
                 schema,
                 None,
+                ParquetExecMetrics::new(),
                 None,
                 2048,
                 None,
@@ -156,12 +159,13 @@ mod tests {
             Arc::new(ProjectionExec::try_new(
                 vec![],
                 Arc::new(ParquetExec::new(
-                    vec![ParquetPartition {
-                        filenames: vec!["x".to_string()],
-                        statistics: Statistics::default(),
-                    }],
+                    vec![ParquetPartition::new(
+                        vec!["x".to_string()],
+                        Statistics::default(),
+                    )],
                     schema,
                     None,
+                    ParquetExecMetrics::new(),
                     None,
                     2048,
                     None,
diff --git a/datafusion/src/physical_plan/mod.rs 
b/datafusion/src/physical_plan/mod.rs
index a940cbe..d89eb11 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -297,6 +297,20 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
     Ok(())
 }
 
+/// Recursively gather all execution metrics from this plan and all of its 
input plans
+pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, 
SQLMetric> {
+    fn get_metrics_inner(
+        plan: &dyn ExecutionPlan,
+        mut metrics: HashMap<String, SQLMetric>,
+    ) -> HashMap<String, SQLMetric> {
+        metrics.extend(plan.metrics().into_iter());
+        plan.children().into_iter().fold(metrics, |metrics, child| {
+            get_metrics_inner(child.as_ref(), metrics)
+        })
+    }
+    get_metrics_inner(plan.as_ref(), HashMap::new())
+}
+
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> 
{
     match plan.output_partitioning().partition_count() {
diff --git a/datafusion/src/physical_plan/parquet.rs 
b/datafusion/src/physical_plan/parquet.rs
index 3d20a9b..f31b921 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -40,6 +40,8 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
+use hashbrown::HashMap;
+use log::debug;
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
@@ -59,6 +61,8 @@ use crate::datasource::datasource::{ColumnStatistics, 
Statistics};
 use async_trait::async_trait;
 use futures::stream::{Stream, StreamExt};
 
+use super::SQLMetric;
+
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -72,6 +76,8 @@ pub struct ParquetExec {
     batch_size: usize,
     /// Statistics for the data set (sum of statistics for all partitions)
     statistics: Statistics,
+    /// metrics for the overall execution
+    metrics: ParquetExecMetrics,
     /// Optional predicate builder
     predicate_builder: Option<PruningPredicate>,
     /// Optional limit of the number of rows
@@ -93,6 +99,24 @@ pub struct ParquetPartition {
     pub filenames: Vec<String>,
     /// Statistics for this partition
     pub statistics: Statistics,
+    /// Execution metrics
+    metrics: ParquetPartitionMetrics,
+}
+
+/// Stores metrics about the overall parquet execution
+#[derive(Debug, Clone)]
+pub struct ParquetExecMetrics {
+    /// Number of times the pruning predicate could not be created
+    pub predicate_creation_errors: Arc<SQLMetric>,
+}
+
+/// Stores metrics about the parquet execution for a particular 
ParquetPartition
+#[derive(Debug, Clone)]
+struct ParquetPartitionMetrics {
+    /// Number of times the predicate could not be evaluated
+    pub predicate_evaluation_errors: Arc<SQLMetric>,
+    /// Number of row groups pruned using the pruning predicate
+    pub row_groups_pruned: Arc<SQLMetric>,
 }
 
 impl ParquetExec {
@@ -140,6 +164,8 @@ impl ParquetExec {
         max_concurrency: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
+        debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, 
predicate: {:?}, limit: {:?}",
+               filenames, projection, predicate, limit);
         // build a list of Parquet partitions with statistics and gather all 
unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
@@ -205,10 +231,7 @@ impl ParquetExec {
             };
             // remove files that are not needed in case of limit
             filenames.truncate(total_files);
-            partitions.push(ParquetPartition {
-                filenames,
-                statistics,
-            });
+            partitions.push(ParquetPartition::new(filenames, statistics));
             if limit_exhausted {
                 break;
             }
@@ -225,14 +248,27 @@ impl ParquetExec {
             )));
         }
         let schema = Arc::new(schemas.pop().unwrap());
+        let metrics = ParquetExecMetrics::new();
+
         let predicate_builder = predicate.and_then(|predicate_expr| {
-            PruningPredicate::try_new(&predicate_expr, schema.clone()).ok()
+            match PruningPredicate::try_new(&predicate_expr, schema.clone()) {
+                Ok(predicate_builder) => Some(predicate_builder),
+                Err(e) => {
+                    debug!(
+                        "Could not create pruning predicate for {:?}: {}",
+                        predicate_expr, e
+                    );
+                    metrics.predicate_creation_errors.add(1);
+                    None
+                }
+            }
         });
 
         Ok(Self::new(
             partitions,
             schema,
             projection,
+            metrics,
             predicate_builder,
             batch_size,
             limit,
@@ -244,6 +280,7 @@ impl ParquetExec {
         partitions: Vec<ParquetPartition>,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
+        metrics: ParquetExecMetrics,
         predicate_builder: Option<PruningPredicate>,
         batch_size: usize,
         limit: Option<usize>,
@@ -307,6 +344,7 @@ impl ParquetExec {
             partitions,
             schema: Arc::new(projected_schema),
             projection,
+            metrics,
             predicate_builder,
             batch_size,
             statistics,
@@ -341,6 +379,7 @@ impl ParquetPartition {
         Self {
             filenames,
             statistics,
+            metrics: ParquetPartitionMetrics::new(),
         }
     }
 
@@ -355,6 +394,25 @@ impl ParquetPartition {
     }
 }
 
+impl ParquetExecMetrics {
+    /// Create new metrics
+    pub fn new() -> Self {
+        Self {
+            predicate_creation_errors: SQLMetric::counter(),
+        }
+    }
+}
+
+impl ParquetPartitionMetrics {
+    /// Create new metrics
+    pub fn new() -> Self {
+        Self {
+            predicate_evaluation_errors: SQLMetric::counter(),
+            row_groups_pruned: SQLMetric::counter(),
+        }
+    }
+}
+
 #[async_trait]
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
@@ -398,7 +456,9 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let filenames = self.partitions[partition].filenames.clone();
+        let partition = &self.partitions[partition];
+        let filenames = partition.filenames.clone();
+        let metrics = partition.metrics.clone();
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
         let batch_size = self.batch_size;
@@ -407,6 +467,7 @@ impl ExecutionPlan for ParquetExec {
         task::spawn_blocking(move || {
             if let Err(e) = read_files(
                 &filenames,
+                metrics,
                 &projection,
                 &predicate_builder,
                 batch_size,
@@ -448,6 +509,31 @@ impl ExecutionPlan for ParquetExec {
             }
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        self.partitions
+            .iter()
+            .flat_map(|p| {
+                [
+                    (
+                        format!(
+                            "numPredicateEvaluationErrors for {}",
+                            p.filenames.join(",")
+                        ),
+                        p.metrics.predicate_evaluation_errors.as_ref().clone(),
+                    ),
+                    (
+                        format!("numRowGroupsPruned for {}", 
p.filenames.join(",")),
+                        p.metrics.row_groups_pruned.as_ref().clone(),
+                    ),
+                ]
+            })
+            .chain(std::iter::once((
+                "numPredicateCreationErrors".to_string(),
+                self.metrics.predicate_creation_errors.as_ref().clone(),
+            )))
+            .collect()
+    }
 }
 
 fn send_result(
@@ -547,6 +633,7 @@ impl<'a> PruningStatistics for 
RowGroupPruningStatistics<'a> {
 
 fn build_row_group_predicate(
     predicate_builder: &PruningPredicate,
+    metrics: ParquetPartitionMetrics,
     row_group_metadata: &[RowGroupMetaData],
 ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
     let parquet_schema = predicate_builder.schema().as_ref();
@@ -555,21 +642,28 @@ fn build_row_group_predicate(
         row_group_metadata,
         parquet_schema,
     };
-
     let predicate_values = predicate_builder.prune(&pruning_stats);
 
-    let predicate_values = match predicate_values {
-        Ok(values) => values,
+    match predicate_values {
+        Ok(values) => {
+            // NB: false means don't scan row group
+            let num_pruned = values.iter().filter(|&v| !v).count();
+            metrics.row_groups_pruned.add(num_pruned);
+            Box::new(move |_, i| values[i])
+        }
         // stats filter array could not be built
         // return a closure which will not filter out any row groups
-        _ => return Box::new(|_r, _i| true),
-    };
-
-    Box::new(move |_, i| predicate_values[i])
+        Err(e) => {
+            debug!("Error evaluating row group predicate values {}", e);
+            metrics.predicate_evaluation_errors.add(1);
+            Box::new(|_r, _i| true)
+        }
+    }
 }
 
 fn read_files(
     filenames: &[String],
+    metrics: ParquetPartitionMetrics,
     projection: &[usize],
     predicate_builder: &Option<PruningPredicate>,
     batch_size: usize,
@@ -583,6 +677,7 @@ fn read_files(
         if let Some(predicate_builder) = predicate_builder {
             let row_group_predicate = build_row_group_predicate(
                 predicate_builder,
+                metrics.clone(),
                 file_reader.metadata().row_groups(),
             );
             file_reader.filter_row_groups(&row_group_predicate);
@@ -757,8 +852,11 @@ mod tests {
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -787,8 +885,11 @@ mod tests {
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -832,8 +933,11 @@ mod tests {
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -847,8 +951,11 @@ mod tests {
         // this bypasses the entire predicate expression and no row groups are 
filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
         let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -891,8 +998,11 @@ mod tests {
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 7ca7cc1..df3aec4 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -251,11 +251,11 @@ pub fn make_timestamps() -> RecordBatch {
     let arr_names = StringArray::from(names);
 
     let schema = Schema::new(vec![
-        Field::new("nanos", arr_nanos.data_type().clone(), false),
-        Field::new("micros", arr_micros.data_type().clone(), false),
-        Field::new("millis", arr_millis.data_type().clone(), false),
-        Field::new("secs", arr_secs.data_type().clone(), false),
-        Field::new("name", arr_names.data_type().clone(), false),
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("secs", arr_secs.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
     ]);
     let schema = Arc::new(schema);
 
diff --git a/datafusion/tests/parquet_pruning.rs 
b/datafusion/tests/parquet_pruning.rs
new file mode 100644
index 0000000..86b3946
--- /dev/null
+++ b/datafusion/tests/parquet_pruning.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file contains an end to end test of parquet pruning. It writes
+// data into a parquet file and then queries it, checking the pruning metrics
+use std::sync::Arc;
+
+use arrow::{
+    array::{
+        Array, StringArray, TimestampMicrosecondArray, 
TimestampMillisecondArray,
+        TimestampNanosecondArray, TimestampSecondArray,
+    },
+    datatypes::{Field, Schema},
+    record_batch::RecordBatch,
+    util::pretty::pretty_format_batches,
+};
+use chrono::Duration;
+use datafusion::{
+    physical_plan::{plan_metrics, SQLMetric},
+    prelude::ExecutionContext,
+};
+use hashbrown::HashMap;
+use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
+use tempfile::NamedTempFile;
+
+#[tokio::test]
+async fn prune_timestamps_nanos() {
+    let output = ContextWithParquet::new()
+        .await
+        .query("SELECT * FROM t where nanos < to_timestamp('2020-01-02 
01:01:11Z')")
+        .await;
+    println!("{}", output.description());
+    // TODO This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_micros() {
+    let output = ContextWithParquet::new()
+        .await
+        .query(
+            "SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 
01:01:11Z')",
+        )
+        .await;
+    println!("{}", output.description());
+    // TODO This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_millis() {
+    let output = ContextWithParquet::new()
+        .await
+        .query(
+            "SELECT * FROM t where millis < to_timestamp_millis('2020-01-02 
01:01:11Z')",
+        )
+        .await;
+    println!("{}", output.description());
+    // TODO This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_seconds() {
+    let output = ContextWithParquet::new()
+        .await
+        .query(
+            "SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 
01:01:11Z')",
+        )
+        .await;
+    println!("{}", output.description());
+    // TODO This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+// ----------------------
+// Begin test fixture
+// ----------------------
+
+/// Test fixture that has an execution context that has an external
+/// table "t" registered, pointing at a parquet file made with
+/// `make_test_file`
+struct ContextWithParquet {
+    file: NamedTempFile,
+    ctx: ExecutionContext,
+}
+
+/// The output of running one of the test cases
+struct TestOutput {
+    /// The input string
+    sql: String,
+    /// Normalized metrics (filename replaced by a constant)
+    metrics: HashMap<String, SQLMetric>,
+    /// number of rows in results
+    result_rows: usize,
+    /// the contents of the input, as a string
+    pretty_input: String,
+    /// the raw results, as a string
+    pretty_results: String,
+}
+
+impl TestOutput {
+    /// retrieve the value of the named metric, if any
+    fn metric_value(&self, metric_name: &str) -> Option<usize> {
+        self.metrics.get(metric_name).map(|m| m.value())
+    }
+
+    /// The number of times evaluating the pruning predicate failed
+    fn predicate_evaluation_errors(&self) -> Option<usize> {
+        self.metric_value("numPredicateEvaluationErrors for PARQUET_FILE")
+    }
+
+    /// The number of row groups that were pruned
+    fn row_groups_pruned(&self) -> Option<usize> {
+        self.metric_value("numRowGroupsPruned for PARQUET_FILE")
+    }
+
+    fn description(&self) -> String {
+        let metrics = self
+            .metrics
+            .iter()
+            .map(|(name, val)| format!("  {} = {:?}", name, val))
+            .collect::<Vec<_>>();
+
+        format!(
+            "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
+            self.pretty_input,
+            self.sql,
+            self.pretty_results,
+            metrics.join("\n")
+        )
+    }
+}
+
+/// Creates an execution context that has an external table "t"
+/// registered pointing at a parquet file made with `make_test_file`
+impl ContextWithParquet {
+    async fn new() -> Self {
+        let file = make_test_file().await;
+
+        // now, set up the file as a data source and run a query against it
+        let mut ctx = ExecutionContext::new();
+        let parquet_path = file.path().to_string_lossy();
+        ctx.register_parquet("t", &parquet_path)
+            .expect("registering");
+
+        Self { file, ctx }
+    }
+
+    /// Runs the specified SQL query and returns the number of output
+    /// rows and normalized execution metrics
+    async fn query(&mut self, sql: &str) -> TestOutput {
+        println!("Planning sql {}", sql);
+
+        let input = self
+            .ctx
+            .sql("SELECT * from t")
+            .expect("planning")
+            .collect()
+            .await
+            .expect("getting input");
+        let pretty_input = pretty_format_batches(&input).unwrap();
+
+        let logical_plan = 
self.ctx.sql(sql).expect("planning").to_logical_plan();
+
+        let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing 
plan");
+        let execution_plan = self
+            .ctx
+            .create_physical_plan(&logical_plan)
+            .expect("creating physical plan");
+
+        let results = 
datafusion::physical_plan::collect(execution_plan.clone())
+            .await
+            .expect("Running");
+
+        // replace the path name, which varies test to test, with some
+        // constant for test comparisons
+        let path = self.file.path();
+        let path_name = path.to_string_lossy();
+        let metrics = plan_metrics(execution_plan)
+            .into_iter()
+            .map(|(name, metric)| {
+                (name.replace(path_name.as_ref(), "PARQUET_FILE"), metric)
+            })
+            .collect();
+
+        let result_rows = results.iter().map(|b| b.num_rows()).sum();
+
+        let pretty_results = pretty_format_batches(&results).unwrap();
+
+        let sql = sql.to_string();
+        TestOutput {
+            sql,
+            metrics,
+            result_rows,
+            pretty_input,
+            pretty_results,
+        }
+    }
+}
+
+/// Create a test parquet file with various data types
+async fn make_test_file() -> NamedTempFile {
+    let output_file = tempfile::Builder::new()
+        .prefix("parquet_pruning")
+        .suffix(".parquet")
+        .tempfile()
+        .expect("tempfile creation");
+
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(5)
+        .build();
+
+    let batches = vec![
+        make_batch(Duration::seconds(0)),
+        make_batch(Duration::seconds(10)),
+        make_batch(Duration::minutes(10)),
+        make_batch(Duration::days(10)),
+    ];
+    let schema = batches[0].schema();
+
+    let mut writer = ArrowWriter::try_new(
+        output_file
+            .as_file()
+            .try_clone()
+            .expect("cloning file descriptor"),
+        schema,
+        Some(props),
+    )
+    .unwrap();
+
+    for batch in batches {
+        writer.write(&batch).expect("writing batch");
+    }
+    writer.close().unwrap();
+
+    output_file
+}
+
+/// Return record batch with a few rows of data for all of the supported 
timestamp types
+/// values with the specified offset
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "seconds" --> TimestampSecondArray
+/// "name" --> StringArray
+pub fn make_batch(offset: Duration) -> RecordBatch {
+    let ts_strings = vec![
+        Some("2020-01-01T01:01:01.0000000000001"),
+        Some("2020-01-01T01:02:01.0000000000001"),
+        Some("2020-01-01T02:01:01.0000000000001"),
+        None,
+        Some("2020-01-02T01:01:01.0000000000001"),
+    ];
+
+    let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
+
+    let ts_nanos = ts_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                offset_nanos
+                    + t.parse::<chrono::NaiveDateTime>()
+                        .unwrap()
+                        .timestamp_nanos()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let ts_micros = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+        .collect::<Vec<_>>();
+
+    let ts_millis = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+        .collect::<Vec<_>>();
+
+    let ts_seconds = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+        .collect::<Vec<_>>();
+
+    let names = ts_nanos
+        .iter()
+        .enumerate()
+        .map(|(i, _)| format!("Row {} + {}", i, offset))
+        .collect::<Vec<_>>();
+
+    let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
+    let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
+    let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
+    let arr_seconds = TimestampSecondArray::from_opt_vec(ts_seconds, None);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("seconds", arr_seconds.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_nanos),
+            Arc::new(arr_micros),
+            Arc::new(arr_millis),
+            Arc::new(arr_seconds),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}

Reply via email to