This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
     new abb0a43 chore(provision): make Spark integration test provisioning idempotent (#198)
abb0a43 is described below
commit abb0a4307c8d49d2dfecd8d15a417ce1ee6fbb13
Author: Song Chuanqi <[email protected]>
AuthorDate: Sun Apr 5 08:21:26 2026 +0800
chore(provision): make Spark integration test provisioning idempotent (#198)
---
dev/spark/provision.py | 38 ++++++++++++++++++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index f22338b..c898e58 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -20,12 +20,50 @@
# Provisions Paimon tables into the warehouse (file:/tmp/paimon-warehouse)
# for paimon-rust integration tests to read.
+import shutil
+from pathlib import Path
+from urllib.parse import unquote, urlparse
+
from pyspark.sql import SparkSession
+def _warehouse_path_from_spark_conf(spark: SparkSession) -> Path:
+ warehouse_uri = spark.conf.get("spark.sql.catalog.paimon.warehouse")
+ parsed = urlparse(warehouse_uri)
+
+ if parsed.scheme not in ("", "file"):
+ raise ValueError(
+ f"Unsupported Paimon warehouse URI scheme {parsed.scheme!r}:
{warehouse_uri}"
+ )
+
+ if parsed.netloc not in ("", "localhost"):
+ raise ValueError(
+ f"Unsupported remote Paimon warehouse location {parsed.netloc!r}:
{warehouse_uri}"
+ )
+
+ warehouse_path = Path(unquote(parsed.path if parsed.scheme else
warehouse_uri))
+ if not warehouse_path.is_absolute() or str(warehouse_path) == "/":
+ raise ValueError(f"Refusing to clear unsafe warehouse path:
{warehouse_path}")
+
+ return warehouse_path
+
+
+def _reset_warehouse_dir(warehouse_path: Path) -> None:
+ warehouse_path.mkdir(parents=True, exist_ok=True)
+
+ for child in warehouse_path.iterdir():
+ if child.is_symlink() or child.is_file():
+ child.unlink()
+ else:
+ shutil.rmtree(child)
+
+
def main():
spark = SparkSession.builder.getOrCreate()
+ warehouse_path = _warehouse_path_from_spark_conf(spark)
+ _reset_warehouse_dir(warehouse_path)
+
    # Use Paimon catalog (configured in spark-defaults.conf with warehouse file:/tmp/paimon-warehouse)
spark.sql("USE paimon.default")