This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cb8b98004 Add datasource metrics (#10787)
0cb8b98004 is described below

commit 0cb8b98004964af11c347417d8b3072ef59b3184
Author: Rui Mo <[email protected]>
AuthorDate: Tue Sep 23 18:30:56 2025 +0100

    Add datasource metrics (#10787)
    
    Add Velox datasource add-split and read metrics in Gluten.
---
 .../main/java/org/apache/gluten/metrics/Metrics.java   |  9 +++++++++
 .../org/apache/gluten/metrics/OperatorMetrics.java     |  6 ++++++
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala     | 18 ++++++++++++++++++
 .../gluten/metrics/BatchScanMetricsUpdater.scala       |  2 ++
 .../gluten/metrics/FileSourceScanMetricsUpdater.scala  |  4 ++++
 .../gluten/metrics/HiveTableScanMetricsUpdater.scala   |  4 ++++
 .../scala/org/apache/gluten/metrics/MetricsUtil.scala  |  6 ++++++
 .../apache/gluten/execution/VeloxMetricsSuite.scala    | 12 ++++++++++++
 cpp/core/jni/JniWrapper.cc                             |  4 +++-
 cpp/core/utils/Metrics.h                               |  2 ++
 cpp/velox/compute/WholeStageResultIterator.cc          |  6 ++++++
 11 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index 8d5c92b0cd..d9b16aef98 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -52,6 +52,8 @@ public class Metrics implements IMetrics {
   public long[] localReadBytes;
   public long[] ramReadBytes;
   public long[] preloadSplits;
+  public long[] dataSourceAddSplitTime;
+  public long[] dataSourceReadTime;
 
   public long[] physicalWrittenBytes;
   public long[] writeIOTime;
@@ -97,6 +99,8 @@ public class Metrics implements IMetrics {
       long[] localReadBytes,
       long[] ramReadBytes,
       long[] preloadSplits,
+      long[] dataSourceAddSplitTime,
+      long[] dataSourceReadTime,
       long[] physicalWrittenBytes,
       long[] writeIOTime,
       long[] numWrittenFiles,
@@ -135,6 +139,9 @@ public class Metrics implements IMetrics {
     this.localReadBytes = localReadBytes;
     this.ramReadBytes = ramReadBytes;
     this.preloadSplits = preloadSplits;
+    this.dataSourceAddSplitTime = dataSourceAddSplitTime;
+    this.dataSourceReadTime = dataSourceReadTime;
+
     this.physicalWrittenBytes = physicalWrittenBytes;
     this.writeIOTime = writeIOTime;
     this.numWrittenFiles = numWrittenFiles;
@@ -180,6 +187,8 @@ public class Metrics implements IMetrics {
         localReadBytes[index],
         ramReadBytes[index],
         preloadSplits[index],
+        dataSourceAddSplitTime[index],
+        dataSourceReadTime[index],
         physicalWrittenBytes[index],
         writeIOTime[index],
         numWrittenFiles[index]);
diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index 36d827a288..24bedf0a46 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -50,6 +50,8 @@ public class OperatorMetrics implements IOperatorMetrics {
   public long localReadBytes;
   public long ramReadBytes;
   public long preloadSplits;
+  public long dataSourceAddSplitTime;
+  public long dataSourceReadTime;
 
   public long physicalWrittenBytes;
   public long writeIOTime;
@@ -90,6 +92,8 @@ public class OperatorMetrics implements IOperatorMetrics {
       long localReadBytes,
       long ramReadBytes,
       long preloadSplits,
+      long dataSourceAddSplitTime,
+      long dataSourceReadTime,
       long physicalWrittenBytes,
       long writeIOTime,
       long numWrittenFiles) {
@@ -126,6 +130,8 @@ public class OperatorMetrics implements IOperatorMetrics {
     this.localReadBytes = localReadBytes;
     this.ramReadBytes = ramReadBytes;
     this.preloadSplits = preloadSplits;
+    this.dataSourceAddSplitTime = dataSourceAddSplitTime;
+    this.dataSourceReadTime = dataSourceReadTime;
     this.physicalWrittenBytes = physicalWrittenBytes;
     this.writeIOTime = writeIOTime;
     this.numWrittenFiles = numWrittenFiles;
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 901bad8be4..0feefe02bd 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -108,6 +108,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
skipped splits"),
       "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
processed splits"),
       "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
preloaded splits"),
+      "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "data source add split time"),
+      "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "data source read time"),
       "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of 
skipped row groups"),
       "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of 
processed row groups"),
       "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
@@ -149,6 +155,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
skipped splits"),
       "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
processed splits"),
       "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
preloaded splits"),
+      "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "data source add split time"),
+      "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "data source read time"),
       "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of 
skipped row groups"),
       "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of 
processed row groups"),
       "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
@@ -190,6 +202,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
skipped splits"),
       "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
processed splits"),
       "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of 
preloaded splits"),
+      "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "data source add split time"),
+      "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "data source read time"),
       "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of 
skipped row groups"),
       "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of 
processed row groups"),
       "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
index 31f6b5a313..886e8353b8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
@@ -54,6 +54,8 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri
       metrics("localReadBytes") += operatorMetrics.localReadBytes
       metrics("ramReadBytes") += operatorMetrics.ramReadBytes
       metrics("preloadSplits") += operatorMetrics.preloadSplits
+      metrics("dataSourceAddSplitTime") += 
operatorMetrics.dataSourceAddSplitTime
+      metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
     }
   }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index f8693f9246..b76525d7fb 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -42,6 +42,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
   val skippedSplits: SQLMetric = metrics("skippedSplits")
   val processedSplits: SQLMetric = metrics("processedSplits")
   val preloadSplits: SQLMetric = metrics("preloadSplits")
+  val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
+  val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
   val skippedStrides: SQLMetric = metrics("skippedStrides")
   val processedStrides: SQLMetric = metrics("processedStrides")
   val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
@@ -80,6 +82,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
       localReadBytes += operatorMetrics.localReadBytes
       ramReadBytes += operatorMetrics.ramReadBytes
       preloadSplits += operatorMetrics.preloadSplits
+      dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
+      dataSourceReadTime += operatorMetrics.dataSourceReadTime
     }
   }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
index f7e0dc1800..c8e4e98cdf 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
@@ -37,6 +37,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
   val skippedSplits: SQLMetric = metrics("skippedSplits")
   val processedSplits: SQLMetric = metrics("processedSplits")
   val preloadSplits: SQLMetric = metrics("preloadSplits")
+  val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
+  val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
   val skippedStrides: SQLMetric = metrics("skippedStrides")
   val processedStrides: SQLMetric = metrics("processedStrides")
   val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
@@ -75,6 +77,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
       localReadBytes += operatorMetrics.localReadBytes
       ramReadBytes += operatorMetrics.ramReadBytes
       preloadSplits += operatorMetrics.preloadSplits
+      dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
+      dataSourceReadTime += operatorMetrics.dataSourceReadTime
     }
   }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index d9e269850b..cd3e0eafa0 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -133,6 +133,8 @@ object MetricsUtil extends Logging {
     var localReadBytes: Long = 0
     var ramReadBytes: Long = 0
     var preloadSplits: Long = 0
+    var dataSourceAddSplitTime: Long = 0
+    var dataSourceReadTime: Long = 0
     var numWrittenFiles: Long = 0
 
     val metricsIterator = operatorMetrics.iterator()
@@ -163,6 +165,8 @@ object MetricsUtil extends Logging {
       localReadBytes += metrics.localReadBytes
       ramReadBytes += metrics.ramReadBytes
       preloadSplits += metrics.preloadSplits
+      dataSourceAddSplitTime += metrics.dataSourceAddSplitTime
+      dataSourceReadTime += metrics.dataSourceReadTime
       numWrittenFiles += metrics.numWrittenFiles
     }
 
@@ -200,6 +204,8 @@ object MetricsUtil extends Logging {
       localReadBytes,
       ramReadBytes,
       preloadSplits,
+      dataSourceAddSplitTime,
+      dataSourceReadTime,
       physicalWrittenBytes,
       writeIOTime,
       numWrittenFiles
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 954a1eacc5..db26ce2985 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -278,6 +278,18 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
     assert(metrics("ramReadBytes").value == 0)
   }
 
+  test("Velox datasource metrics") {
+    val df = spark.sql(s"SELECT * FROM metrics_t1")
+    val scans = collect(df.queryExecution.executedPlan) {
+      case scan: FileSourceScanExecTransformer => scan
+    }
+    df.collect()
+    assert(scans.length === 1)
+    val metrics = scans.head.metrics
+    assert(metrics("dataSourceReadTime").value > 0)
+    assert(metrics("dataSourceAddSplitTime").value > 0)
+  }
+
   test("test nested loop join metrics") {
     withSQLConf() {
       runQueryAndCompare(
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 543bc31597..11e7ad15de 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -259,7 +259,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
       env,
       metricsBuilderClass,
       "<init>",
-      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
 
   nativeColumnarToRowInfoClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -585,6 +585,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
       longArray[Metrics::kLocalReadBytes],
       longArray[Metrics::kRamReadBytes],
       longArray[Metrics::kPreloadSplits],
+      longArray[Metrics::kDataSourceAddSplitWallNanos],
+      longArray[Metrics::kDataSourceReadWallNanos],
       longArray[Metrics::kPhysicalWrittenBytes],
       longArray[Metrics::kWriteIOTime],
       longArray[Metrics::kNumWrittenFiles],
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index cb48a137e7..d3169eb69f 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -80,6 +80,8 @@ struct Metrics {
     kLocalReadBytes,
     kRamReadBytes,
     kPreloadSplits,
+    kDataSourceAddSplitWallNanos,
+    kDataSourceReadWallNanos,
 
     // Write metrics.
     kPhysicalWrittenBytes,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 323231b4fa..039e67bd4f 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -49,6 +49,8 @@ const std::string kStorageReadBytes = "storageReadBytes";
 const std::string kLocalReadBytes = "localReadBytes";
 const std::string kRamReadBytes = "ramReadBytes";
 const std::string kPreloadSplits = "readyPreloadedSplits";
+const std::string kDataSourceAddSplitWallNanos = "dataSourceAddSplitWallNanos";
+const std::string kDataSourceReadWallNanos = "dataSourceReadWallNanos";
 const std::string kNumWrittenFiles = "numWrittenFiles";
 const std::string kWriteIOTime = "writeIOWallNanos";
 
@@ -466,6 +468,10 @@ void WholeStageResultIterator::collectMetrics() {
       metrics_->get(Metrics::kRamReadBytes)[metricIndex] = 
runtimeMetric("sum", second->customStats, kRamReadBytes);
       metrics_->get(Metrics::kPreloadSplits)[metricIndex] =
           runtimeMetric("sum", entry.second->customStats, kPreloadSplits);
+      metrics_->get(Metrics::kDataSourceAddSplitWallNanos)[metricIndex] =
+          runtimeMetric("sum", second->customStats, 
kDataSourceAddSplitWallNanos);
+      metrics_->get(Metrics::kDataSourceReadWallNanos)[metricIndex] =
+          runtimeMetric("sum", second->customStats, kDataSourceReadWallNanos);
       metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] =
           runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles);
       metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] = 
second->physicalWrittenBytes;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to