This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
The following commit(s) were added to refs/heads/main by this push:
new 0f13aaf feat: make parquet row groups size configurable (#17)
0f13aaf is described below
commit 0f13aaf25038354c7903f757d029a8853eac7140
Author: Pranav Toggi <[email protected]>
AuthorDate: Fri Sep 19 11:26:23 2025 -0700
feat: make parquet row groups size configurable (#17)
* temp: rename to tpchgen
* feat: make parquet row groups size configurable
* fix plan tests
* fix fmt and cli-integration tests
* rename back to spatialbench
* fix readmes
* fix typo
* fix cli integration test
* Update README.md
* fix doc comment
* Update spatialbench-cli/tests/cli_integration.rs
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: Jia Yu <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
README.md | 4 +-
TESTING.md | 73 -------------
spatialbench-arrow/README.md | 2 +-
spatialbench-cli/src/main.rs | 47 +++++++--
spatialbench-cli/src/plan.rs | 100 +++++++++++++++---
spatialbench-cli/tests/cli_integration.rs | 163 ++++++++++++++++++++++++++++++
6 files changed, 290 insertions(+), 99 deletions(-)
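A minimal sketch of exercising the new option from the command line, assuming `spatialbench-cli` is installed and using the flag names exercised by the integration tests below (the output directory is a placeholder):

```sh
# Sketch only: flags mirror the cli_integration.rs tests; the output path is illustrative.
spatialbench-cli \
  --format parquet \
  --scale-factor 1 \
  --tables trip,driver,vehicle,customer,building \
  --output-dir /tmp/spatialbench-sf1 \
  --parquet-row-group-bytes 20000000   # target ~20 MB row groups; default is 128 MiB
```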
diff --git a/README.md b/README.md
index 7afbfb8..4adc263 100644
--- a/README.md
+++ b/README.md
@@ -89,10 +89,10 @@ cargo install --path ./spatialbench-cli
- The core generator logic lives in the spatialbench crate.
- Geometry-aware logic is in spatialbench-arrow and integrated via Arrow-based schemas.
-- The spatial extension modules like the Spider geometry generator reside in the [spatial](https://github.com/wherobots/sedona-spatialbench/blob/main/spatialbench/src/spatial) directory.
+- The spatial extension modules like the Spider geometry generator reside in the [spatial](./spatialbench/src/spatial) directory.
- The generator supports output formats like .tbl and Apache Parquet via the Arrow writer.
-For contribution or debugging, refer to the [ARCHITECTURE.md](https://github.com/wherobots/sedona-spatialbench/blob/main/ARCHITECTURE.md) guide.
+For contribution or debugging, refer to the [ARCHITECTURE.md](./ARCHITECTURE.md) guide.
## Usage
diff --git a/TESTING.md b/TESTING.md
deleted file mode 100644
index 177b285..0000000
--- a/TESTING.md
+++ /dev/null
@@ -1,73 +0,0 @@
-
-This crate has extensive tests to ensure correctness. We compare the output
-of this crate with the original `dbgen` implementation as part of every checkin.
-See [conformance.sh](scripts/conformance.sh) for more details.
-
-`tpchgen-cli` generates **exactly** the same bytes as the original `dbgen`
-program. You can verify this for yourself by using `shasum`, for example:
-
-```sh
-$ shasum /tmp/sf10/lineitem.tbl tpch-dbgen/lineitem.tbl
-c3f5d0218b6623125887d7021922d1e91da11613 /tmp/sf10/lineitem.tbl
-c3f5d0218b6623125887d7021922d1e91da11613 tpch-dbgen/lineitem.tbl
-````
-
-Below we provide the SHA256 checksums for each table generated by `dbgen`
-program for several scale factors as it takes a long time to generate the data.
-These checksums are generated using the code in the [`tpch-dbgen`] repo and the
-following commands on MacOS
-
-```shell
-git clone https://github.com/electrum/tpch-dbgen
-cd tpch-dbgen
-make
-./dbgen -s <scale_factor>
-cd ..
-shasum tpch-dbgen/*.tbl
-```
-
-You can compare these yourself with the output of `tpchgen-cli` to verify correctness:
-
-[tpch-dbgen]: https://github.com/electrum/tpch-dbgen
-
-## Scale Factor 1
-
-```sh
-$ shasum tpch-dbgen/*.tbl
-bee45e9c240e87d63786324696b1babad18a5e0b tpch-dbgen/customer.tbl
-4802b21c9975d965aa11893214a879df0d8d9e01 tpch-dbgen/lineitem.tbl
-f361dffd3d927f5aa64e71cff91458fb5ea1315f tpch-dbgen/nation.tbl
-00d790a08a116feec992cea14272a4f1e5c55925 tpch-dbgen/orders.tbl
-06615f7433806c06162af49c7bc27166c64a31d6 tpch-dbgen/part.tbl
-db0fcb935904765a9085505b5feb5260752f8bf3 tpch-dbgen/partsupp.tbl
-ac61de9604337e791f1bdbcef8f0cdcc21b01514 tpch-dbgen/region.tbl
-baad047476a2720d99b707b6f7a7c9e50c170d5a tpch-dbgen/supplier.tbl
-```
-
-## Scale Factor 10
-
-```sh
-$ shasum tpch-dbgen/*.tbl
-b717482bde38c8312cf232e7ca73aab62f5e1eca tpch-dbgen/customer.tbl
-c3f5d0218b6623125887d7021922d1e91da11613 tpch-dbgen/lineitem.tbl
-f361dffd3d927f5aa64e71cff91458fb5ea1315f tpch-dbgen/nation.tbl
-dddffc12e235da9cd8d17584dc1eab237654cb0f tpch-dbgen/orders.tbl
-efb2a169b6ce80d8ed3989147e8d70e7f2a38d6c tpch-dbgen/part.tbl
-eae140257dc91ba3b4a929c32ebe3d08f3605618 tpch-dbgen/partsupp.tbl
-ac61de9604337e791f1bdbcef8f0cdcc21b01514 tpch-dbgen/region.tbl
-42a76ba965916326e52adca1725ed9ee18b8e61b tpch-dbgen/supplier.tbl
-```
-
-## Scale Factor 100
-
-```sh
-$ shasum tpch-dbgen/*.tbl
-18f5a1784d3adbd4662c35ed1d98897a9773a0dc tpch-dbgen/customer.tbl
-d5a3d8a3ccf7bb20d4ff5f01589b5004504907ec tpch-dbgen/lineitem.tbl
-f361dffd3d927f5aa64e71cff91458fb5ea1315f tpch-dbgen/nation.tbl
-dbfe1ff7481a8e1c2deeba00a0b36c3efb093d0b tpch-dbgen/orders.tbl
-f6eb11ed8a2b4d7d70e30b334fc4fc5a28e03ea4 tpch-dbgen/part.tbl
-0d9070467528371790f43e1a6463358ddfcd5f62 tpch-dbgen/partsupp.tbl
-ac61de9604337e791f1bdbcef8f0cdcc21b01514 tpch-dbgen/region.tbl
-48bc62481b58ff96e5e50a70b3892f4d95f7372f tpch-dbgen/supplier.tbl
-```
diff --git a/spatialbench-arrow/README.md b/spatialbench-arrow/README.md
index a473e44..59a25ad 100644
--- a/spatialbench-arrow/README.md
+++ b/spatialbench-arrow/README.md
@@ -20,4 +20,4 @@ This crate ensures correct results using two methods.
Please see [CONTRIBUTING.md] for more information on how to contribute to this project.
-[CONTRIBUTING.md]: https://github.com/wherobots/sedona-spatialbench/blob/main/CONTRIBUTING.md
\ No newline at end of file
+[CONTRIBUTING.md]: https://github.com/apache/sedona-spatialbench/blob/main/CONTRIBUTING.md
\ No newline at end of file
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index 1496ffa..f94e4b9 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -19,6 +19,7 @@
//! --part <N> Which part to generate (1-based, default: 1)
//! -n, --num-threads <N> Number of threads to use (default: number of CPUs)
//! -c, --parquet-compression <C> Parquet compression codec, e.g., SNAPPY, ZSTD(1), UNCOMPRESSED (default: SNAPPY)
+//! --parquet-row-group-bytes <N> Target size in bytes per row group in Parquet files (default: 134,217,728)
//! -v, --verbose Verbose output
//! --stdout Write output to stdout instead of files
//!```
@@ -50,7 +51,7 @@ mod tbl;
use crate::csv::*;
use crate::generate::{generate_in_chunks, Sink, Source};
use crate::parquet::*;
-use crate::plan::GenerationPlan;
+use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
use crate::spatial_config_file::parse_yaml;
use crate::statistics::WriteStatistics;
use crate::tbl::*;
@@ -81,7 +82,7 @@ use std::time::Instant;
#[command(version)]
#[command(about = "TPC-H Data Generator", long_about = None)]
struct Cli {
- /// Scale factor to address (default: 1)
+ /// Scale factor to create
#[arg(short, long, default_value_t = 1.)]
scale_factor: f64,
@@ -97,17 +98,17 @@ struct Cli {
#[arg(long = "config")]
config: Option<PathBuf>,
- /// Number of partitions to generate (manual parallel generation)
+ /// Number of part(itions) to generate (manual parallel generation)
#[arg(short, long)]
parts: Option<i32>,
- /// Which partition to generate (1-based)
+ /// Which part(ition) to generate (1-based)
///
/// If not specified, generates all parts
#[arg(long)]
part: Option<i32>,
- /// Output format: tbl, csv, parquet (default: tbl)
+ /// Output format: tbl, csv, parquet
#[arg(short, long, default_value = "tbl")]
format: OutputFormat,
@@ -115,7 +116,7 @@ struct Cli {
#[arg(short, long, default_value_t = num_cpus::get())]
num_threads: usize,
- /// Parquet block compression format. Default is SNAPPY
+ /// Parquet block compression format.
///
/// Supported values: UNCOMPRESSED, ZSTD(N), SNAPPY, GZIP, LZO, BROTLI, LZ4
///
@@ -131,13 +132,28 @@ struct Cli {
#[arg(short = 'c', long, default_value = "SNAPPY")]
parquet_compression: Compression,
- /// Verbose output (default: false)
+ /// Verbose output
#[arg(short, long, default_value_t = false)]
verbose: bool,
/// Write the output to stdout instead of a file.
#[arg(long, default_value_t = false)]
stdout: bool,
+
+ /// Target size in bytes for each row group in Parquet files
+ ///
+ /// Row groups are the typical unit of parallel processing and compression
+ /// in Parquet. With many query engines, smaller row groups enable better
+ /// parallelism and lower peak memory use but may reduce compression
+ /// efficiency.
+ ///
+ /// Note: parquet files are limited to 32k row groups, so at high scale
+ /// factors, the row group size may be increased to keep the number of row
+ /// groups under this limit.
+ ///
+ /// Typical values range from 10MB to 100MB.
+ #[arg(long, default_value_t = DEFAULT_PARQUET_ROW_GROUP_BYTES)]
+ parquet_row_group_bytes: i64,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
@@ -261,6 +277,7 @@ macro_rules! define_generate {
self.scale_factor,
self.part,
self.parts,
+ self.parquet_row_group_bytes,
)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let scale_factor = self.scale_factor;
@@ -282,6 +299,7 @@ macro_rules! define_generate {
}
impl Cli {
+ /// Main function to run the generation
async fn main(self) -> io::Result<()> {
if self.verbose {
// explicitly set logging to info / stdout
@@ -360,6 +378,20 @@ impl Cli {
let elapsed = start.elapsed();
info!("Created static distributions and text pools in {elapsed:?}");
+ // Warn if parquet specific options are set but not generating parquet
+ if self.format != OutputFormat::Parquet {
+ if self.parquet_compression != Compression::SNAPPY {
+ eprintln!(
+ "Warning: Parquet compression option set but not
generating Parquet files"
+ );
+ }
+ if self.parquet_row_group_bytes != DEFAULT_PARQUET_ROW_GROUP_BYTES
{
+ eprintln!(
+ "Warning: Parquet row group size option set but not
generating Parquet files"
+ );
+ }
+ }
+
// Generate each table
for table in tables {
match table {
@@ -375,6 +407,7 @@ impl Cli {
info!("Generation complete!");
Ok(())
}
+
define_generate!(
generate_vehicle,
Table::Vehicle,
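When a Parquet-only flag is combined with a non-Parquet format, the new check above emits a warning but still generates the data. A hedged sketch of what that looks like from the shell, reusing the warning strings asserted by the integration test further down (the output path is a placeholder):

```sh
# Sketch only: csv output with a Parquet-specific flag should warn on stderr but still succeed.
spatialbench-cli --format csv --tables trip --scale-factor 0.0001 \
  --output-dir /tmp/spatialbench-csv \
  --parquet-row-group-bytes 8192
# stderr: Warning: Parquet row group size option set but not generating Parquet files
```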
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index 89df572..926f379 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -1,4 +1,4 @@
-//! [`GenerationPlan`] that describes how to generate a TPC-H dataset.
+//! [`GenerationPlan`] that describes how to generate a Spatial Bench dataset.
use crate::{OutputFormat, Table};
use log::debug;
@@ -58,18 +58,22 @@ pub struct GenerationPlan {
part_list: RangeInclusive<i32>,
}
+pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 128 * 1024 * 1024;
+
impl GenerationPlan {
/// Returns a GenerationPlan number of parts to generate
///
/// # Arguments
/// * `cli_part`: optional part number to generate (1-based), `--part` CLI argument
/// * `cli_part_count`: optional total number of parts, `--parts` CLI argument
+ /// * `parquet_row_group_bytes`: parquet row group target size in bytes, `--parquet-row-group-bytes` CLI argument
pub fn try_new(
table: &Table,
format: OutputFormat,
scale_factor: f64,
cli_part: Option<i32>,
cli_part_count: Option<i32>,
+ parquet_row_group_bytes: i64,
) -> Result<Self, String> {
// If a single part is specified, split it into chunks to enable parallel generation.
match (cli_part, cli_part_count) {
@@ -83,10 +87,17 @@ impl GenerationPlan {
"The --part_count option requires the --part option to be
set",
))
}
- (Some(part), Some(part_count)) => {
- Self::try_new_with_parts(table, format, scale_factor, part, part_count)
+ (Some(part), Some(part_count)) => Self::try_new_with_parts(
+ table,
+ format,
+ scale_factor,
+ part,
+ part_count,
+ parquet_row_group_bytes,
+ ),
+ (None, None) => {
+ Self::try_new_without_parts(table, format, scale_factor, parquet_row_group_bytes)
}
- (None, None) => Self::try_new_without_parts(table, format, scale_factor),
}
}
@@ -99,6 +110,7 @@ impl GenerationPlan {
scale_factor: f64,
cli_part: i32,
cli_part_count: i32,
+ parquet_row_group_bytes: i64,
) -> Result<Self, String> {
if cli_part < 1 {
return Err(format!(
@@ -126,7 +138,7 @@ impl GenerationPlan {
// scale down the row count by the number of partitions being generated
// so that the output is consistent with the original part count
- let num_chunks = OutputSize::new(table, scale_factor, format)
+ let num_chunks = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes)
.with_scaled_row_count(cli_part_count)
.part_count();
@@ -161,8 +173,9 @@ impl GenerationPlan {
table: &Table,
format: OutputFormat,
scale_factor: f64,
+ parquet_row_group_bytes: i64,
) -> Result<Self, String> {
- let output_size = OutputSize::new(table, scale_factor, format);
+ let output_size = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes);
let num_parts = output_size.part_count();
Ok(Self {
@@ -205,7 +218,12 @@ struct OutputSize {
}
impl OutputSize {
- pub fn new(table: &Table, scale_factor: f64, format: OutputFormat) -> Self {
+ pub fn new(
+ table: &Table,
+ scale_factor: f64,
+ format: OutputFormat,
+ parquet_row_group_bytes: i64,
+ ) -> Self {
let row_count = Self::row_count_for_table(table, scale_factor);
// The average row size in bytes for each table in the TPC-H schema
@@ -242,15 +260,12 @@ impl OutputSize {
};
let target_chunk_size_bytes = match format {
- // for tbl/csv target chunks of about the same as the output buffer
- // size, 16MB. Use 15MB to ensure we don't exceed the buffer size.
+ // for tbl/csv target chunks, this value does not affect the output
+ // file. Use 15MB, slightly smaller than the 16MB buffer size, to
+ // ensure small overages don't exceed the buffer size and require a
+ // reallocation
OutputFormat::Tbl | OutputFormat::Csv => 15 * 1024 * 1024,
- // target 7MB row groups
- // todo make this configurable
- // https://github.com/clflushopt/tpchgen-rs/issues/146
- // Use 128MB for parquet files, which is the default
- // row group size used by spark
- OutputFormat::Parquet => 128 * 1024 * 1024,
+ OutputFormat::Parquet => parquet_row_group_bytes,
};
// parquet files can have at most 32767 row groups so cap the number of parts at that number
@@ -668,6 +683,49 @@ mod tests {
"Expected parts {expected_parts:?} should not exceed 32k row
groups",
);
}
+
+ mod parquet_row_group_size {
+ use super::*;
+ #[test]
+ fn parquet_sf1_lineitem_default_row_group() {
+ Test::new()
+ .with_table(Table::Trip)
+ .with_format(OutputFormat::Parquet)
+ .with_scale_factor(10.0)
+ .assert(31, 1..=31);
+ }
+
+ #[test]
+ fn parquet_sf1_lineitem_small_row_group() {
+ Test::new()
+ .with_table(Table::Trip)
+ .with_format(OutputFormat::Parquet)
+ .with_scale_factor(10.0)
+ .with_parquet_row_group_bytes(1024 * 1024) // 1MB row groups
+ .assert(3949, 1..=3949);
+ }
+
+ #[test]
+ fn parquet_sf1_lineitem_large_row_group() {
+ Test::new()
+ .with_table(Table::Trip)
+ .with_format(OutputFormat::Parquet)
+ .with_scale_factor(10.0)
+ .with_parquet_row_group_bytes(20 * 1024 * 1024) // 20MB row groups
+ .assert(198, 1..=198);
+ }
+
+ #[test]
+ fn parquet_sf1_lineitem_small_row_group_max_groups() {
+ Test::new()
+ .with_table(Table::Trip)
+ .with_format(OutputFormat::Parquet)
+ .with_scale_factor(100000.0)
+ .with_parquet_row_group_bytes(1024 * 1024) // 1MB row groups
+ // parquet is limited to no more than 32k actual row groups in a parquet file
+ .assert(32767, 1..=32767);
+ }
+ }
}
/// Test fixture for [`GenerationPlan`].
@@ -678,6 +736,7 @@ mod tests {
scale_factor: f64,
cli_part: Option<i32>,
cli_part_count: Option<i32>,
+ parquet_row_group_bytes: i64,
}
impl Test {
@@ -694,6 +753,7 @@ mod tests {
self.scale_factor,
self.cli_part,
self.cli_part_count,
+ self.parquet_row_group_bytes,
)
.unwrap();
assert_eq!(plan.part_count, expected_part_count);
@@ -708,6 +768,7 @@ mod tests {
self.scale_factor,
self.cli_part,
self.cli_part_count,
+ self.parquet_row_group_bytes,
)
.unwrap_err();
assert_eq!(actual_error, expected_error);
@@ -737,11 +798,17 @@ mod tests {
self
}
- /// Set CLI partitition count
+ /// Set CLI partition count
fn with_cli_part_count(mut self, cli_part_count: i32) -> Self {
self.cli_part_count = Some(cli_part_count);
self
}
+
+ /// Set parquet row group size
+ fn with_parquet_row_group_bytes(mut self, parquet_row_group_bytes: i64) -> Self {
+ self.parquet_row_group_bytes = parquet_row_group_bytes;
+ self
+ }
}
impl Default for Test {
@@ -752,6 +819,7 @@ mod tests {
scale_factor: 1.0,
cli_part: None,
cli_part_count: None,
+ parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
}
}
}
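In plan.rs, the configured `parquet_row_group_bytes` now becomes the target chunk size that decides how many parts (and therefore Parquet row groups) a table is split into, with the part count capped at Parquet's 32,767 row-group limit. A rough, illustrative sketch of that relationship (the table size below is a made-up figure, not a value the generator computes this way):

```sh
# Illustrative arithmetic only; estimated_table_bytes is a placeholder figure.
estimated_table_bytes=$((64 * 1024 * 1024 * 1024))   # pretend the table is ~64 GiB
row_group_bytes=$((1024 * 1024))                      # --parquet-row-group-bytes 1048576 (1 MiB)
parts=$(( (estimated_table_bytes + row_group_bytes - 1) / row_group_bytes ))
if [ "$parts" -gt 32767 ]; then parts=32767; fi       # Parquet files allow at most 32767 row groups
echo "$parts"                                         # 65536 uncapped, so it is clamped to 32767
```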
diff --git a/spatialbench-cli/tests/cli_integration.rs b/spatialbench-cli/tests/cli_integration.rs
index 6b3bbb9..1786278 100644
--- a/spatialbench-cli/tests/cli_integration.rs
+++ b/spatialbench-cli/tests/cli_integration.rs
@@ -1,5 +1,6 @@
use assert_cmd::Command;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
+use parquet::file::metadata::ParquetMetaDataReader;
use spatialbench::generators::TripGenerator;
use spatialbench_arrow::{RecordBatchIterator, TripArrow};
use std::fs;
@@ -184,6 +185,100 @@ async fn test_write_parquet_trips() {
}
}
+#[tokio::test]
+async fn test_write_parquet_row_group_size_default() {
+ // Run the CLI command to generate parquet data with default settings
+ let output_dir = tempdir().unwrap();
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--format")
+ .arg("parquet")
+ .arg("--scale-factor")
+ .arg("1")
+ .arg("--tables")
+ .arg("trip,driver,vehicle,customer,building")
+ .arg("--output-dir")
+ .arg(output_dir.path())
+ .assert()
+ .success();
+
+ expect_row_group_sizes(
+ output_dir.path(),
+ vec![
+ RowGroups {
+ table: "customer",
+ row_group_bytes: vec![2600113],
+ },
+ RowGroups {
+ table: "trip",
+ row_group_bytes: vec![123519959, 123486809, 123476361, 123492237],
+ },
+ RowGroups {
+ table: "driver",
+ row_group_bytes: vec![41594],
+ },
+ RowGroups {
+ table: "vehicle",
+ row_group_bytes: vec![5393],
+ },
+ RowGroups {
+ table: "building",
+ row_group_bytes: vec![2492865],
+ },
+ ],
+ );
+}
+
+#[tokio::test]
+async fn test_write_parquet_row_group_size_20mb() {
+ // Run the CLI command to generate parquet data with larger row group size
+ let output_dir = tempdir().unwrap();
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--format")
+ .arg("parquet")
+ .arg("--scale-factor")
+ .arg("1")
+ .arg("--tables")
+ .arg("trip,driver,vehicle,customer,building")
+ .arg("--output-dir")
+ .arg(output_dir.path())
+ .arg("--parquet-row-group-bytes")
+ .arg("20000000") // 20 MB
+ .assert()
+ .success();
+
+ expect_row_group_sizes(
+ output_dir.path(),
+ vec![
+ RowGroups {
+ table: "customer",
+ row_group_bytes: vec![2600113],
+ },
+ RowGroups {
+ table: "trip",
+ row_group_bytes: vec![
+ 24361422, 24361685, 24350928, 24348682, 24353605, 24335813, 24358941, 24343011,
+ 24345967, 24361312, 24337627, 24345972, 24348724, 24361400, 24361528, 24346264,
+ 24351137, 24338412, 24348304, 24361680, 24351433,
+ ],
+ },
+ RowGroups {
+ table: "driver",
+ row_group_bytes: vec![41594],
+ },
+ RowGroups {
+ table: "vehicle",
+ row_group_bytes: vec![5393],
+ },
+ RowGroups {
+ table: "building",
+ row_group_bytes: vec![2492865],
+ },
+ ],
+ );
+}
+
#[test]
fn test_spatialbench_cli_part_no_parts() {
let temp_dir = tempdir().expect("Failed to create temporary directory");
@@ -277,6 +372,36 @@ fn test_spatialbench_cli_zero_part_zero_parts() {
));
}
+/// Test specifying parquet options even when writing tbl output
+#[tokio::test]
+async fn test_incompatible_options_warnings() {
+ let output_dir = tempdir().unwrap();
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--format")
+ .arg("csv")
+ .arg("--tables")
+ .arg("trip")
+ .arg("--scale-factor")
+ .arg("0.0001")
+ .arg("--output-dir")
+ .arg(output_dir.path())
+ // pass in parquet options that are incompatible with csv
+ .arg("--parquet-compression")
+ .arg("zstd(1)")
+ .arg("--parquet-row-group-bytes")
+ .arg("8192")
+ .assert()
+ // still success, but should see warnings
+ .success()
+ .stderr(predicates::str::contains(
+ "Warning: Parquet compression option set but not generating
Parquet files",
+ ))
+ .stderr(predicates::str::contains(
+ "Warning: Parquet row group size option set but not generating
Parquet files",
+ ));
+}
+
fn read_gzipped_file_to_string<P: AsRef<Path>>(path: P) -> Result<String, std::io::Error> {
let file = File::open(path)?;
let mut decoder = flate2::read::GzDecoder::new(file);
@@ -299,3 +424,41 @@ fn read_reference_file(table_name: &str, scale_factor: &str) -> String {
}
}
}
+
+#[derive(Debug, PartialEq)]
+struct RowGroups {
+ table: &'static str,
+ /// total bytes in each row group
+ row_group_bytes: Vec<i64>,
+}
+
+/// For each table in tables, check that the parquet file in output_dir has
+/// a file with the expected row group sizes.
+fn expect_row_group_sizes(output_dir: &Path, expected_row_groups: Vec<RowGroups>) {
+ let mut actual_row_groups = vec![];
+ for table in &expected_row_groups {
+ let output_path = output_dir.join(format!("{}.parquet", table.table));
+ assert!(
+ output_path.exists(),
+ "Expected parquet file {:?} to exist",
+ output_path
+ );
+ // read the metadata to get the row group size
+ let file = File::open(&output_path).expect("Failed to open parquet file");
+ let mut metadata_reader = ParquetMetaDataReader::new();
+ metadata_reader.try_parse(&file).unwrap();
+ let metadata = metadata_reader.finish().unwrap();
+ let row_groups = metadata.row_groups();
+ let actual_row_group_bytes: Vec<_> =
+ row_groups.iter().map(|rg| rg.total_byte_size()).collect();
+ actual_row_groups.push(RowGroups {
+ table: table.table,
+ row_group_bytes: actual_row_group_bytes,
+ })
+ }
+ // compare the expected and actual row groups debug print actual on failure
+ // for better output / easier comparison
+ let expected_row_groups = format!("{expected_row_groups:#?}");
+ let actual_row_groups = format!("{actual_row_groups:#?}");
+ assert_eq!(actual_row_groups, expected_row_groups);
+}
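The added row-group assertions can be exercised locally through the test target defined by this file; `-p spatialbench-cli` assumes the package name matches its directory:

```sh
# Runs only the CLI integration tests touched by this commit.
cargo test -p spatialbench-cli --test cli_integration
```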