kosiew commented on code in PR #21073:
URL: https://github.com/apache/datafusion/pull/21073#discussion_r2972766235
##########
datafusion/core/benches/topk_aggregate.rs:
##########
@@ -290,152 +290,113 @@ fn criterion_benchmark(c: &mut Criterion) {
let limit = LIMIT;
let partitions = 10;
let samples = 1_000_000;
+ let total_rows = partitions * samples;
+
+ // Numeric aggregate benchmarks
+ // (asc, use_topk, use_view)
+ let numeric_cases: &[(&str, bool, bool, bool)] = &[
+ ("aggregate {rows} time-series rows", false, false, false),
+ ("aggregate {rows} worst-case rows", true, false, false),
+ (
+ "top k={limit} aggregate {rows} time-series rows",
+ false,
+ true,
+ false,
+ ),
+ (
+ "top k={limit} aggregate {rows} worst-case rows",
+ true,
+ true,
+ false,
+ ),
+ (
+ "top k={limit} aggregate {rows} time-series rows [Utf8View]",
+ false,
+ true,
+ true,
+ ),
+ (
+ "top k={limit} aggregate {rows} worst-case rows [Utf8View]",
+ true,
+ true,
+ true,
+ ),
+ ];
+ for &(name_tpl, asc, use_topk, use_view) in numeric_cases {
+ let name = name_tpl
+ .replace("{rows}", &total_rows.to_string())
+ .replace("{limit}", &limit.to_string());
+ let ctx = rt
+ .block_on(create_context(partitions, samples, asc, use_topk,
use_view))
+ .unwrap();
+ c.bench_function(&name, |b| {
+ b.iter(|| run(&rt, ctx.clone(), limit, use_topk, asc))
+ });
+ }
- let ctx = rt
- .block_on(create_context(partitions, samples, false, false, false))
- .unwrap();
- c.bench_function(
- format!("aggregate {} time-series rows", partitions *
samples).as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, false, false))
- .unwrap();
- c.bench_function(
- format!("aggregate {} worst-case rows", partitions * samples).as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} time-series rows",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} worst-case rows",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
- );
-
- // Utf8View schema,time-series rows
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} time-series rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
- );
-
- // Utf8View schema,worst-case rows
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} worst-case rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
- );
-
- // String aggregate benchmarks - grouping by timestamp, aggregating string
column
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} time-series rows [Utf8]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} worst-case rows [Utf8]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} time-series rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
+ for asc in [false, true] {
+ for use_topk in [false, true] {
+ let ctx_utf8 = rt
+ .block_on(create_context(partitions, samples, asc, use_topk,
false))
+ .unwrap();
+ let ctx_view = rt
+ .block_on(create_context(partitions, samples, asc, use_topk,
true))
+ .unwrap();
+ let result_utf8 = run_string(&rt, ctx_utf8, limit, use_topk);
+ let result_view = run_string(&rt, ctx_view, limit, use_topk);
+ assert_eq!(
+ result_utf8, result_view,
+ "Utf8 vs Utf8View mismatch for asc={asc}, use_topk={use_topk}"
+ );
+ }
+ }
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} worst-case rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
+ // String aggregate benchmarks
+ // (asc, use_topk, use_view)
+ let string_cases: &[(bool, bool, bool)] = &[
Review Comment:
   The `&[(bool, bool, bool)]` tuple list is a bit hard to read and reason
about when scanning or extending the cases.
   A small case struct, or a helper similar to `numeric_cases`, would make the
intent clearer. Naming `asc`, `use_topk`, and `use_view` explicitly would also
reduce the chance of mixing up tuple positions later.
##########
datafusion/core/benches/topk_aggregate.rs:
##########
@@ -204,7 +204,7 @@ async fn aggregate_string(
let batch = batches.first().unwrap();
assert_eq!(batch.num_rows(), LIMIT);
- Ok(())
+ Ok(format!("{}", pretty_format_batches(&batches)?))
Review Comment:
   It looks like `aggregate_string` now pretty-prints the collected batches and
returns a `String`, and that is what `b.iter(...)` is measuring for each string
benchmark.
   This means the non-TopK string benchmarks now include batch formatting and
heap-allocation overhead, which the numeric benchmarks do not include. So the
comparison is no longer isolating query execution.
   Would it make sense to keep a `Result<()>` fast path for the timed
benchmark, and move the `pretty_format_batches` work into a one-time validation
helper that runs before benchmark registration?
##########
datafusion/core/benches/topk_aggregate.rs:
##########
@@ -290,152 +290,113 @@ fn criterion_benchmark(c: &mut Criterion) {
let limit = LIMIT;
let partitions = 10;
let samples = 1_000_000;
+ let total_rows = partitions * samples;
+
+ // Numeric aggregate benchmarks
+ // (asc, use_topk, use_view)
+ let numeric_cases: &[(&str, bool, bool, bool)] = &[
+ ("aggregate {rows} time-series rows", false, false, false),
+ ("aggregate {rows} worst-case rows", true, false, false),
+ (
+ "top k={limit} aggregate {rows} time-series rows",
+ false,
+ true,
+ false,
+ ),
+ (
+ "top k={limit} aggregate {rows} worst-case rows",
+ true,
+ true,
+ false,
+ ),
+ (
+ "top k={limit} aggregate {rows} time-series rows [Utf8View]",
+ false,
+ true,
+ true,
+ ),
+ (
+ "top k={limit} aggregate {rows} worst-case rows [Utf8View]",
+ true,
+ true,
+ true,
+ ),
+ ];
+ for &(name_tpl, asc, use_topk, use_view) in numeric_cases {
+ let name = name_tpl
+ .replace("{rows}", &total_rows.to_string())
+ .replace("{limit}", &limit.to_string());
+ let ctx = rt
+ .block_on(create_context(partitions, samples, asc, use_topk,
use_view))
+ .unwrap();
+ c.bench_function(&name, |b| {
+ b.iter(|| run(&rt, ctx.clone(), limit, use_topk, asc))
+ });
+ }
- let ctx = rt
- .block_on(create_context(partitions, samples, false, false, false))
- .unwrap();
- c.bench_function(
- format!("aggregate {} time-series rows", partitions *
samples).as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, false, false))
- .unwrap();
- c.bench_function(
- format!("aggregate {} worst-case rows", partitions * samples).as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} time-series rows",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} worst-case rows",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
- );
-
- // Utf8View schema,time-series rows
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} time-series rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
- );
-
- // Utf8View schema,worst-case rows
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} aggregate {} worst-case rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
- );
-
- // String aggregate benchmarks - grouping by timestamp, aggregating string
column
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} time-series rows [Utf8]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, true, true, false))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} worst-case rows [Utf8]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
-
- let ctx = rt
- .block_on(create_context(partitions, samples, false, true, true))
- .unwrap();
- c.bench_function(
- format!(
- "top k={limit} string aggregate {} time-series rows [Utf8View]",
- partitions * samples
- )
- .as_str(),
- |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
- );
+ for asc in [false, true] {
Review Comment:
   Small readability thought here. Could the Utf8 vs Utf8View parity check live
in a helper?
   Right now `criterion_benchmark` is doing both benchmark registration and
cross-layout validation. Pulling the verification loop into something like
`assert_string_results_match(...)` would make the benchmark matrix easier to
scan.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]