ashb commented on code in PR #62554:
URL: https://github.com/apache/airflow/pull/62554#discussion_r2894645090


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2328,18 +2092,12 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
                     tags={},
                     extra_tags={"dag_id": dag.dag_id},
                 )
-                if span.is_recording():
-                    span.add_event(
-                        name="schedule_delay",
-                        attributes={"dag_id": dag.dag_id, "schedule_delay": 
str(schedule_delay)},
-                    )

Review Comment:
   Is this the sort of thing you mean by
   
   > the detailed task spans will be done in a followup
   
   If so, do we have a list anywhere of what we will add back?



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -618,15 +617,6 @@ def emit_metrics(self):
             extra_tags={"hostname": self.job.hostname},
         )
 
-        span = Trace.get_current_span()
-        span.set_attributes(
-            {
-                "trigger host": self.job.hostname,
-                "triggers running": len(self.running_triggers),
-                "capacity left": capacity_left,
-            }
-        )

Review Comment:
   Why was this one removed? Are we going to add it back?



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -29,6 +29,9 @@
 
 import structlog
 from natsort import natsorted
+from opentelemetry import context, trace

Review Comment:
   I thought the plan was to import `trace` via `from airflow.observability.traces import trace`?



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1020,131 +1023,28 @@ def is_effective_leaf(task):
         leaf_tis = {ti for ti in tis if ti.task_id in leaf_task_ids if 
ti.state != TaskInstanceState.REMOVED}
         return leaf_tis
 
