This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new abb0a43  chore(provision): make Spark integration test provisioning idempotent (#198)
abb0a43 is described below

commit abb0a4307c8d49d2dfecd8d15a417ce1ee6fbb13
Author: Song Chuanqi <[email protected]>
AuthorDate: Sun Apr 5 08:21:26 2026 +0800

    chore(provision): make Spark integration test provisioning idempotent (#198)
---
 dev/spark/provision.py | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index f22338b..c898e58 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -20,12 +20,50 @@
 # Provisions Paimon tables into the warehouse (file:/tmp/paimon-warehouse)
 # for paimon-rust integration tests to read.
 
+import shutil
+from pathlib import Path
+from urllib.parse import unquote, urlparse
+
 from pyspark.sql import SparkSession
 
 
+def _warehouse_path_from_spark_conf(spark: SparkSession) -> Path:
+    warehouse_uri = spark.conf.get("spark.sql.catalog.paimon.warehouse")
+    parsed = urlparse(warehouse_uri)
+
+    if parsed.scheme not in ("", "file"):
+        raise ValueError(
+            f"Unsupported Paimon warehouse URI scheme {parsed.scheme!r}: {warehouse_uri}"
+        )
+
+    if parsed.netloc not in ("", "localhost"):
+        raise ValueError(
+            f"Unsupported remote Paimon warehouse location {parsed.netloc!r}: {warehouse_uri}"
+        )
+
+    warehouse_path = Path(unquote(parsed.path if parsed.scheme else warehouse_uri))
+    if not warehouse_path.is_absolute() or str(warehouse_path) == "/":
+        raise ValueError(f"Refusing to clear unsafe warehouse path: {warehouse_path}")
+
+    return warehouse_path
+
+
+def _reset_warehouse_dir(warehouse_path: Path) -> None:
+    warehouse_path.mkdir(parents=True, exist_ok=True)
+
+    for child in warehouse_path.iterdir():
+        if child.is_symlink() or child.is_file():
+            child.unlink()
+        else:
+            shutil.rmtree(child)
+
+
 def main():
     spark = SparkSession.builder.getOrCreate()
 
+    warehouse_path = _warehouse_path_from_spark_conf(spark)
+    _reset_warehouse_dir(warehouse_path)
+
     # Use Paimon catalog (configured in spark-defaults.conf with warehouse file:/tmp/paimon-warehouse)
     spark.sql("USE paimon.default")
 

Reply via email to