ShreyeshArangath commented on code in PR #1381:
URL: 
https://github.com/apache/datafusion-python/pull/1381#discussion_r2861995966


##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)

Review Comment:
   +1, I think that would be super helpful. I can extend this to include a new 
user-facing RST page covering things like what metrics are, when they're 
available, how the physical plan tree maps to operators, etc. 



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
+        return self._raw.spill_count()
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""
+        return self._raw.sum_by_name(name)
+
+    def __repr__(self) -> str:
+        """Return a string representation of the metrics set."""
+        return repr(self._raw)
+
+
+class Metric:
+    """A single execution metric with name, value, partition, and labels."""
+
+    def __init__(self, raw: df_internal.Metric) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    @property
+    def name(self) -> str:
+        """The name of this metric (e.g. ``output_rows``)."""
+        return self._raw.name
+
+    @property
+    def value(self) -> int | None:
+        """The numeric value of this metric, or None for non-numeric types."""
+        return self._raw.value

Review Comment:
   Yes, I think right now it uses an `Option<usize>`, and unrecognised variants 
return None. We could possibly return richer types by returning 
`Option<Py<PyAny>>`, as you suggested.



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
+        return self._raw.spill_count()
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""

Review Comment:
   Yes, that's right
   



##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
 
     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, 
int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")

Review Comment:
   Removed all pytest.skip calls



##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
 
     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, 
int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")
+
+
+def test_no_metrics_before_execution() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+    df = ctx.sql("SELECT * FROM t")
+    plan = df.execution_plan()
+    ms = plan.metrics()
+    assert ms is None or ms.output_rows is None or ms.output_rows == 0
+
+
+def test_collect_partitioned_metrics() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+    df.collect_partitioned()
+    plan = df.execution_plan()
+
+    found_metrics = False
+    for _, ms in plan.collect_metrics():
+        if ms.output_rows is not None and ms.output_rows > 0:

Review Comment:
   done!



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""

Review Comment:
   From my understanding, it seems to count spill-to-disk events (so it has no 
units). Updated the docstring.



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.

Review Comment:
   Updated! `operator_name` was meant to be the string returned by `node.display()`.



##########
python/datafusion/plan.py:
##########
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not 
supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if 
unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
+    """
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
+        return self._raw.spill_count()
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""
+        return self._raw.sum_by_name(name)
+
+    def __repr__(self) -> str:
+        """Return a string representation of the metrics set."""
+        return repr(self._raw)
+
+
+class Metric:
+    """A single execution metric with name, value, partition, and labels."""
+
+    def __init__(self, raw: df_internal.Metric) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    @property
+    def name(self) -> str:
+        """The name of this metric (e.g. ``output_rows``)."""
+        return self._raw.name
+
+    @property
+    def value(self) -> int | None:
+        """The numeric value of this metric, or None for non-numeric types."""
+        return self._raw.value
+
+    @property
+    def partition(self) -> int | None:
+        """The partition this metric applies to, or None if global."""

Review Comment:
   This is just a 0-based partition ID (partition 0, partition 1, etc.), 
i.e. the parallel slots DataFusion splits work across. You'd mostly use it 
to compare the same metric across partitions and spot skew, such as when one 
partition is doing far more work than the others.
   
   I updated the docstring to call it a "0-based partition index" and clarified 
that None just means the metric isn't tied to any specific partition.



##########
python/tests/test_plans.py:
##########
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
 
     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, 
int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")
+
+
+def test_no_metrics_before_execution() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+    df = ctx.sql("SELECT * FROM t")
+    plan = df.execution_plan()
+    ms = plan.metrics()
+    assert ms is None or ms.output_rows is None or ms.output_rows == 0

Review Comment:
   `metrics()` returns None when the plan node hasn't been executed yet. After 
execution, it returns a MetricsSet, which always contains at least the metrics 
the operator recorded. I've updated `test_no_metrics_before_execution` to 
assert `ms is None` directly, rather than using the previous looser check, since 
before execution the root node should have no MetricsSet at all.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to