-    def set_dagrun_span_attrs(self, span: Span | EmptySpan):
-        if self._state == DagRunState.FAILED:
-            span.set_attribute("airflow.dag_run.error", True)
-
-        # Explicitly set the value type to Union[...] to avoid a mypy error.
-        attributes: dict[str, AttributeValueType] = {
-            "airflow.category": "DAG runs",
-            "airflow.dag_run.dag_id": str(self.dag_id),
-            "airflow.dag_run.logical_date": str(self.logical_date),
-            "airflow.dag_run.run_id": str(self.run_id),
-            "airflow.dag_run.queued_at": str(self.queued_at),
-            "airflow.dag_run.run_start_date": str(self.start_date),
-            "airflow.dag_run.run_end_date": str(self.end_date),
-            "airflow.dag_run.run_duration": str(
-                (self.end_date - self.start_date).total_seconds() if 
self.start_date and self.end_date else 0
-            ),
-            "airflow.dag_run.state": str(self._state),
-            "airflow.dag_run.run_type": str(self.run_type),
-            "airflow.dag_run.data_interval_start": 
str(self.data_interval_start),
-            "airflow.dag_run.data_interval_end": str(self.data_interval_end),
-            "airflow.dag_run.conf": str(self.conf),
-        }
-        if span.is_recording():
-            span.add_event(name="airflow.dag_run.queued", 
timestamp=datetime_to_nano(self.queued_at))
-            span.add_event(name="airflow.dag_run.started", 
timestamp=datetime_to_nano(self.start_date))
-            span.add_event(name="airflow.dag_run.ended", 
timestamp=datetime_to_nano(self.end_date))
-        span.set_attributes(attributes)
-
-    def start_dr_spans_if_needed(self, tis: list[TI]):
-        # If there is no value in active_spans, then the span hasn't already 
been started.
-        if self.active_spans is not None and self.active_spans.get("dr:" + 
str(self.id)) is None:
-            if self.span_status == SpanStatus.NOT_STARTED or self.span_status 
== SpanStatus.NEEDS_CONTINUANCE:
-                dr_span = None
-                continue_ti_spans = False
-                if self.span_status == SpanStatus.NOT_STARTED:
-                    dr_span = Trace.start_root_span(
-                        span_name=f"{self.dag_id}",
-                        component="dag",
-                        start_time=self.queued_at,  # This is later converted 
to nano.
-                        start_as_current=False,
-                    )
-                elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
-                    # Use the existing context_carrier to set the initial 
dag_run span as the parent.
-                    parent_context = Trace.extract(self.context_carrier)
-                    with Trace.start_child_span(
-                        span_name="new_scheduler", 
parent_context=parent_context
-                    ) as s:
-                        s.set_attribute("trace_status", "continued")
-
-                    dr_span = Trace.start_child_span(
-                        span_name=f"{self.dag_id}_continued",
-                        parent_context=parent_context,
-                        component="dag",
-                        # No start time
-                        start_as_current=False,
-                    )
-                    # After this span is started, the context_carrier will be 
replaced by the new one.
-                    # New task span will use this span as the parent.
-                    continue_ti_spans = True
-                carrier = Trace.inject()
-                self.context_carrier = carrier
-                self.span_status = SpanStatus.ACTIVE
-                # Set the span in a synchronized dictionary, so that the 
variable can be used to end the span.
-                self.active_spans.set("dr:" + str(self.id), dr_span)
-                self.log.debug(
-                    "DagRun span has been started and the injected 
context_carrier is: %s",
-                    self.context_carrier,
-                )
-                # Start TI spans that also need continuance.
-                if continue_ti_spans:
-                    new_dagrun_context = Trace.extract(self.context_carrier)
-                    for ti in tis:
-                        if ti.span_status == SpanStatus.NEEDS_CONTINUANCE:
-                            ti_span = Trace.start_child_span(
-                                span_name=f"{ti.task_id}_continued",
-                                parent_context=new_dagrun_context,
-                                start_as_current=False,
-                            )
-                            ti_carrier = Trace.inject()
-                            ti.context_carrier = ti_carrier
-                            ti.span_status = SpanStatus.ACTIVE
-                            self.active_spans.set(f"ti:{ti.id}", ti_span)
-            else:
-                self.log.debug(
-                    "Found span_status '%s', while updating state for dag_run 
'%s'",
-                    self.span_status,
-                    self.run_id,
-                )
-
-    def end_dr_span_if_needed(self):
-        if self.active_spans is not None:
-            active_span = self.active_spans.get("dr:" + str(self.id))
-            if active_span is not None:
-                self.log.debug(
-                    "Found active span with span_id: %s, for dag_id: %s, 
run_id: %s, state: %s",
-                    active_span.get_span_context().span_id,
-                    self.dag_id,
-                    self.run_id,
-                    self.state,
-                )
-
-                self.set_dagrun_span_attrs(span=active_span)
-                active_span.end(end_time=datetime_to_nano(self.end_date))
-                # Remove the span from the dict.
-                self.active_spans.delete("dr:" + str(self.id))
-                self.span_status = SpanStatus.ENDED
-            else:
-                if self.span_status == SpanStatus.ACTIVE:
-                    # Another scheduler has started the span.
-                    # Update the DB SpanStatus to notify the owner to end it.
-                    self.span_status = SpanStatus.SHOULD_END
-                elif self.span_status == SpanStatus.NEEDS_CONTINUANCE:
-                    # This is a corner case where the scheduler exited 
gracefully
-                    # while the dag_run was almost done.
-                    # Since it reached this point, the dag has finished but 
there has been no time
-                    # to create a new span for the current scheduler.
-                    # There is no need for more spans, update the status on 
the db.
-                    self.span_status = SpanStatus.ENDED
-                else:
-                    self.log.debug(
-                        "No active span has been found for dag_id: %s, run_id: 
%s, state: %s",
-                        self.dag_id,
-                        self.run_id,
-                        self.state,
-                    )
+    def _emit_dagrun_span(self, state: DagRunState):
+        ctx = TraceContextTextMapPropagator().extract(self.context_carrier)
+        span = trace.get_current_span(context=ctx)
+        span_context = span.get_span_context()
+        with override_ids(span_context.trace_id, span_context.span_id):
+            attributes = {
+                "dag_id": str(self.dag_id),
+                "run_id": self.run_id,
+            }
+            if self.logical_date:
+                attributes["logical_date"] = str(self.logical_date)

Review Comment:
   Previously these attributes were prefixed with `airflow.` (I think) -- shouldn't that stay?



