This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 45b670a2a chore: Consolidate TPC benchmark scripts (#3538)
45b670a2a is described below
commit 45b670a2a967d042722bea58a0768e17d905614a
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 20 16:45:10 2026 -0700
chore: Consolidate TPC benchmark scripts (#3538)
---
{dev/benchmarks => benchmarks/tpc}/.gitignore | 0
{dev/benchmarks => benchmarks/tpc}/README.md | 84 ++++-
benchmarks/tpc/create-iceberg-tables.py | 171 +++++++++
{dev/benchmarks => benchmarks/tpc}/drop-caches.sh | 0
benchmarks/tpc/engines/comet-iceberg.toml | 48 +++
.../tpc/engines/comet.toml | 22 +-
.../tpc/engines/gluten.toml | 21 +-
.../tpc/engines/spark.toml | 6 +-
.../tpc}/generate-comparison.py | 0
benchmarks/tpc/run.py | 402 +++++++++++++++++++++
{dev/benchmarks => benchmarks/tpc}/tpcbench.py | 0
dev/benchmarks/blaze-tpcds.sh | 53 ---
dev/benchmarks/blaze-tpch.sh | 53 ---
dev/benchmarks/comet-tpcds.sh | 53 ---
dev/benchmarks/comet-tpch-iceberg.sh | 114 ------
dev/benchmarks/comet-tpch.sh | 55 ---
dev/benchmarks/create-iceberg-tpch.py | 88 -----
dev/benchmarks/gluten-tpcds.sh | 53 ---
dev/benchmarks/gluten-tpch.sh | 53 ---
dev/benchmarks/spark-tpcds.sh | 45 ---
dev/benchmarks/spark-tpch.sh | 46 ---
docs/source/about/gluten_comparison.md | 2 +-
.../contributor-guide/benchmark-results/tpc-ds.md | 2 +-
.../contributor-guide/benchmark-results/tpc-h.md | 2 +-
docs/source/contributor-guide/benchmarking.md | 2 +-
.../contributor-guide/benchmarking_aws_ec2.md | 15 +-
26 files changed, 737 insertions(+), 653 deletions(-)
diff --git a/dev/benchmarks/.gitignore b/benchmarks/tpc/.gitignore
similarity index 100%
rename from dev/benchmarks/.gitignore
rename to benchmarks/tpc/.gitignore
diff --git a/dev/benchmarks/README.md b/benchmarks/tpc/README.md
similarity index 59%
rename from dev/benchmarks/README.md
rename to benchmarks/tpc/README.md
index b3ea67419..779ad1753 100644
--- a/dev/benchmarks/README.md
+++ b/benchmarks/tpc/README.md
@@ -26,6 +26,26 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C
[Comet Benchmarking on EC2 Guide]: https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html
+## Usage
+
+All benchmarks are run via `run.py`:
+
+```
+python3 run.py --engine <engine> --benchmark <tpch|tpcds> [options]
+```
+
+| Option | Description |
+| -------------- | ------------------------------------------------ |
+| `--engine` | Engine name (matches a TOML file in `engines/`) |
+| `--benchmark` | `tpch` or `tpcds` |
+| `--iterations` | Number of iterations (default: 1) |
+| `--output` | Output directory (default: `.`) |
+| `--query` | Run a single query number |
+| `--no-restart` | Skip Spark master/worker restart |
+| `--dry-run` | Print the spark-submit command without executing |
+
+Available engines: `spark`, `comet`, `comet-iceberg`, `gluten`
+
## Example usage
Set Spark environment variables:
@@ -47,7 +67,7 @@ Run Spark benchmark:
```shell
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
sudo ./drop-caches.sh
-./spark-tpch.sh
+python3 run.py --engine spark --benchmark tpch
```
Run Comet benchmark:
@@ -56,7 +76,7 @@ Run Comet benchmark:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
sudo ./drop-caches.sh
-./comet-tpch.sh
+python3 run.py --engine comet --benchmark tpch
```
Run Gluten benchmark:
@@ -65,7 +85,13 @@ Run Gluten benchmark:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export GLUTEN_JAR=/opt/gluten/gluten-velox-bundle-spark3.5_2.12-linux_amd64-1.4.0.jar
sudo ./drop-caches.sh
-./gluten-tpch.sh
+python3 run.py --engine gluten --benchmark tpch
+```
+
+Preview a command without running it:
+
+```shell
+python3 run.py --engine comet --benchmark tpch --dry-run
```
Generating charts:
@@ -74,6 +100,11 @@ Generating charts:
python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
```
+## Engine Configuration
+
+Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides,
+required environment variables, and optional defaults/exports. See existing files for examples.
+
## Iceberg Benchmarking
Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries
@@ -90,14 +121,16 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
Note: Table creation uses `--packages` which auto-downloads the dependency.
-### Create Iceberg TPC-H tables
+### Create Iceberg tables
-Convert existing Parquet TPC-H data to Iceberg format:
+Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`.
+The script configures the Iceberg catalog automatically -- no `--conf` flags needed.
```shell
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
-export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+mkdir -p $ICEBERG_WAREHOUSE
+# TPC-H
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
@@ -106,13 +139,24 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.executor.cores=8 \
--conf spark.cores.max=8 \
--conf spark.executor.memory=16g \
- --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
- --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
- --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
- create-iceberg-tpch.py \
+ create-iceberg-tables.py \
+ --benchmark tpch \
--parquet-path $TPCH_DATA \
- --catalog $ICEBERG_CATALOG \
- --database tpch
+ --warehouse $ICEBERG_WAREHOUSE
+
+# TPC-DS
+$SPARK_HOME/bin/spark-submit \
+ --master $SPARK_MASTER \
+ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+ --conf spark.driver.memory=8G \
+ --conf spark.executor.instances=2 \
+ --conf spark.executor.cores=8 \
+ --conf spark.cores.max=16 \
+ --conf spark.executor.memory=16g \
+ create-iceberg-tables.py \
+ --benchmark tpcds \
+ --parquet-path $TPCDS_DATA \
+ --warehouse $ICEBERG_WAREHOUSE
```
### Run Iceberg benchmark
@@ -124,20 +168,22 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
sudo ./drop-caches.sh
-./comet-tpch-iceberg.sh
+python3 run.py --engine comet-iceberg --benchmark tpch
```
The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the
physical plan output.
-### Iceberg-specific options
+### create-iceberg-tables.py options
-| Environment Variable | Default | Description |
-| -------------------- | ---------- | ----------------------------------- |
-| `ICEBERG_CATALOG` | `local` | Iceberg catalog name |
-| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables |
-| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory |
+| Option           | Required | Default        | Description                         |
+| ---------------- | -------- | -------------- | ----------------------------------- |
+| `--benchmark`    | Yes      |                | `tpch` or `tpcds`                   |
+| `--parquet-path` | Yes      |                | Path to source Parquet data         |
+| `--warehouse`    | Yes      |                | Path to Iceberg warehouse directory |
+| `--catalog`      | No       | `local`        | Iceberg catalog name                |
+| `--database`     | No       | benchmark name | Database name for the tables        |
### Comparing Parquet vs Iceberg performance
diff --git a/benchmarks/tpc/create-iceberg-tables.py b/benchmarks/tpc/create-iceberg-tables.py
new file mode 100644
index 000000000..219969bda
--- /dev/null
+++ b/benchmarks/tpc/create-iceberg-tables.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Convert TPC-H or TPC-DS Parquet data to Iceberg tables.
+
+Usage:
+ spark-submit \
+ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+ create-iceberg-tables.py \
+ --benchmark tpch \
+ --parquet-path /path/to/tpch/parquet \
+ --warehouse /path/to/iceberg-warehouse
+
+ spark-submit \
+ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+ create-iceberg-tables.py \
+ --benchmark tpcds \
+ --parquet-path /path/to/tpcds/parquet \
+ --warehouse /path/to/iceberg-warehouse
+"""
+
+import argparse
+import os
+import sys
+from pyspark.sql import SparkSession
+import time
+
+TPCH_TABLES = [
+ "customer",
+ "lineitem",
+ "nation",
+ "orders",
+ "part",
+ "partsupp",
+ "region",
+ "supplier",
+]
+
+TPCDS_TABLES = [
+ "call_center",
+ "catalog_page",
+ "catalog_returns",
+ "catalog_sales",
+ "customer",
+ "customer_address",
+ "customer_demographics",
+ "date_dim",
+ "time_dim",
+ "household_demographics",
+ "income_band",
+ "inventory",
+ "item",
+ "promotion",
+ "reason",
+ "ship_mode",
+ "store",
+ "store_returns",
+ "store_sales",
+ "warehouse",
+ "web_page",
+ "web_returns",
+ "web_sales",
+ "web_site",
+]
+
+BENCHMARK_TABLES = {
+ "tpch": TPCH_TABLES,
+ "tpcds": TPCDS_TABLES,
+}
+
+
+def main(benchmark: str, parquet_path: str, warehouse: str, catalog: str, database: str):
+ table_names = BENCHMARK_TABLES[benchmark]
+
+ # Validate paths before starting Spark
+ errors = []
+ if not os.path.isdir(parquet_path):
+ errors.append(f"Error: --parquet-path '{parquet_path}' does not exist or is not a directory")
+ if not os.path.isdir(warehouse):
+ errors.append(f"Error: --warehouse '{warehouse}' does not exist or is not a directory. "
+ "Create it with: mkdir -p " + warehouse)
+ if errors:
+ for e in errors:
+ print(e, file=sys.stderr)
+ sys.exit(1)
+
+ spark = SparkSession.builder \
+ .appName(f"Create Iceberg {benchmark.upper()} Tables") \
+ .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \
+ .config(f"spark.sql.catalog.{catalog}.type", "hadoop") \
+ .config(f"spark.sql.catalog.{catalog}.warehouse", warehouse) \
+ .getOrCreate()
+
+ # Set the Iceberg catalog as the current catalog so that
+ # namespace operations are routed correctly
+ spark.sql(f"USE {catalog}")
+
+ # Create namespace if it doesn't exist
+ try:
+ spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {database}")
+ except Exception:
+ # Namespace may already exist
+ pass
+
+ for table in table_names:
+ parquet_table_path = f"{parquet_path}/{table}.parquet"
+ iceberg_table = f"{catalog}.{database}.{table}"
+
+ print(f"Converting {parquet_table_path} -> {iceberg_table}")
+ start_time = time.time()
+
+ # Drop table if exists to allow re-running
+ spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")
+
+ # Read parquet and write as Iceberg
+ df = spark.read.parquet(parquet_table_path)
+ df.writeTo(iceberg_table).using("iceberg").create()
+
+ row_count = spark.table(iceberg_table).count()
+ elapsed = time.time() - start_time
+ print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s")
+
+ print(f"\nAll {benchmark.upper()} tables created successfully!")
+ print(f"Tables available at: {catalog}.{database}.*")
+
+ spark.stop()
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Convert TPC-H or TPC-DS Parquet data to Iceberg tables"
+ )
+ parser.add_argument(
+ "--benchmark", required=True, choices=["tpch", "tpcds"],
+ help="Benchmark whose tables to convert (tpch or tpcds)"
+ )
+ parser.add_argument(
+ "--parquet-path", required=True,
+ help="Path to Parquet data directory"
+ )
+ parser.add_argument(
+ "--warehouse", required=True,
+ help="Path to Iceberg warehouse directory"
+ )
+ parser.add_argument(
+ "--catalog", default="local",
+ help="Iceberg catalog name (default: 'local')"
+ )
+ parser.add_argument(
+ "--database", default=None,
+ help="Database name to create tables in (defaults to benchmark name)"
+ )
+ args = parser.parse_args()
+
+ database = args.database if args.database else args.benchmark
+ main(args.benchmark, args.parquet_path, args.warehouse, args.catalog, database)
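The conversion loop above reduces to a simple name mapping: each `<table>.parquet` directory becomes `<catalog>.<database>.<table>`. A Spark-free sketch of that planning step (`plan_conversions` is a hypothetical helper for illustration, not part of the script):

```python
TPCH_TABLES = ["customer", "lineitem", "nation", "orders",
               "part", "partsupp", "region", "supplier"]

def plan_conversions(parquet_path, catalog, database, tables):
    # Pair each source Parquet directory with the fully qualified
    # Iceberg identifier that the script would create for it.
    return [
        (f"{parquet_path}/{t}.parquet", f"{catalog}.{database}.{t}")
        for t in tables
    ]

plan = plan_conversions("/mnt/tpch", "local", "tpch", TPCH_TABLES)
print(plan[1])  # ('/mnt/tpch/lineitem.parquet', 'local.tpch.lineitem')
```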
diff --git a/dev/benchmarks/drop-caches.sh b/benchmarks/tpc/drop-caches.sh
similarity index 100%
copy from dev/benchmarks/drop-caches.sh
copy to benchmarks/tpc/drop-caches.sh
diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg.toml
new file mode 100644
index 000000000..2e01270f1
--- /dev/null
+++ b/benchmarks/tpc/engines/comet-iceberg.toml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[engine]
+name = "comet-iceberg"
+
+[env]
+required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"]
+
+[env.defaults]
+ICEBERG_CATALOG = "local"
+
+[spark_submit]
+jars = ["$COMET_JAR", "$ICEBERG_JAR"]
+driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]
+
+[spark_conf]
+"spark.driver.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
+"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
+"spark.plugins" = "org.apache.spark.CometPlugin"
+"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
+"spark.comet.exec.replaceSortMergeJoin" = "true"
+"spark.comet.expression.Cast.allowIncompatible" = "true"
+"spark.comet.enabled" = "true"
+"spark.comet.exec.enabled" = "true"
+"spark.comet.scan.icebergNative.enabled" = "true"
+"spark.comet.explainFallback.enabled" = "true"
+"spark.sql.catalog.${ICEBERG_CATALOG}" = "org.apache.iceberg.spark.SparkCatalog"
+"spark.sql.catalog.${ICEBERG_CATALOG}.type" = "hadoop"
+"spark.sql.catalog.${ICEBERG_CATALOG}.warehouse" = "$ICEBERG_WAREHOUSE"
+"spark.sql.defaultCatalog" = "${ICEBERG_CATALOG}"
+
+[tpcbench_args]
+use_iceberg = true
diff --git a/dev/benchmarks/drop-caches.sh b/benchmarks/tpc/engines/comet.toml
old mode 100755
new mode 100644
similarity index 59%
copy from dev/benchmarks/drop-caches.sh
copy to benchmarks/tpc/engines/comet.toml
index 9dc1d7f68..8e19165eb
--- a/dev/benchmarks/drop-caches.sh
+++ b/benchmarks/tpc/engines/comet.toml
@@ -1,5 +1,3 @@
-#!/bin/bash
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,6 +14,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-echo 1 > /proc/sys/vm/drop_caches
+[engine]
+name = "comet"
+
+[env]
+required = ["COMET_JAR"]
+
+[spark_submit]
+jars = ["$COMET_JAR"]
+driver_class_path = ["$COMET_JAR"]
+
+[spark_conf]
+"spark.driver.extraClassPath" = "$COMET_JAR"
+"spark.executor.extraClassPath" = "$COMET_JAR"
+"spark.plugins" = "org.apache.spark.CometPlugin"
+"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
+"spark.comet.scan.impl" = "native_datafusion"
+"spark.comet.exec.replaceSortMergeJoin" = "true"
+"spark.comet.expression.Cast.allowIncompatible" = "true"
diff --git a/dev/benchmarks/drop-caches.sh b/benchmarks/tpc/engines/gluten.toml
old mode 100755
new mode 100644
similarity index 62%
copy from dev/benchmarks/drop-caches.sh
copy to benchmarks/tpc/engines/gluten.toml
index 9dc1d7f68..20165788c
--- a/dev/benchmarks/drop-caches.sh
+++ b/benchmarks/tpc/engines/gluten.toml
@@ -1,5 +1,3 @@
-#!/bin/bash
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,6 +14,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-echo 1 > /proc/sys/vm/drop_caches
+[engine]
+name = "gluten"
+
+[env]
+required = ["GLUTEN_JAR"]
+exports = { TZ = "UTC" }
+
+[spark_submit]
+jars = ["$GLUTEN_JAR"]
+
+[spark_conf]
+"spark.plugins" = "org.apache.gluten.GlutenPlugin"
+"spark.driver.extraClassPath" = "${GLUTEN_JAR}"
+"spark.executor.extraClassPath" = "${GLUTEN_JAR}"
+"spark.gluten.sql.columnar.forceShuffledHashJoin" = "true"
+"spark.shuffle.manager" = "org.apache.spark.shuffle.sort.ColumnarShuffleManager"
+"spark.sql.session.timeZone" = "UTC"
diff --git a/dev/benchmarks/drop-caches.sh b/benchmarks/tpc/engines/spark.toml
old mode 100755
new mode 100644
similarity index 94%
rename from dev/benchmarks/drop-caches.sh
rename to benchmarks/tpc/engines/spark.toml
index 9dc1d7f68..c02e7a6ad
--- a/dev/benchmarks/drop-caches.sh
+++ b/benchmarks/tpc/engines/spark.toml
@@ -1,5 +1,3 @@
-#!/bin/bash
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,6 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-echo 1 > /proc/sys/vm/drop_caches
+[engine]
+name = "spark"
diff --git a/dev/benchmarks/generate-comparison.py b/benchmarks/tpc/generate-comparison.py
similarity index 100%
rename from dev/benchmarks/generate-comparison.py
rename to benchmarks/tpc/generate-comparison.py
diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py
new file mode 100755
index 000000000..41a2d5fda
--- /dev/null
+++ b/benchmarks/tpc/run.py
@@ -0,0 +1,402 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Consolidated TPC benchmark runner for Spark-based engines.
+
+Usage:
+ python3 run.py --engine comet --benchmark tpch
+ python3 run.py --engine comet --benchmark tpcds --iterations 3
+ python3 run.py --engine comet-iceberg --benchmark tpch
+ python3 run.py --engine comet --benchmark tpch --dry-run
+ python3 run.py --engine spark --benchmark tpch --no-restart
+"""
+
+import argparse
+import os
+import re
+import subprocess
+import sys
+
+# ---------------------------------------------------------------------------
+# TOML loading – prefer stdlib tomllib (3.11+), else minimal fallback
+# ---------------------------------------------------------------------------
+
+try:
+ import tomllib # Python 3.11+
+
+ def load_toml(path):
+ with open(path, "rb") as f:
+ return tomllib.load(f)
+
+except ModuleNotFoundError:
+
+ def _parse_toml(text):
+ """Minimal TOML parser supporting tables, quoted-key strings, plain
+ strings, arrays of strings, booleans, and comments. Sufficient for
+ the engine config files used by this runner."""
+ root = {}
+ current = root
+ for line in text.splitlines():
+ line = line.strip()
+ if not line or line.startswith("#"):
+ continue
+ # Table header: [env.defaults] or [spark_conf]
+ m = re.match(r"^\[([^\]]+)\]$", line)
+ if m:
+ keys = m.group(1).split(".")
+ current = root
+ for k in keys:
+ current = current.setdefault(k, {})
+ continue
+ # Key = value
+ m = re.match(r'^("(?:[^"\\]|\\.)*"|[A-Za-z0-9_.]+)\s*=\s*(.+)$', line)
+ if not m:
+ continue
+ raw_key, raw_val = m.group(1), m.group(2).strip()
+ key = raw_key.strip('"')
+ val = _parse_value(raw_val)
+ current[key] = val
+ return root
+
+ def _parse_value(raw):
+ if raw == "true":
+ return True
+ if raw == "false":
+ return False
+ if raw.startswith('"') and raw.endswith('"'):
+ return raw[1:-1]
+ if raw.startswith("["):
+ # Simple array of strings
+ items = []
+ for m in re.finditer(r'"((?:[^"\\]|\\.)*)"', raw):
+ items.append(m.group(1))
+ return items
+ if raw.startswith("{"):
+ # Inline table: { KEY = "VAL", ... }
+ tbl = {}
+ for m in re.finditer(r'([A-Za-z0-9_]+)\s*=\s*"((?:[^"\\]|\\.)*)"', raw):
+ tbl[m.group(1)] = m.group(2)
+ return tbl
+ return raw
+
+ def load_toml(path):
+ with open(path, "r") as f:
+ return _parse_toml(f.read())
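On Python 3.10 and older, the fallback above only has to cover the limited grammar the engine files use: table headers, quoted keys, strings, string arrays, inline tables, and booleans. A condensed standalone version of the same regex-based approach (trimmed for illustration; behavior on arbitrary TOML is not guaranteed):

```python
import re

def parse_value(raw):
    if raw in ("true", "false"):          # bare booleans
        return raw == "true"
    if raw.startswith('"') and raw.endswith('"'):
        return raw[1:-1]                  # quoted string
    if raw.startswith("["):               # array of strings
        return re.findall(r'"((?:[^"\\]|\\.)*)"', raw)
    if raw.startswith("{"):               # inline table of string values
        return dict(re.findall(r'([A-Za-z0-9_]+)\s*=\s*"((?:[^"\\]|\\.)*)"', raw))
    return raw

def parse_toml(text):
    root = {}
    current = root
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = re.match(r"^\[([^\]]+)\]$", line)
        if m:                             # [table] or [nested.table]
            current = root
            for k in m.group(1).split("."):
                current = current.setdefault(k, {})
            continue
        m = re.match(r'^("(?:[^"\\]|\\.)*"|[A-Za-z0-9_.]+)\s*=\s*(.+)$', line)
        if m:                             # key = value (key may be quoted)
            current[m.group(1).strip('"')] = parse_value(m.group(2).strip())
    return root

sample = '''
[engine]
name = "comet"

[env]
required = ["COMET_JAR"]
exports = { TZ = "UTC" }

[spark_conf]
"spark.comet.enabled" = "true"
'''
cfg = parse_toml(sample)
```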
+
+
+# ---------------------------------------------------------------------------
+# Common Spark configuration (shared across all engines)
+# ---------------------------------------------------------------------------
+
+COMMON_SPARK_CONF = {
+ "spark.driver.memory": "8G",
+ "spark.executor.memory": "16g",
+ "spark.memory.offHeap.enabled": "true",
+ "spark.memory.offHeap.size": "16g",
+ "spark.eventLog.enabled": "true",
+ "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+ "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
+}
+
+# ---------------------------------------------------------------------------
+# Benchmark profiles
+# ---------------------------------------------------------------------------
+
+BENCHMARK_PROFILES = {
+ "tpch": {
+ "executor_instances": "1",
+ "executor_cores": "8",
+ "max_cores": "8",
+ "data_env": "TPCH_DATA",
+ "queries_env": "TPCH_QUERIES",
+ "format": "parquet",
+ },
+ "tpcds": {
+ "executor_instances": "2",
+ "executor_cores": "8",
+ "max_cores": "16",
+ "data_env": "TPCDS_DATA",
+ "queries_env": "TPCDS_QUERIES",
+ "format": None, # omit --format for TPC-DS
+ },
+}
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def resolve_env(value):
+ """Expand $VAR and ${VAR} references using os.environ."""
+ if not isinstance(value, str):
+ return value
+ return re.sub(
+ r"\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)",
+ lambda m: os.environ.get(m.group(1) or m.group(2), ""),
+ value,
+ )
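`resolve_env` can be exercised on its own; note that unset variables expand to the empty string rather than raising, matching shell-style `$VAR` behavior:

```python
import os
import re

def resolve_env(value):
    # Expand $VAR and ${VAR} references using os.environ,
    # substituting "" for variables that are not set.
    if not isinstance(value, str):
        return value
    return re.sub(
        r"\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)",
        lambda m: os.environ.get(m.group(1) or m.group(2), ""),
        value,
    )

os.environ["COMET_JAR"] = "/opt/comet/comet.jar"
print(resolve_env("$COMET_JAR:${COMET_JAR}"))  # /opt/comet/comet.jar:/opt/comet/comet.jar
print(resolve_env("$NOT_SET_ANYWHERE"))        # empty string
```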
+
+
+def resolve_env_in_list(lst):
+ return [resolve_env(v) for v in lst]
+
+
+def load_engine_config(engine_name):
+ """Load and return the TOML config for the given engine."""
+ script_dir = os.path.dirname(os.path.abspath(__file__))
+ config_path = os.path.join(script_dir, "engines", f"{engine_name}.toml")
+ if not os.path.exists(config_path):
+ available = sorted(
+ f.removesuffix(".toml")
+ for f in os.listdir(os.path.join(script_dir, "engines"))
+ if f.endswith(".toml")
+ )
+ print(f"Error: Unknown engine '{engine_name}'", file=sys.stderr)
+ print(f"Available engines: {', '.join(available)}", file=sys.stderr)
+ sys.exit(1)
+ return load_toml(config_path)
+
+
+def apply_env_defaults(config):
+ """Set environment variable defaults from [env.defaults]."""
+ defaults = config.get("env", {}).get("defaults", {})
+ for key, val in defaults.items():
+ if key not in os.environ:
+ os.environ[key] = val
+
+
+def apply_env_exports(config):
+ """Export environment variables from [env.exports]."""
+ exports = config.get("env", {}).get("exports", {})
+ for key, val in exports.items():
+ os.environ[key] = val
+
+
+def check_required_env(config):
+ """Validate that required environment variables are set."""
+ required = config.get("env", {}).get("required", [])
+ missing = [v for v in required if not os.environ.get(v)]
+ if missing:
+ print(
+ f"Error: Required environment variable(s) not set: {', '.join(missing)}",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+
+def check_common_env():
+ """Validate SPARK_HOME and SPARK_MASTER are set."""
+ for var in ("SPARK_HOME", "SPARK_MASTER"):
+ if not os.environ.get(var):
+ print(f"Error: {var} is not set", file=sys.stderr)
+ sys.exit(1)
+
+
+def check_benchmark_env(config, benchmark):
+ """Validate benchmark-specific environment variables."""
+ profile = BENCHMARK_PROFILES[benchmark]
+ use_iceberg = config.get("tpcbench_args", {}).get("use_iceberg", False)
+
+ required = [profile["queries_env"]]
+ if not use_iceberg:
+ required.append(profile["data_env"])
+
+ missing = [v for v in required if not os.environ.get(v)]
+ if missing:
+ print(
+ f"Error: Required environment variable(s) not set for "
+ f"{benchmark}: {', '.join(missing)}",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+ # Default ICEBERG_DATABASE to the benchmark name if not already set
+ if use_iceberg and not os.environ.get("ICEBERG_DATABASE"):
+ os.environ["ICEBERG_DATABASE"] = benchmark
+
+
+def build_spark_submit_cmd(config, benchmark, args):
+ """Build the spark-submit command list."""
+ spark_home = os.environ["SPARK_HOME"]
+ spark_master = os.environ["SPARK_MASTER"]
+ profile = BENCHMARK_PROFILES[benchmark]
+
+ cmd = [os.path.join(spark_home, "bin", "spark-submit")]
+ cmd += ["--master", spark_master]
+
+ # --jars
+ jars = config.get("spark_submit", {}).get("jars", [])
+ if jars:
+ cmd += ["--jars", ",".join(resolve_env_in_list(jars))]
+
+ # --driver-class-path
+ driver_cp = config.get("spark_submit", {}).get("driver_class_path", [])
+ if driver_cp:
+ cmd += ["--driver-class-path", ":".join(resolve_env_in_list(driver_cp))]
+
+ # Merge spark confs: common + benchmark profile + engine overrides
+ conf = dict(COMMON_SPARK_CONF)
+ conf["spark.executor.instances"] = profile["executor_instances"]
+ conf["spark.executor.cores"] = profile["executor_cores"]
+ conf["spark.cores.max"] = profile["max_cores"]
+
+ engine_conf = config.get("spark_conf", {})
+ for key, val in engine_conf.items():
+ if isinstance(val, bool):
+ val = "true" if val else "false"
+ conf[resolve_env(key)] = resolve_env(str(val))
+
+ for key, val in sorted(conf.items()):
+ cmd += ["--conf", f"{key}={val}"]
+
+ # tpcbench.py path
+ cmd.append("tpcbench.py")
+
+ # tpcbench args
+ engine_name = config.get("engine", {}).get("name", args.engine)
+ cmd += ["--name", engine_name]
+ cmd += ["--benchmark", benchmark]
+
+ use_iceberg = config.get("tpcbench_args", {}).get("use_iceberg", False)
+ if use_iceberg:
+ cmd += ["--catalog", resolve_env("${ICEBERG_CATALOG}")]
+ cmd += ["--database", resolve_env("${ICEBERG_DATABASE}")]
+ else:
+ data_var = profile["data_env"]
+ data_val = os.environ.get(data_var, "")
+ cmd += ["--data", data_val]
+
+ queries_var = profile["queries_env"]
+ queries_val = os.environ.get(queries_var, "")
+ cmd += ["--queries", queries_val]
+
+ cmd += ["--output", args.output]
+ cmd += ["--iterations", str(args.iterations)]
+
+ if args.query is not None:
+ cmd += ["--query", str(args.query)]
+
+ if profile["format"] and not use_iceberg:
+ cmd += ["--format", profile["format"]]
+
+ return cmd
+
+
+def restart_spark():
+ """Stop and start Spark master and worker."""
+ spark_home = os.environ["SPARK_HOME"]
+ sbin = os.path.join(spark_home, "sbin")
+ spark_master = os.environ["SPARK_MASTER"]
+
+ # Stop (ignore errors)
+ subprocess.run(
+ [os.path.join(sbin, "stop-master.sh")],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ subprocess.run(
+ [os.path.join(sbin, "stop-worker.sh")],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ # Start (check errors)
+ r = subprocess.run([os.path.join(sbin, "start-master.sh")])
+ if r.returncode != 0:
+ print("Error: Failed to start Spark master", file=sys.stderr)
+ sys.exit(1)
+
+ r = subprocess.run([os.path.join(sbin, "start-worker.sh"), spark_master])
+ if r.returncode != 0:
+ print("Error: Failed to start Spark worker", file=sys.stderr)
+ sys.exit(1)
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Consolidated TPC benchmark runner for Spark-based engines."
+ )
+ parser.add_argument(
+ "--engine",
+ required=True,
+ help="Engine name (matches a TOML file in engines/)",
+ )
+ parser.add_argument(
+ "--benchmark",
+ required=True,
+ choices=["tpch", "tpcds"],
+ help="Benchmark to run",
+ )
+ parser.add_argument(
+ "--iterations", type=int, default=1, help="Number of iterations (default: 1)"
+ )
+ parser.add_argument(
+ "--output", default=".", help="Output directory (default: .)"
+ )
+ parser.add_argument(
+ "--query", type=int, default=None, help="Run a single query number"
+ )
+ parser.add_argument(
+ "--no-restart",
+ action="store_true",
+ help="Skip Spark master/worker restart",
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Print the spark-submit command without executing",
+ )
+ args = parser.parse_args()
+
+ config = load_engine_config(args.engine)
+
+ # Apply env defaults and exports before validation
+ apply_env_defaults(config)
+ apply_env_exports(config)
+
+ check_common_env()
+ check_required_env(config)
+ check_benchmark_env(config, args.benchmark)
+
+ # Restart Spark unless --no-restart or --dry-run
+ if not args.no_restart and not args.dry_run:
+ restart_spark()
+
+ cmd = build_spark_submit_cmd(config, args.benchmark, args)
+
+ if args.dry_run:
+ # Group paired arguments (e.g. --conf key=value) on one line
+ parts = []
+ i = 0
+ while i < len(cmd):
+ token = cmd[i]
+ if token.startswith("--") and i + 1 < len(cmd) and not cmd[i + 1].startswith("--"):
+ parts.append(f"{token} {cmd[i + 1]}")
+ i += 2
+ else:
+ parts.append(token)
+ i += 1
+ print(" \\\n ".join(parts))
+ else:
+ r = subprocess.run(cmd)
+ sys.exit(r.returncode)
+
+
+if __name__ == "__main__":
+ main()
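The `--dry-run` pretty-printer above pairs each `--flag` with its following value so the long spark-submit command prints one option per line. The same loop in isolation (`group_pairs` is a name chosen here for illustration; run.py inlines the loop in `main`):

```python
def group_pairs(cmd):
    # Join each "--flag value" pair onto one token; leave bare
    # tokens (the binary, positional args, valueless flags) alone.
    parts, i = [], 0
    while i < len(cmd):
        tok = cmd[i]
        if tok.startswith("--") and i + 1 < len(cmd) and not cmd[i + 1].startswith("--"):
            parts.append(f"{tok} {cmd[i + 1]}")
            i += 2
        else:
            parts.append(tok)
            i += 1
    return parts

cmd = ["spark-submit", "--master", "spark://host:7077",
       "--conf", "spark.driver.memory=8G", "tpcbench.py"]
print(" \\\n    ".join(group_pairs(cmd)))
```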
diff --git a/dev/benchmarks/tpcbench.py b/benchmarks/tpc/tpcbench.py
similarity index 100%
rename from dev/benchmarks/tpcbench.py
rename to benchmarks/tpc/tpcbench.py
diff --git a/dev/benchmarks/blaze-tpcds.sh b/dev/benchmarks/blaze-tpcds.sh
deleted file mode 100755
index 90a4a4846..000000000
--- a/dev/benchmarks/blaze-tpcds.sh
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --jars $BLAZE_JAR \
- --driver-class-path $BLAZE_JAR \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=2 \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=16 \
- --conf spark.executor.memory=16g \
- --conf spark.executor.memoryOverhead=16g \
- --conf spark.memory.offHeap.enabled=false \
- --conf spark.eventLog.enabled=true \
- --conf spark.driver.extraClassPath=$BLAZE_JAR \
- --conf spark.executor.extraClassPath=$BLAZE_JAR \
- --conf spark.sql.extensions=org.apache.spark.sql.blaze.BlazeSparkSessionExtension \
- --conf spark.shuffle.manager=org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager \
- --conf spark.blaze.enable=true \
- --conf spark.blaze.forceShuffledHashJoin=true \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name blaze \
- --benchmark tpcds \
- --data $TPCDS_DATA \
- --queries $TPCDS_QUERIES \
- --output . \
- --iterations 1
diff --git a/dev/benchmarks/blaze-tpch.sh b/dev/benchmarks/blaze-tpch.sh
deleted file mode 100755
index 2c6878737..000000000
--- a/dev/benchmarks/blaze-tpch.sh
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --jars $BLAZE_JAR \
- --driver-class-path $BLAZE_JAR \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=1 \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=8 \
- --conf spark.executor.memory=16g \
- --conf spark.executor.memoryOverhead=16g \
- --conf spark.memory.offHeap.enabled=false \
- --conf spark.eventLog.enabled=true \
- --conf spark.driver.extraClassPath=$BLAZE_JAR \
- --conf spark.executor.extraClassPath=$BLAZE_JAR \
- --conf spark.sql.extensions=org.apache.spark.sql.blaze.BlazeSparkSessionExtension \
- --conf spark.shuffle.manager=org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager \
- --conf spark.blaze.enable=true \
- --conf spark.blaze.forceShuffledHashJoin=true \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name blaze \
- --benchmark tpch \
- --data $TPCH_DATA \
- --queries $TPCH_QUERIES \
- --output . \
- --iterations 1
diff --git a/dev/benchmarks/comet-tpcds.sh b/dev/benchmarks/comet-tpcds.sh
deleted file mode 100755
index b55b27188..000000000
--- a/dev/benchmarks/comet-tpcds.sh
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --jars $COMET_JAR \
- --driver-class-path $COMET_JAR \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=2 \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=16 \
- --conf spark.executor.memory=16g \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16g \
- --conf spark.eventLog.enabled=true \
- --conf spark.driver.extraClassPath=$COMET_JAR \
- --conf spark.executor.extraClassPath=$COMET_JAR \
- --conf spark.plugins=org.apache.spark.CometPlugin \
- --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
- --conf spark.comet.scan.impl=native_datafusion \
- --conf spark.comet.expression.Cast.allowIncompatible=true \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name comet \
- --benchmark tpcds \
- --data $TPCDS_DATA \
- --queries $TPCDS_QUERIES \
- --output . \
- --iterations 1
diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh
deleted file mode 100755
index 7907125c8..000000000
--- a/dev/benchmarks/comet-tpch-iceberg.sh
+++ /dev/null
@@ -1,114 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration.
-#
-# Required environment variables:
-# SPARK_HOME - Path to Spark installation
-# SPARK_MASTER - Spark master URL (e.g., spark://localhost:7077)
-# COMET_JAR - Path to Comet JAR
-# ICEBERG_JAR - Path to Iceberg Spark runtime JAR
-# ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory
-# TPCH_QUERIES - Path to TPC-H query files
-#
-# Optional:
-# ICEBERG_CATALOG - Catalog name (default: local)
-# ICEBERG_DATABASE - Database name (default: tpch)
-#
-# Setup (run once to create Iceberg tables from Parquet):
-# $SPARK_HOME/bin/spark-submit \
-# --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
-# --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
-# --conf spark.sql.catalog.local.type=hadoop \
-# --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
-# create-iceberg-tpch.py \
-# --parquet-path $TPCH_DATA \
-# --catalog local \
-# --database tpch
-
-set -e
-
-# Defaults
-ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
-ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch}
-
-# Validate required variables
-if [ -z "$SPARK_HOME" ]; then
- echo "Error: SPARK_HOME is not set"
- exit 1
-fi
-if [ -z "$COMET_JAR" ]; then
- echo "Error: COMET_JAR is not set"
- exit 1
-fi
-if [ -z "$ICEBERG_JAR" ]; then
- echo "Error: ICEBERG_JAR is not set"
- echo "Download from: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/"
- exit 1
-fi
-if [ -z "$ICEBERG_WAREHOUSE" ]; then
- echo "Error: ICEBERG_WAREHOUSE is not set"
- exit 1
-fi
-if [ -z "$TPCH_QUERIES" ]; then
- echo "Error: TPCH_QUERIES is not set"
- exit 1
-fi
-
-$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true
-$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --jars $COMET_JAR,$ICEBERG_JAR \
- --driver-class-path $COMET_JAR:$ICEBERG_JAR \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=1 \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=8 \
- --conf spark.executor.memory=16g \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16g \
- --conf spark.eventLog.enabled=true \
- --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
- --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
- --conf spark.plugins=org.apache.spark.CometPlugin \
- --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
- --conf spark.comet.exec.replaceSortMergeJoin=true \
- --conf spark.comet.expression.Cast.allowIncompatible=true \
- --conf spark.comet.enabled=true \
- --conf spark.comet.exec.enabled=true \
- --conf spark.comet.scan.icebergNative.enabled=true \
- --conf spark.comet.explainFallback.enabled=true \
- --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
- --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
- --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
- --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \
- tpcbench.py \
- --name comet-iceberg \
- --benchmark tpch \
- --catalog $ICEBERG_CATALOG \
- --database $ICEBERG_DATABASE \
- --queries $TPCH_QUERIES \
- --output . \
- --iterations 1
diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh
deleted file mode 100755
index a748a0231..000000000
--- a/dev/benchmarks/comet-tpch.sh
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --jars $COMET_JAR \
- --driver-class-path $COMET_JAR \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=1 \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=8 \
- --conf spark.executor.memory=16g \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16g \
- --conf spark.eventLog.enabled=true \
- --conf spark.driver.extraClassPath=$COMET_JAR \
- --conf spark.executor.extraClassPath=$COMET_JAR \
- --conf spark.plugins=org.apache.spark.CometPlugin \
- --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
- --conf spark.comet.scan.impl=native_datafusion \
- --conf spark.comet.exec.replaceSortMergeJoin=true \
- --conf spark.comet.expression.Cast.allowIncompatible=true \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name comet \
- --benchmark tpch \
- --data $TPCH_DATA \
- --queries $TPCH_QUERIES \
- --output . \
- --iterations 1 \
- --format parquet
diff --git a/dev/benchmarks/create-iceberg-tpch.py b/dev/benchmarks/create-iceberg-tpch.py
deleted file mode 100644
index 44f0f63a2..000000000
--- a/dev/benchmarks/create-iceberg-tpch.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Convert TPC-H Parquet data to Iceberg tables.
-
-Usage:
- spark-submit \
- --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
- --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
- --conf spark.sql.catalog.local.type=hadoop \
- --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \
- create-iceberg-tpch.py \
- --parquet-path /path/to/tpch/parquet \
- --catalog local \
- --database tpch
-"""
-
-import argparse
-from pyspark.sql import SparkSession
-import time
-
-
-def main(parquet_path: str, catalog: str, database: str):
- spark = SparkSession.builder \
- .appName("Create Iceberg TPC-H Tables") \
- .getOrCreate()
-
- table_names = [
- "customer",
- "lineitem",
- "nation",
- "orders",
- "part",
- "partsupp",
- "region",
- "supplier"
- ]
-
- # Create database if it doesn't exist
- spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}")
-
- for table in table_names:
- parquet_table_path = f"{parquet_path}/{table}.parquet"
- iceberg_table = f"{catalog}.{database}.{table}"
-
- print(f"Converting {parquet_table_path} -> {iceberg_table}")
- start_time = time.time()
-
- # Drop table if exists to allow re-running
- spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")
-
- # Read parquet and write as Iceberg
- df = spark.read.parquet(parquet_table_path)
- df.writeTo(iceberg_table).using("iceberg").create()
-
- row_count = spark.table(iceberg_table).count()
- elapsed = time.time() - start_time
- print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s")
-
- print("\nAll TPC-H tables created successfully!")
- print(f"Tables available at: {catalog}.{database}.*")
-
- spark.stop()
-
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser(description="Convert TPC-H Parquet data to Iceberg tables")
- parser.add_argument("--parquet-path", required=True, help="Path to TPC-H Parquet data directory")
- parser.add_argument("--catalog", required=True, help="Iceberg catalog name (e.g., 'local')")
- parser.add_argument("--database", default="tpch", help="Database name to create tables in")
- args = parser.parse_args()
-
- main(args.parquet_path, args.catalog, args.database)
diff --git a/dev/benchmarks/gluten-tpcds.sh b/dev/benchmarks/gluten-tpcds.sh
deleted file mode 100755
index 7c475c79c..000000000
--- a/dev/benchmarks/gluten-tpcds.sh
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-export TZ=UTC
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=2 \
- --conf spark.executor.memory=16G \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=16 \
- --conf spark.eventLog.enabled=true \
- --jars $GLUTEN_JAR \
- --conf spark.plugins=org.apache.gluten.GlutenPlugin \
- --conf spark.driver.extraClassPath=${GLUTEN_JAR} \
- --conf spark.executor.extraClassPath=${GLUTEN_JAR} \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16g \
- --conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \
- --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
- --conf spark.sql.session.timeZone=UTC \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name gluten \
- --benchmark tpcds \
- --data $TPCDS_DATA \
- --queries $TPCDS_QUERIES \
- --output . \
- --iterations 1
diff --git a/dev/benchmarks/gluten-tpch.sh b/dev/benchmarks/gluten-tpch.sh
deleted file mode 100755
index 46c3ed752..000000000
--- a/dev/benchmarks/gluten-tpch.sh
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-export TZ=UTC
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=1 \
- --conf spark.executor.memory=16G \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=8 \
- --conf spark.eventLog.enabled=true \
- --jars $GLUTEN_JAR \
- --conf spark.plugins=org.apache.gluten.GlutenPlugin \
- --conf spark.driver.extraClassPath=${GLUTEN_JAR} \
- --conf spark.executor.extraClassPath=${GLUTEN_JAR} \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16g \
- --conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \
- --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \
- --conf spark.sql.session.timeZone=UTC \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name gluten \
- --benchmark tpch \
- --data $TPCH_DATA \
- --queries $TPCH_QUERIES \
- --output . \
- --iterations 1
diff --git a/dev/benchmarks/spark-tpcds.sh b/dev/benchmarks/spark-tpcds.sh
deleted file mode 100755
index dad079ba2..000000000
--- a/dev/benchmarks/spark-tpcds.sh
+++ /dev/null
@@ -1,45 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=2 \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=16 \
- --conf spark.executor.memory=16g \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16g \
- --conf spark.eventLog.enabled=true \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name spark \
- --benchmark tpcds \
- --data $TPCDS_DATA \
- --queries $TPCDS_QUERIES \
- --output . \
- --iterations 1
diff --git a/dev/benchmarks/spark-tpch.sh b/dev/benchmarks/spark-tpch.sh
deleted file mode 100755
index ae359f049..000000000
--- a/dev/benchmarks/spark-tpch.sh
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$SPARK_HOME/sbin/stop-master.sh
-$SPARK_HOME/sbin/stop-worker.sh
-
-$SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
-
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --conf spark.driver.memory=8G \
- --conf spark.executor.instances=1 \
- --conf spark.executor.cores=8 \
- --conf spark.cores.max=8 \
- --conf spark.executor.memory=16g \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16g \
- --conf spark.eventLog.enabled=true \
- --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
- tpcbench.py \
- --name spark \
- --benchmark tpch \
- --data $TPCH_DATA \
- --queries $TPCH_QUERIES \
- --output . \
- --iterations 1 \
- --format parquet
diff --git a/docs/source/about/gluten_comparison.md b/docs/source/about/gluten_comparison.md
index 492479bb9..40c6c2741 100644
--- a/docs/source/about/gluten_comparison.md
+++ b/docs/source/about/gluten_comparison.md
@@ -86,7 +86,7 @@ on your existing Spark jobs.
-The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks).
+The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc).
## Ease of Development & Contributing
diff --git a/docs/source/contributor-guide/benchmark-results/tpc-ds.md b/docs/source/contributor-guide/benchmark-results/tpc-ds.md
index 66ff2e51a..bea254cc0 100644
--- a/docs/source/contributor-guide/benchmark-results/tpc-ds.md
+++ b/docs/source/contributor-guide/benchmark-results/tpc-ds.md
@@ -46,4 +46,4 @@ The raw results of these benchmarks in JSON format is available here:
- [Spark](spark-3.5.3-tpcds.json)
- [Comet](comet-0.11.0-tpcds.json)
-The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks).
+The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc).
diff --git a/docs/source/contributor-guide/benchmark-results/tpc-h.md b/docs/source/contributor-guide/benchmark-results/tpc-h.md
index 4424d9eac..2170426c0 100644
--- a/docs/source/contributor-guide/benchmark-results/tpc-h.md
+++ b/docs/source/contributor-guide/benchmark-results/tpc-h.md
@@ -46,4 +46,4 @@ The raw results of these benchmarks in JSON format is available here:
- [Spark](spark-3.5.3-tpch.json)
- [Comet](comet-0.11.0-tpch.json)
-The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks).
+The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc).
diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md
index 768089c95..49af73376 100644
--- a/docs/source/contributor-guide/benchmarking.md
+++ b/docs/source/contributor-guide/benchmarking.md
@@ -21,7 +21,7 @@ under the License.
To track progress on performance, we regularly run benchmarks derived from TPC-H and TPC-DS.
-The benchmarking scripts are contained at [https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks).
+The benchmarking scripts are contained [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc).
Data generation scripts are available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) GitHub repository.
diff --git a/docs/source/contributor-guide/benchmarking_aws_ec2.md b/docs/source/contributor-guide/benchmarking_aws_ec2.md
index 922b0379f..81f15d64e 100644
--- a/docs/source/contributor-guide/benchmarking_aws_ec2.md
+++ b/docs/source/contributor-guide/benchmarking_aws_ec2.md
@@ -109,23 +109,23 @@ export COMET_JAR=/home/ec2-user/datafusion-comet/spark/target/comet-spark-spark3
## Run Benchmarks
-Use the scripts in `dev/benchmarks` in the Comet repository.
+Use the scripts in `benchmarks/tpc` in the Comet repository.
```shell
-cd dev/benchmarks
+cd benchmarks/tpc
export TPCH_QUERIES=/home/ec2-user/datafusion-benchmarks/tpch/queries/
```
Run Spark benchmark:
```shell
-./spark-tpch.sh
+python3 run.py --engine spark --benchmark tpch
```
Run Comet benchmark:
```shell
-./comet-tpch.sh
+python3 run.py --engine comet --benchmark tpch
```
## Running Benchmarks with S3
@@ -164,4 +164,9 @@ Modify the scripts to add the following configurations.
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
```
-Now run the `spark-tpch.sh` and `comet-tpch.sh` scripts.
+Now run the benchmarks:
+
+```shell
+python3 run.py --engine spark --benchmark tpch
+python3 run.py --engine comet --benchmark tpch
+```