This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8cbb750 Add End-to-end test for parquet pruning + metrics for
ParquetExec (#657)
8cbb750 is described below
commit 8cbb750faab3189813e95681bc2af53f20c9f0c7
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Jul 6 12:41:28 2021 -0400
Add End-to-end test for parquet pruning + metrics for ParquetExec (#657)
* End to end tests for parquet pruning
* remove unused dep
* Make the separation of per-partition and per-exec metrics clearer
* Account for statistics once rather than per row group
* Fix timestamps to use UTC time
---
datafusion/src/physical_optimizer/repartition.rs | 22 +-
datafusion/src/physical_plan/mod.rs | 14 +
datafusion/src/physical_plan/parquet.rs | 156 +++++++++--
datafusion/src/test/mod.rs | 10 +-
datafusion/tests/parquet_pruning.rs | 343 +++++++++++++++++++++++
5 files changed, 508 insertions(+), 37 deletions(-)
diff --git a/datafusion/src/physical_optimizer/repartition.rs
b/datafusion/src/physical_optimizer/repartition.rs
index 011db64..4504c81 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -110,7 +110,9 @@ mod tests {
use super::*;
use crate::datasource::datasource::Statistics;
- use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
+ use crate::physical_plan::parquet::{
+ ParquetExec, ParquetExecMetrics, ParquetPartition,
+ };
use crate::physical_plan::projection::ProjectionExec;
#[test]
@@ -119,12 +121,13 @@ mod tests {
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
- vec![ParquetPartition {
- filenames: vec!["x".to_string()],
- statistics: Statistics::default(),
- }],
+ vec![ParquetPartition::new(
+ vec!["x".to_string()],
+ Statistics::default(),
+ )],
schema,
None,
+ ParquetExecMetrics::new(),
None,
2048,
None,
@@ -156,12 +159,13 @@ mod tests {
Arc::new(ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
- vec![ParquetPartition {
- filenames: vec!["x".to_string()],
- statistics: Statistics::default(),
- }],
+ vec![ParquetPartition::new(
+ vec!["x".to_string()],
+ Statistics::default(),
+ )],
schema,
None,
+ ParquetExecMetrics::new(),
None,
2048,
None,
diff --git a/datafusion/src/physical_plan/mod.rs
b/datafusion/src/physical_plan/mod.rs
index a940cbe..d89eb11 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -297,6 +297,20 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
Ok(())
}
+/// Recursively gather all execution metrics from this plan and all of its
input plans
+pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String,
SQLMetric> {
+ fn get_metrics_inner(
+ plan: &dyn ExecutionPlan,
+ mut metrics: HashMap<String, SQLMetric>,
+ ) -> HashMap<String, SQLMetric> {
+ metrics.extend(plan.metrics().into_iter());
+ plan.children().into_iter().fold(metrics, |metrics, child| {
+ get_metrics_inner(child.as_ref(), metrics)
+ })
+ }
+ get_metrics_inner(plan.as_ref(), HashMap::new())
+}
+
/// Execute the [ExecutionPlan] and collect the results in memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>>
{
match plan.output_partitioning().partition_count() {
diff --git a/datafusion/src/physical_plan/parquet.rs
b/datafusion/src/physical_plan/parquet.rs
index 3d20a9b..f31b921 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -40,6 +40,8 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
+use hashbrown::HashMap;
+use log::debug;
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
@@ -59,6 +61,8 @@ use crate::datasource::datasource::{ColumnStatistics,
Statistics};
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
+use super::SQLMetric;
+
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -72,6 +76,8 @@ pub struct ParquetExec {
batch_size: usize,
/// Statistics for the data set (sum of statistics for all partitions)
statistics: Statistics,
+ /// metrics for the overall execution
+ metrics: ParquetExecMetrics,
/// Optional predicate builder
predicate_builder: Option<PruningPredicate>,
/// Optional limit of the number of rows
@@ -93,6 +99,24 @@ pub struct ParquetPartition {
pub filenames: Vec<String>,
/// Statistics for this partition
pub statistics: Statistics,
+ /// Execution metrics
+ metrics: ParquetPartitionMetrics,
+}
+
+/// Stores metrics about the overall parquet execution
+#[derive(Debug, Clone)]
+pub struct ParquetExecMetrics {
+ /// Number of times the pruning predicate could not be created
+ pub predicate_creation_errors: Arc<SQLMetric>,
+}
+
+/// Stores metrics about the parquet execution for a particular
ParquetPartition
+#[derive(Debug, Clone)]
+struct ParquetPartitionMetrics {
+ /// Number of times the predicate could not be evaluated
+ pub predicate_evaluation_errors: Arc<SQLMetric>,
+ /// Number of row groups pruned using the pruning predicate
+ pub row_groups_pruned: Arc<SQLMetric>,
}
impl ParquetExec {
@@ -140,6 +164,8 @@ impl ParquetExec {
max_concurrency: usize,
limit: Option<usize>,
) -> Result<Self> {
+ debug!("Creating ParquetExec, filenames: {:?}, projection {:?},
predicate: {:?}, limit: {:?}",
+ filenames, projection, predicate, limit);
// build a list of Parquet partitions with statistics and gather all
unique schemas
// used in this data set
let mut schemas: Vec<Schema> = vec![];
@@ -205,10 +231,7 @@ impl ParquetExec {
};
// remove files that are not needed in case of limit
filenames.truncate(total_files);
- partitions.push(ParquetPartition {
- filenames,
- statistics,
- });
+ partitions.push(ParquetPartition::new(filenames, statistics));
if limit_exhausted {
break;
}
@@ -225,14 +248,27 @@ impl ParquetExec {
)));
}
let schema = Arc::new(schemas.pop().unwrap());
+ let metrics = ParquetExecMetrics::new();
+
let predicate_builder = predicate.and_then(|predicate_expr| {
- PruningPredicate::try_new(&predicate_expr, schema.clone()).ok()
+ match PruningPredicate::try_new(&predicate_expr, schema.clone()) {
+ Ok(predicate_builder) => Some(predicate_builder),
+ Err(e) => {
+ debug!(
+ "Could not create pruning predicate for {:?}: {}",
+ predicate_expr, e
+ );
+ metrics.predicate_creation_errors.add(1);
+ None
+ }
+ }
});
Ok(Self::new(
partitions,
schema,
projection,
+ metrics,
predicate_builder,
batch_size,
limit,
@@ -244,6 +280,7 @@ impl ParquetExec {
partitions: Vec<ParquetPartition>,
schema: SchemaRef,
projection: Option<Vec<usize>>,
+ metrics: ParquetExecMetrics,
predicate_builder: Option<PruningPredicate>,
batch_size: usize,
limit: Option<usize>,
@@ -307,6 +344,7 @@ impl ParquetExec {
partitions,
schema: Arc::new(projected_schema),
projection,
+ metrics,
predicate_builder,
batch_size,
statistics,
@@ -341,6 +379,7 @@ impl ParquetPartition {
Self {
filenames,
statistics,
+ metrics: ParquetPartitionMetrics::new(),
}
}
@@ -355,6 +394,25 @@ impl ParquetPartition {
}
}
+impl ParquetExecMetrics {
+ /// Create new metrics
+ pub fn new() -> Self {
+ Self {
+ predicate_creation_errors: SQLMetric::counter(),
+ }
+ }
+}
+
+impl ParquetPartitionMetrics {
+ /// Create new metrics
+ pub fn new() -> Self {
+ Self {
+ predicate_evaluation_errors: SQLMetric::counter(),
+ row_groups_pruned: SQLMetric::counter(),
+ }
+ }
+}
+
#[async_trait]
impl ExecutionPlan for ParquetExec {
/// Return a reference to Any that can be used for downcasting
@@ -398,7 +456,9 @@ impl ExecutionPlan for ParquetExec {
Receiver<ArrowResult<RecordBatch>>,
) = channel(2);
- let filenames = self.partitions[partition].filenames.clone();
+ let partition = &self.partitions[partition];
+ let filenames = partition.filenames.clone();
+ let metrics = partition.metrics.clone();
let projection = self.projection.clone();
let predicate_builder = self.predicate_builder.clone();
let batch_size = self.batch_size;
@@ -407,6 +467,7 @@ impl ExecutionPlan for ParquetExec {
task::spawn_blocking(move || {
if let Err(e) = read_files(
&filenames,
+ metrics,
&projection,
&predicate_builder,
batch_size,
@@ -448,6 +509,31 @@ impl ExecutionPlan for ParquetExec {
}
}
}
+
+ fn metrics(&self) -> HashMap<String, SQLMetric> {
+ self.partitions
+ .iter()
+ .flat_map(|p| {
+ [
+ (
+ format!(
+ "numPredicateEvaluationErrors for {}",
+ p.filenames.join(",")
+ ),
+ p.metrics.predicate_evaluation_errors.as_ref().clone(),
+ ),
+ (
+ format!("numRowGroupsPruned for {}",
p.filenames.join(",")),
+ p.metrics.row_groups_pruned.as_ref().clone(),
+ ),
+ ]
+ })
+ .chain(std::iter::once((
+ "numPredicateCreationErrors".to_string(),
+ self.metrics.predicate_creation_errors.as_ref().clone(),
+ )))
+ .collect()
+ }
}
fn send_result(
@@ -547,6 +633,7 @@ impl<'a> PruningStatistics for
RowGroupPruningStatistics<'a> {
fn build_row_group_predicate(
predicate_builder: &PruningPredicate,
+ metrics: ParquetPartitionMetrics,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
let parquet_schema = predicate_builder.schema().as_ref();
@@ -555,21 +642,28 @@ fn build_row_group_predicate(
row_group_metadata,
parquet_schema,
};
-
let predicate_values = predicate_builder.prune(&pruning_stats);
- let predicate_values = match predicate_values {
- Ok(values) => values,
+ match predicate_values {
+ Ok(values) => {
+ // NB: false means don't scan row group
+ let num_pruned = values.iter().filter(|&v| !v).count();
+ metrics.row_groups_pruned.add(num_pruned);
+ Box::new(move |_, i| values[i])
+ }
// stats filter array could not be built
// return a closure which will not filter out any row groups
- _ => return Box::new(|_r, _i| true),
- };
-
- Box::new(move |_, i| predicate_values[i])
+ Err(e) => {
+ debug!("Error evaluating row group predicate values {}", e);
+ metrics.predicate_evaluation_errors.add(1);
+ Box::new(|_r, _i| true)
+ }
+ }
}
fn read_files(
filenames: &[String],
+ metrics: ParquetPartitionMetrics,
projection: &[usize],
predicate_builder: &Option<PruningPredicate>,
batch_size: usize,
@@ -583,6 +677,7 @@ fn read_files(
if let Some(predicate_builder) = predicate_builder {
let row_group_predicate = build_row_group_predicate(
predicate_builder,
+ metrics.clone(),
file_reader.metadata().row_groups(),
);
file_reader.filter_row_groups(&row_group_predicate);
@@ -757,8 +852,11 @@ mod tests {
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
- let row_group_predicate =
- build_row_group_predicate(&predicate_builder, &row_group_metadata);
+ let row_group_predicate = build_row_group_predicate(
+ &predicate_builder,
+ ParquetPartitionMetrics::new(),
+ &row_group_metadata,
+ );
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -787,8 +885,11 @@ mod tests {
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
- let row_group_predicate =
- build_row_group_predicate(&predicate_builder, &row_group_metadata);
+ let row_group_predicate = build_row_group_predicate(
+ &predicate_builder,
+ ParquetPartitionMetrics::new(),
+ &row_group_metadata,
+ );
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -832,8 +933,11 @@ mod tests {
],
);
let row_group_metadata = vec![rgm1, rgm2];
- let row_group_predicate =
- build_row_group_predicate(&predicate_builder, &row_group_metadata);
+ let row_group_predicate = build_row_group_predicate(
+ &predicate_builder,
+ ParquetPartitionMetrics::new(),
+ &row_group_metadata,
+ );
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -847,8 +951,11 @@ mod tests {
// this bypasses the entire predicate expression and no row groups are
filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
- let row_group_predicate =
- build_row_group_predicate(&predicate_builder, &row_group_metadata);
+ let row_group_predicate = build_row_group_predicate(
+ &predicate_builder,
+ ParquetPartitionMetrics::new(),
+ &row_group_metadata,
+ );
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -891,8 +998,11 @@ mod tests {
],
);
let row_group_metadata = vec![rgm1, rgm2];
- let row_group_predicate =
- build_row_group_predicate(&predicate_builder, &row_group_metadata);
+ let row_group_predicate = build_row_group_predicate(
+ &predicate_builder,
+ ParquetPartitionMetrics::new(),
+ &row_group_metadata,
+ );
let row_group_filter = row_group_metadata
.iter()
.enumerate()
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 7ca7cc1..df3aec4 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -251,11 +251,11 @@ pub fn make_timestamps() -> RecordBatch {
let arr_names = StringArray::from(names);
let schema = Schema::new(vec![
- Field::new("nanos", arr_nanos.data_type().clone(), false),
- Field::new("micros", arr_micros.data_type().clone(), false),
- Field::new("millis", arr_millis.data_type().clone(), false),
- Field::new("secs", arr_secs.data_type().clone(), false),
- Field::new("name", arr_names.data_type().clone(), false),
+ Field::new("nanos", arr_nanos.data_type().clone(), true),
+ Field::new("micros", arr_micros.data_type().clone(), true),
+ Field::new("millis", arr_millis.data_type().clone(), true),
+ Field::new("secs", arr_secs.data_type().clone(), true),
+ Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);
diff --git a/datafusion/tests/parquet_pruning.rs
b/datafusion/tests/parquet_pruning.rs
new file mode 100644
index 0000000..86b3946
--- /dev/null
+++ b/datafusion/tests/parquet_pruning.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file contains an end to end test of parquet pruning. It writes
+// data into a parquet file and then queries it, checking the pruning metrics.
+use std::sync::Arc;
+
+use arrow::{
+ array::{
+ Array, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray,
+ TimestampNanosecondArray, TimestampSecondArray,
+ },
+ datatypes::{Field, Schema},
+ record_batch::RecordBatch,
+ util::pretty::pretty_format_batches,
+};
+use chrono::Duration;
+use datafusion::{
+ physical_plan::{plan_metrics, SQLMetric},
+ prelude::ExecutionContext,
+};
+use hashbrown::HashMap;
+use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
+use tempfile::NamedTempFile;
+
+#[tokio::test]
+async fn prune_timestamps_nanos() {
+ let output = ContextWithParquet::new()
+ .await
+ .query("SELECT * FROM t where nanos < to_timestamp('2020-01-02
01:01:11Z')")
+ .await;
+ println!("{}", output.description());
+ // TODO This should prune one metrics without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(1));
+ assert_eq!(output.row_groups_pruned(), Some(0));
+ assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_micros() {
+ let output = ContextWithParquet::new()
+ .await
+ .query(
+ "SELECT * FROM t where micros < to_timestamp_micros('2020-01-02
01:01:11Z')",
+ )
+ .await;
+ println!("{}", output.description());
+ // TODO This should prune one metrics without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(1));
+ assert_eq!(output.row_groups_pruned(), Some(0));
+ assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_millis() {
+ let output = ContextWithParquet::new()
+ .await
+ .query(
+ "SELECT * FROM t where millis < to_timestamp_millis('2020-01-02
01:01:11Z')",
+ )
+ .await;
+ println!("{}", output.description());
+ // TODO This should prune one metrics without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(1));
+ assert_eq!(output.row_groups_pruned(), Some(0));
+ assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_seconds() {
+ let output = ContextWithParquet::new()
+ .await
+ .query(
+ "SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02
01:01:11Z')",
+ )
+ .await;
+ println!("{}", output.description());
+ // TODO This should prune one metrics without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(1));
+ assert_eq!(output.row_groups_pruned(), Some(0));
+ assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+// ----------------------
+// Begin test fixture
+// ----------------------
+
+/// Test fixture that has an execution context that has an external
+/// table "t" registered, pointing at a parquet file made with
+/// `make_test_file`
+struct ContextWithParquet {
+ file: NamedTempFile,
+ ctx: ExecutionContext,
+}
+
+/// The output of running one of the test cases
+struct TestOutput {
+ /// The input string
+ sql: String,
+ /// Normalized metrics (filename replaced by a constant)
+ metrics: HashMap<String, SQLMetric>,
+ /// number of rows in results
+ result_rows: usize,
+ /// the contents of the input, as a string
+ pretty_input: String,
+ /// the raw results, as a string
+ pretty_results: String,
+}
+
+impl TestOutput {
+ /// retrieve the value of the named metric, if any
+ fn metric_value(&self, metric_name: &str) -> Option<usize> {
+ self.metrics.get(metric_name).map(|m| m.value())
+ }
+
+ /// The number of pruning predicate evaluation errors
+ fn predicate_evaluation_errors(&self) -> Option<usize> {
+ self.metric_value("numPredicateEvaluationErrors for PARQUET_FILE")
+ }
+
+ /// The number of row groups that were pruned
+ fn row_groups_pruned(&self) -> Option<usize> {
+ self.metric_value("numRowGroupsPruned for PARQUET_FILE")
+ }
+
+ fn description(&self) -> String {
+ let metrics = self
+ .metrics
+ .iter()
+ .map(|(name, val)| format!(" {} = {:?}", name, val))
+ .collect::<Vec<_>>();
+
+ format!(
+ "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
+ self.pretty_input,
+ self.sql,
+ self.pretty_results,
+ metrics.join("\n")
+ )
+ }
+}
+
+/// Creates an execution context that has an external table "t"
+/// registered pointing at a parquet file made with `make_test_file`
+impl ContextWithParquet {
+ async fn new() -> Self {
+ let file = make_test_file().await;
+
+ // now, set up the file as a data source and run a query against it
+ let mut ctx = ExecutionContext::new();
+ let parquet_path = file.path().to_string_lossy();
+ ctx.register_parquet("t", &parquet_path)
+ .expect("registering");
+
+ Self { file, ctx }
+ }
+
+ /// Runs the specified SQL query and returns the number of output
+ /// rows and normalized execution metrics
+ async fn query(&mut self, sql: &str) -> TestOutput {
+ println!("Planning sql {}", sql);
+
+ let input = self
+ .ctx
+ .sql("SELECT * from t")
+ .expect("planning")
+ .collect()
+ .await
+ .expect("getting input");
+ let pretty_input = pretty_format_batches(&input).unwrap();
+
+ let logical_plan =
self.ctx.sql(sql).expect("planning").to_logical_plan();
+
+ let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing
plan");
+ let execution_plan = self
+ .ctx
+ .create_physical_plan(&logical_plan)
+ .expect("creating physical plan");
+
+ let results =
datafusion::physical_plan::collect(execution_plan.clone())
+ .await
+ .expect("Running");
+
+ // replace the path name, which varies test to test, with some
+ // constant for test comparisons
+ let path = self.file.path();
+ let path_name = path.to_string_lossy();
+ let metrics = plan_metrics(execution_plan)
+ .into_iter()
+ .map(|(name, metric)| {
+ (name.replace(path_name.as_ref(), "PARQUET_FILE"), metric)
+ })
+ .collect();
+
+ let result_rows = results.iter().map(|b| b.num_rows()).sum();
+
+ let pretty_results = pretty_format_batches(&results).unwrap();
+
+ let sql = sql.to_string();
+ TestOutput {
+ sql,
+ metrics,
+ result_rows,
+ pretty_input,
+ pretty_results,
+ }
+ }
+}
+
+/// Create a test parquet file with various data types
+async fn make_test_file() -> NamedTempFile {
+ let output_file = tempfile::Builder::new()
+ .prefix("parquet_pruning")
+ .suffix(".parquet")
+ .tempfile()
+ .expect("tempfile creation");
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(5)
+ .build();
+
+ let batches = vec![
+ make_batch(Duration::seconds(0)),
+ make_batch(Duration::seconds(10)),
+ make_batch(Duration::minutes(10)),
+ make_batch(Duration::days(10)),
+ ];
+ let schema = batches[0].schema();
+
+ let mut writer = ArrowWriter::try_new(
+ output_file
+ .as_file()
+ .try_clone()
+ .expect("cloning file descriptor"),
+ schema,
+ Some(props),
+ )
+ .unwrap();
+
+ for batch in batches {
+ writer.write(&batch).expect("writing batch");
+ }
+ writer.close().unwrap();
+
+ output_file
+}
+
+/// Return record batch with a few rows of data for all of the supported
timestamp types
+/// values with the specified offset
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "seconds" --> TimestampSecondArray
+/// "name" --> StringArray
+pub fn make_batch(offset: Duration) -> RecordBatch {
+ let ts_strings = vec![
+ Some("2020-01-01T01:01:01.0000000000001"),
+ Some("2020-01-01T01:02:01.0000000000001"),
+ Some("2020-01-01T02:01:01.0000000000001"),
+ None,
+ Some("2020-01-02T01:01:01.0000000000001"),
+ ];
+
+ let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
+
+ let ts_nanos = ts_strings
+ .into_iter()
+ .map(|t| {
+ t.map(|t| {
+ offset_nanos
+ + t.parse::<chrono::NaiveDateTime>()
+ .unwrap()
+ .timestamp_nanos()
+ })
+ })
+ .collect::<Vec<_>>();
+
+ let ts_micros = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+ .collect::<Vec<_>>();
+
+ let ts_millis = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+ .collect::<Vec<_>>();
+
+ let ts_seconds = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+ .collect::<Vec<_>>();
+
+ let names = ts_nanos
+ .iter()
+ .enumerate()
+ .map(|(i, _)| format!("Row {} + {}", i, offset))
+ .collect::<Vec<_>>();
+
+ let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
+ let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
+ let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
+ let arr_seconds = TimestampSecondArray::from_opt_vec(ts_seconds, None);
+
+ let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+ let arr_names = StringArray::from(names);
+
+ let schema = Schema::new(vec![
+ Field::new("nanos", arr_nanos.data_type().clone(), true),
+ Field::new("micros", arr_micros.data_type().clone(), true),
+ Field::new("millis", arr_millis.data_type().clone(), true),
+ Field::new("seconds", arr_seconds.data_type().clone(), true),
+ Field::new("name", arr_names.data_type().clone(), true),
+ ]);
+ let schema = Arc::new(schema);
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arr_nanos),
+ Arc::new(arr_micros),
+ Arc::new(arr_millis),
+ Arc::new(arr_seconds),
+ Arc::new(arr_names),
+ ],
+ )
+ .unwrap()
+}