##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -510,16 +508,12 @@ def get_parent_span_id(span):
 
         nested = get_span_hierarchy()
         assert nested == {
-            "otel_test_dag": None,
-            "task1": None,
-            "task1_sub_span1": None,
-            "task1_sub_span2": None,
-            "task1_sub_span3": "task1_sub_span2",
-            "task1_sub_span4": None,
-            "task2": None,
+            "sub_span1": "task_run.task1",
+            "task_run.task1": "dag_run.otel_test_dag",
+            "dag_run.otel_test_dag": None,

Review Comment:
   Can you explain this diff? I'm sure it's fine; I just can't quite grok the reason (or even the before and after values).



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -3281,3 +3144,168 @@ def on_failure(context):
         assert context_received["ti"].task_id == "test_task"
         assert context_received["ti"].dag_id == "test_dag"
         assert context_received["ti"].run_id == dr.run_id
+
+
+class TestDagRunTracing:
+    """Tests for DagRun OpenTelemetry span behavior."""
+
+    @pytest.fixture(autouse=True)
+    def sdk_tracer_provider(self):
+        """Patch the module-level tracer with one backed by a real SDK 
provider so spans have valid IDs."""
+        provider = TracerProvider()
+        real_tracer = provider.get_tracer("airflow.models.dagrun")
+        with mock.patch("airflow.models.dagrun.tracer", real_tracer):
+            yield
+
+    def test_context_carrier_set_on_init(self, dag_maker):
+        """DagRun.__init__ should populate context_carrier with a W3C 
traceparent."""
+        with dag_maker("test_tracing_init"):
+            EmptyOperator(task_id="t1")
+        dr = dag_maker.create_dagrun()
+
+        assert dr.context_carrier is not None
+        assert isinstance(dr.context_carrier, dict)
+        # W3C Trace Context propagation injects a "traceparent" key
+        assert "traceparent" in dr.context_carrier
+        # traceparent format: 00-<32 hex trace_id>-<16 hex span_id>-<2 hex 
flags>
+        traceparent = dr.context_carrier["traceparent"]
+        parts = traceparent.split("-")
+        assert len(parts) == 4
+        assert parts[0] == "00"
+        assert len(parts[1]) == 32  # trace_id
+        assert len(parts[2]) == 16  # span_id
+        assert len(parts[3]) == 2  # flags

Review Comment:
   This feels like an internal implementation detail of OpenTelemetry, and likely something we shouldn't be asserting about?
   
   ```suggestion
           assert dr.context_carrier["traceparent"], "traceparent is set on the span"
   ```



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -133,6 +134,32 @@
     from airflow.sdk.exceptions import DagRunTriggerException
     from airflow.sdk.types import OutletEventAccessorsProtocol
 
+log = structlog.get_logger("task")
+
+tracer = trace.get_tracer(__name__)
+
+
+@contextmanager
+def _make_task_span(msg: StartupDetails):
+    parent_context = (
+        TraceContextTextMapPropagator().extract(msg.ti.context_carrier) if 
msg.ti.context_carrier else None
+    )
+    ti = msg.ti
+    span_name = f"task_run.{ti.task_id}"

Review Comment:
   ```suggestion
       span_name = f"airflow.task_run.{ti.task_id}"
   ```
   
   etc?



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1340,10 +1237,8 @@ def recalculate(self) -> _UnfinishedStates:
                 self.data_interval_start,
                 self.data_interval_end,
             )
-
-            self.end_dr_span_if_needed()
-
             session.flush()
+            self._emit_dagrun_span(state=self._state)

Review Comment:
   Nit: shouldn't this be 
   
   ```suggestion
               self._emit_dagrun_span(state=self.state)
   ```
   
   ?



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1796,6 +1816,17 @@ def finalize(
         log.exception("error calling listener")
 
 
+@contextmanager
+def flush_spans():
+    try:
+        yield
+    finally:
+        provider = trace.get_tracer_provider()
+        if hasattr(provider, "force_flush"):
+            provider.force_flush(timeout_millis=30000)

Review Comment:
   Make the flush timeout (currently hard-coded to 30000 ms) configurable?



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -369,7 +368,15 @@ def __init__(
         self.triggered_by = triggered_by
         self.triggering_user_name = triggering_user_name
         self.scheduled_by_job_id = None
-        self.context_carrier = {}
+        self.context_carrier: dict[str, str] = {}
+
+        # We never call .end() on this span. It's solely used to generate a 
trace context for the rest of the run.
+        empty_context = context.Context()
+        span = tracer.start_span("notused", context=empty_context)

Review Comment:
   Just checking: "notused" never appears in the submitted traces/spans, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to