This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0cb8b98004 Add datasource metrics (#10787)
0cb8b98004 is described below
commit 0cb8b98004964af11c347417d8b3072ef59b3184
Author: Rui Mo <[email protected]>
AuthorDate: Tue Sep 23 18:30:56 2025 +0100
Add datasource metrics (#10787)
Add Velox datasource add-split and read metrics in Gluten.
---
.../main/java/org/apache/gluten/metrics/Metrics.java | 9 +++++++++
.../org/apache/gluten/metrics/OperatorMetrics.java | 6 ++++++
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 18 ++++++++++++++++++
.../gluten/metrics/BatchScanMetricsUpdater.scala | 2 ++
.../gluten/metrics/FileSourceScanMetricsUpdater.scala | 4 ++++
.../gluten/metrics/HiveTableScanMetricsUpdater.scala | 4 ++++
.../scala/org/apache/gluten/metrics/MetricsUtil.scala | 6 ++++++
.../apache/gluten/execution/VeloxMetricsSuite.scala | 12 ++++++++++++
cpp/core/jni/JniWrapper.cc | 4 +++-
cpp/core/utils/Metrics.h | 2 ++
cpp/velox/compute/WholeStageResultIterator.cc | 6 ++++++
11 files changed, 72 insertions(+), 1 deletion(-)
diff --git
a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index 8d5c92b0cd..d9b16aef98 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -52,6 +52,8 @@ public class Metrics implements IMetrics {
public long[] localReadBytes;
public long[] ramReadBytes;
public long[] preloadSplits;
+ public long[] dataSourceAddSplitTime;
+ public long[] dataSourceReadTime;
public long[] physicalWrittenBytes;
public long[] writeIOTime;
@@ -97,6 +99,8 @@ public class Metrics implements IMetrics {
long[] localReadBytes,
long[] ramReadBytes,
long[] preloadSplits,
+ long[] dataSourceAddSplitTime,
+ long[] dataSourceReadTime,
long[] physicalWrittenBytes,
long[] writeIOTime,
long[] numWrittenFiles,
@@ -135,6 +139,9 @@ public class Metrics implements IMetrics {
this.localReadBytes = localReadBytes;
this.ramReadBytes = ramReadBytes;
this.preloadSplits = preloadSplits;
+ this.dataSourceAddSplitTime = dataSourceAddSplitTime;
+ this.dataSourceReadTime = dataSourceReadTime;
+
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
@@ -180,6 +187,8 @@ public class Metrics implements IMetrics {
localReadBytes[index],
ramReadBytes[index],
preloadSplits[index],
+ dataSourceAddSplitTime[index],
+ dataSourceReadTime[index],
physicalWrittenBytes[index],
writeIOTime[index],
numWrittenFiles[index]);
diff --git
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index 36d827a288..24bedf0a46 100644
---
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -50,6 +50,8 @@ public class OperatorMetrics implements IOperatorMetrics {
public long localReadBytes;
public long ramReadBytes;
public long preloadSplits;
+ public long dataSourceAddSplitTime;
+ public long dataSourceReadTime;
public long physicalWrittenBytes;
public long writeIOTime;
@@ -90,6 +92,8 @@ public class OperatorMetrics implements IOperatorMetrics {
long localReadBytes,
long ramReadBytes,
long preloadSplits,
+ long dataSourceAddSplitTime,
+ long dataSourceReadTime,
long physicalWrittenBytes,
long writeIOTime,
long numWrittenFiles) {
@@ -126,6 +130,8 @@ public class OperatorMetrics implements IOperatorMetrics {
this.localReadBytes = localReadBytes;
this.ramReadBytes = ramReadBytes;
this.preloadSplits = preloadSplits;
+ this.dataSourceAddSplitTime = dataSourceAddSplitTime;
+ this.dataSourceReadTime = dataSourceReadTime;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 901bad8be4..0feefe02bd 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -108,6 +108,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of
skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of
processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of
preloaded splits"),
+ "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "data source add split time"),
+ "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "data source read time"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of
skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of
processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
@@ -149,6 +155,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of
skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of
processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of
preloaded splits"),
+ "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "data source add split time"),
+ "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "data source read time"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of
skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of
processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
@@ -190,6 +202,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of
skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of
processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of
preloaded splits"),
+ "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "data source add split time"),
+ "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
+ sparkContext,
+ "data source read time"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of
skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of
processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
index 31f6b5a313..886e8353b8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
@@ -54,6 +54,8 @@ class BatchScanMetricsUpdater(val metrics: Map[String,
SQLMetric]) extends Metri
metrics("localReadBytes") += operatorMetrics.localReadBytes
metrics("ramReadBytes") += operatorMetrics.ramReadBytes
metrics("preloadSplits") += operatorMetrics.preloadSplits
+      metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime
+ metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
}
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index f8693f9246..b76525d7fb 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -42,6 +42,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
val skippedSplits: SQLMetric = metrics("skippedSplits")
val processedSplits: SQLMetric = metrics("processedSplits")
val preloadSplits: SQLMetric = metrics("preloadSplits")
+ val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
+ val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
val skippedStrides: SQLMetric = metrics("skippedStrides")
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
@@ -80,6 +82,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
localReadBytes += operatorMetrics.localReadBytes
ramReadBytes += operatorMetrics.ramReadBytes
preloadSplits += operatorMetrics.preloadSplits
+ dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
+ dataSourceReadTime += operatorMetrics.dataSourceReadTime
}
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
index f7e0dc1800..c8e4e98cdf 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala
@@ -37,6 +37,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric]
val skippedSplits: SQLMetric = metrics("skippedSplits")
val processedSplits: SQLMetric = metrics("processedSplits")
val preloadSplits: SQLMetric = metrics("preloadSplits")
+ val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
+ val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
val skippedStrides: SQLMetric = metrics("skippedStrides")
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
@@ -75,6 +77,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric]
localReadBytes += operatorMetrics.localReadBytes
ramReadBytes += operatorMetrics.ramReadBytes
preloadSplits += operatorMetrics.preloadSplits
+ dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
+ dataSourceReadTime += operatorMetrics.dataSourceReadTime
}
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index d9e269850b..cd3e0eafa0 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -133,6 +133,8 @@ object MetricsUtil extends Logging {
var localReadBytes: Long = 0
var ramReadBytes: Long = 0
var preloadSplits: Long = 0
+ var dataSourceAddSplitTime: Long = 0
+ var dataSourceReadTime: Long = 0
var numWrittenFiles: Long = 0
val metricsIterator = operatorMetrics.iterator()
@@ -163,6 +165,8 @@ object MetricsUtil extends Logging {
localReadBytes += metrics.localReadBytes
ramReadBytes += metrics.ramReadBytes
preloadSplits += metrics.preloadSplits
+ dataSourceAddSplitTime += metrics.dataSourceAddSplitTime
+ dataSourceReadTime += metrics.dataSourceReadTime
numWrittenFiles += metrics.numWrittenFiles
}
@@ -200,6 +204,8 @@ object MetricsUtil extends Logging {
localReadBytes,
ramReadBytes,
preloadSplits,
+ dataSourceAddSplitTime,
+ dataSourceReadTime,
physicalWrittenBytes,
writeIOTime,
numWrittenFiles
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 954a1eacc5..db26ce2985 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -278,6 +278,18 @@ class VeloxMetricsSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(metrics("ramReadBytes").value == 0)
}
+ test("Velox datasource metrics") {
+ val df = spark.sql(s"SELECT * FROM metrics_t1")
+ val scans = collect(df.queryExecution.executedPlan) {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ df.collect()
+ assert(scans.length === 1)
+ val metrics = scans.head.metrics
+ assert(metrics("dataSourceReadTime").value > 0)
+ assert(metrics("dataSourceAddSplitTime").value > 0)
+ }
+
test("test nested loop join metrics") {
withSQLConf() {
runQueryAndCompare(
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 543bc31597..11e7ad15de 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -259,7 +259,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
-          "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+          "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -585,6 +585,8 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kLocalReadBytes],
longArray[Metrics::kRamReadBytes],
longArray[Metrics::kPreloadSplits],
+ longArray[Metrics::kDataSourceAddSplitWallNanos],
+ longArray[Metrics::kDataSourceReadWallNanos],
longArray[Metrics::kPhysicalWrittenBytes],
longArray[Metrics::kWriteIOTime],
longArray[Metrics::kNumWrittenFiles],
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index cb48a137e7..d3169eb69f 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -80,6 +80,8 @@ struct Metrics {
kLocalReadBytes,
kRamReadBytes,
kPreloadSplits,
+ kDataSourceAddSplitWallNanos,
+ kDataSourceReadWallNanos,
// Write metrics.
kPhysicalWrittenBytes,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 323231b4fa..039e67bd4f 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -49,6 +49,8 @@ const std::string kStorageReadBytes = "storageReadBytes";
const std::string kLocalReadBytes = "localReadBytes";
const std::string kRamReadBytes = "ramReadBytes";
const std::string kPreloadSplits = "readyPreloadedSplits";
+const std::string kDataSourceAddSplitWallNanos = "dataSourceAddSplitWallNanos";
+const std::string kDataSourceReadWallNanos = "dataSourceReadWallNanos";
const std::string kNumWrittenFiles = "numWrittenFiles";
const std::string kWriteIOTime = "writeIOWallNanos";
@@ -466,6 +468,10 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kRamReadBytes)[metricIndex] =
runtimeMetric("sum", second->customStats, kRamReadBytes);
metrics_->get(Metrics::kPreloadSplits)[metricIndex] =
runtimeMetric("sum", entry.second->customStats, kPreloadSplits);
+ metrics_->get(Metrics::kDataSourceAddSplitWallNanos)[metricIndex] =
+        runtimeMetric("sum", second->customStats, kDataSourceAddSplitWallNanos);
+ metrics_->get(Metrics::kDataSourceReadWallNanos)[metricIndex] =
+ runtimeMetric("sum", second->customStats, kDataSourceReadWallNanos);
metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] =
runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles);
metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] =
second->physicalWrittenBytes;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]