This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b64c6bebe98 Add otel integration test group (#62880)
b64c6bebe98 is described below

commit b64c6bebe98c3a79048578ea422ba276f92255a3
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Mar 5 14:02:20 2026 -0800

    Add otel integration test group (#62880)
    
    Add dedicated integration test group for otel
    
    Removed celery from the otel test.
    
    Removed other dead code from the otel test.
    
    Retrieve the spans from the Jaeger API instead of parsing stdout
---
 airflow-core/tests/integration/otel/test_otel.py   | 1162 ++------------------
 dev/breeze/src/airflow_breeze/global_constants.py  |    4 +-
 dev/breeze/tests/test_selective_checks.py          |    2 +-
 .../src/tests_common/test_utils/otel_utils.py      |   92 --
 scripts/in_container/check_environment.sh          |    3 +
 5 files changed, 94 insertions(+), 1169 deletions(-)

diff --git a/airflow-core/tests/integration/otel/test_otel.py 
b/airflow-core/tests/integration/otel/test_otel.py
index e5ad8181a17..0e4546e301d 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -19,37 +19,29 @@ from __future__ import annotations
 import json
 import logging
 import os
-import signal
 import socket
 import subprocess
 import time
+from typing import Any
 
 import pytest
+import requests
 from sqlalchemy import func, select
 
 from airflow._shared.timezones import timezone
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.dag_processing.dagbag import DagBag
-from airflow.executors import executor_loader
-from airflow.executors.executor_utils import ExecutorName
-from airflow.models import DAG, DagRun
+from airflow.models import DagRun
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.serialization.definitions.dag import SerializedDAG
 from airflow.utils.session import create_session
-from airflow.utils.span_status import SpanStatus
 from airflow.utils.state import State
 
 from tests_common.test_utils.dag import create_scheduler_dag
 from tests_common.test_utils.otel_utils import (
-    assert_parent_children_spans,
-    assert_parent_children_spans_for_non_root,
-    assert_span_name_belongs_to_root_span,
-    assert_span_not_in_children_spans,
     dump_airflow_metadata_db,
     extract_metrics_from_output,
-    extract_spans_from_output,
-    get_parent_child_dict,
 )
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_1_PLUS
 
@@ -120,9 +112,7 @@ def unpause_trigger_dag_and_get_run_id(dag_id: str) -> str:
     return run_id
 
 
-def wait_for_dag_run_and_check_span_status(
-    dag_id: str, run_id: str, max_wait_time: int, span_status: str | None
-):
+def wait_for_dag_run(dag_id: str, run_id: str, max_wait_time: int):
     # max_wait_time, is the timeout for the DAG run to complete. The value is 
in seconds.
     start_time = timezone.utcnow().timestamp()
 
@@ -142,479 +132,9 @@ def wait_for_dag_run_and_check_span_status(
             dag_run_state = dag_run.state
             log.debug("DAG Run state: %s.", dag_run_state)
 
-            dag_run_span_status = dag_run.span_status
-            log.debug("DAG Run span status: %s.", dag_run_span_status)
-
             if dag_run_state in [State.SUCCESS, State.FAILED]:
                 break
-
-    assert dag_run_state == State.SUCCESS, (
-        f"Dag run did not complete successfully. Final state: {dag_run_state}."
-    )
-
-    if span_status is not None:
-        assert dag_run_span_status == span_status, (
-            f"Dag run span status isn't {span_status} as expected.Actual 
status: {dag_run_span_status}."
-        )
-
-
-def check_dag_run_state_and_span_status(dag_id: str, run_id: str, state: str, 
span_status: str):
-    with create_session() as session:
-        dag_run = session.scalar(
-            select(DagRun).where(
-                DagRun.dag_id == dag_id,
-                DagRun.run_id == run_id,
-            )
-        )
-
-        assert dag_run is not None
-        assert dag_run.state == state, f"Dag Run state isn't {state}. State: 
{dag_run.state}"
-        assert dag_run.span_status == span_status, (
-            f"Dag Run span_status isn't {span_status}. Span_status: 
{dag_run.span_status}"
-        )
-
-
-def check_ti_state_and_span_status(task_id: str, run_id: str, state: str, 
span_status: str | None):
-    with create_session() as session:
-        ti = session.scalar(
-            select(TaskInstance).where(
-                TaskInstance.task_id == task_id,
-                TaskInstance.run_id == run_id,
-            )
-        )
-
-        assert ti is not None
-        assert ti.state == state, f"Task instance state isn't {state}. State: 
{ti.state}"
-
-        if span_status is not None:
-            assert ti.span_status == span_status, (
-                f"Task instance span_status isn't {span_status}. Span_status: 
{ti.span_status}"
-            )
-
-
-def check_legacy_metrics(output: str, dag: DAG, legacy_metrics_on: bool):
-    # Get a list of lines from the captured output.
-    output_lines = output.splitlines()
-
-    metrics_dict = extract_metrics_from_output(output_lines)
-
-    # Sample of metrics to check:
-    #   new:    airflow.dagrun.dependency-check
-    #   legacy: airflow.dagrun.dependency-check.otel_test_dag
-    #
-    #   new:    airflow.task.scheduled_duration
-    #   legacy: airflow.dag.otel_test_dag.task1.scheduled_duration
-    #   legacy: airflow.dag.otel_test_dag.task2.scheduled_duration
-    #
-    #   new:    airflow.dagrun.duration.success
-    #   legacy: airflow.dagrun.duration.success.otel_test_dag
-
-    legacy_metric_names = [
-        f"airflow.dagrun.dependency-check.{dag.dag_id}",
-        f"airflow.dagrun.duration.success.{dag.dag_id}",
-    ]
-
-    for task_id in dag.task_ids:
-        
legacy_metric_names.append(f"airflow.dag.{dag.dag_id}.{task_id}.scheduled_duration")
-
-    new_metric_names = [
-        "airflow.dagrun.dependency-check",
-        "airflow.dagrun.duration.success",
-        "airflow.task.scheduled_duration",
-    ]
-
-    assert set(new_metric_names).issubset(metrics_dict.keys())
-
-    if legacy_metrics_on:
-        assert set(legacy_metric_names).issubset(metrics_dict.keys())
-
-
-def check_metrics_exist(output: str, metrics_to_check: list[str]):
-    # Get a list of lines from the captured output.
-    output_lines = output.splitlines()
-
-    metrics_dict = extract_metrics_from_output(output_lines)
-
-    assert set(metrics_to_check).issubset(metrics_dict.keys())
-
-
-def check_spans_with_continuance(output: str, dag: DAG, continuance_for_t1: 
bool = True):
-    # Get a list of lines from the captured output.
-    output_lines = output.splitlines()
-
-    # Filter the output, create a json obj for each span and then store them 
into dictionaries.
-    # One dictionary with only the root spans, and one with all the captured 
spans (root and otherwise).
-    root_span_dict, span_dict = extract_spans_from_output(output_lines)
-    # Generate a dictionary with parent child relationships.
-    # This is done by comparing the span_id of each root span with the 
parent_id of each non-root span.
-    parent_child_dict = get_parent_child_dict(root_span_dict, span_dict)
-
-    # The span hierarchy for dag 'otel_test_dag_with_pause_in_task' is
-    # dag span
-    #   |_ task1 span
-    #   |_ scheduler_exited span
-    #   |_ new_scheduler span
-    #   |_ dag span (continued)
-    #       |_ task1 span (continued)
-    #           |_ sub_span_1
-    #               |_ sub_span_2
-    #                   |_ sub_span_3
-    #           |_ sub_span_4
-    #       |_ task2 span
-    #
-    # If there is no continuance for task1, then the span hierarchy is
-    # dag span
-    #   |_ task1 span
-    #       |_ sub_span_1
-    #           |_ sub_span_2
-    #               |_ sub_span_3
-    #       |_ sub_span_4
-    #   |_ scheduler_exited span
-    #   |_ new_scheduler span
-    #   |_ dag span (continued)
-    #       |_ task2 span
-    #
-    # Since Airflow 3, there is no direct db access for tasks.
-    # As a result, the sub_spans won't be under the continued span but under 
the initial one.
-    # dag span
-    #   |_ task1 span
-    #       |_ sub_span_1
-    #           |_ sub_span_2
-    #               |_ sub_span_3
-    #       |_ sub_span_4
-    #   |_ scheduler_exited span
-    #   |_ new_scheduler span
-    #   |_ dag span (continued)
-    #       |_ task1 span (continued)
-    #       |_ task2 span
-
-    dag_id = dag.dag_id
-
-    task_instance_ids = dag.task_ids
-    task1_id = task_instance_ids[0]
-    task2_id = task_instance_ids[1]
-
-    dag_root_span_name = f"{dag_id}"
-
-    dag_root_span_children_names = [
-        f"{task1_id}",
-        "current_scheduler_exited",
-        "new_scheduler",
-        f"{dag_id}_continued",
-    ]
-
-    if continuance_for_t1:
-        dag_continued_span_children_names = [
-            f"{task1_id}_continued",
-            f"{task2_id}",
-        ]
-    else:
-        dag_continued_span_children_names = [
-            f"{task2_id}",
-        ]
-
-    task1_span_children_names = [
-        f"{task1_id}_sub_span1",
-        f"{task1_id}_sub_span4",
-    ]
-
-    # Single element lists.
-    task1_sub_span1_children_span_names = [f"{task1_id}_sub_span2"]
-    task1_sub_span2_children_span_names = [f"{task1_id}_sub_span3"]
-
-    assert_span_name_belongs_to_root_span(
-        root_span_dict=root_span_dict, span_name=dag_root_span_name, 
should_succeed=True
-    )
-
-    # Check direct children of the root span.
-    assert_parent_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        parent_name=dag_root_span_name,
-        children_names=dag_root_span_children_names,
-    )
-
-    # Use a span name that exists, but it's not a direct child.
-    assert_span_not_in_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        span_dict=span_dict,
-        parent_name=dag_root_span_name,
-        child_name=f"{task1_id}_continued",
-        span_exists=True,
-    )
-
-    # Use a span name that doesn't exist at all.
-    assert_span_not_in_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        span_dict=span_dict,
-        parent_name=dag_root_span_name,
-        child_name=f"{task1_id}_non_existent",
-        span_exists=False,
-    )
-
-    # Check children of the continued dag span.
-    assert_parent_children_spans_for_non_root(
-        span_dict=span_dict,
-        parent_name=f"{dag_id}_continued",
-        children_names=dag_continued_span_children_names,
-    )
-
-    if continuance_for_t1 and not AIRFLOW_V_3_0_PLUS:
-        # Check children of the continued task1 span.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}_continued",
-            children_names=task1_span_children_names,
-        )
-    else:
-        # Check children of the task1 span.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}",
-            children_names=task1_span_children_names,
-        )
-
-    # Check children of task1 sub span1.
-    assert_parent_children_spans_for_non_root(
-        span_dict=span_dict,
-        parent_name=f"{task1_id}_sub_span1",
-        children_names=task1_sub_span1_children_span_names,
-    )
-
-    # Check children of task1 sub span2.
-    assert_parent_children_spans_for_non_root(
-        span_dict=span_dict,
-        parent_name=f"{task1_id}_sub_span2",
-        children_names=task1_sub_span2_children_span_names,
-    )
-
-
-def check_spans_without_continuance(
-    output: str, dag: DAG, is_recreated: bool = False, check_t1_sub_spans: 
bool = True
-):
-    recreated_suffix = "_recreated" if is_recreated else ""
-
-    # Get a list of lines from the captured output.
-    output_lines = output.splitlines()
-
-    # Filter the output, create a json obj for each span and then store them 
into dictionaries.
-    # One dictionary with only the root spans, and one with all the captured 
spans (root and otherwise).
-    root_span_dict, span_dict = extract_spans_from_output(output_lines)
-    # Generate a dictionary with parent child relationships.
-    # This is done by comparing the span_id of each root span with the 
parent_id of each non-root span.
-    parent_child_dict = get_parent_child_dict(root_span_dict, span_dict)
-
-    # Any spans generated under a task, are children of the task span.
-    # The span hierarchy for dag 'otel_test_dag' is
-    # dag span
-    #   |_ task1 span
-    #       |_ sub_span_1
-    #           |_ sub_span_2
-    #               |_ sub_span_3
-    #       |_ sub_span_4
-    #   |_ task2 span
-    #
-    # In case task1 has finished running and the span is recreated,
-    # the sub spans are lost and can't be recreated. The span hierarchy will be
-    # dag span
-    #   |_ task1 span
-    #   |_ task2 span
-
-    dag_id = dag.dag_id
-
-    task_instance_ids = dag.task_ids
-    task1_id = task_instance_ids[0]
-    task2_id = task_instance_ids[1]
-
-    # Based on the current tests, only the root span and the task1 span will 
be recreated.
-    # TODO: Adjust accordingly, if there are more tests in the future
-    #  that require other spans to be recreated as well.
-    dag_root_span_name = f"{dag_id}{recreated_suffix}"
-
-    dag_root_span_children_names = [
-        f"{task1_id}{recreated_suffix}",
-        f"{task2_id}",
-    ]
-
-    task1_span_children_names = [
-        f"{task1_id}_sub_span1",
-        f"{task1_id}_sub_span4",
-    ]
-
-    # Single element lists.
-    task1_sub_span1_children_span_names = [f"{task1_id}_sub_span2"]
-    task1_sub_span2_children_span_names = [f"{task1_id}_sub_span3"]
-
-    assert_span_name_belongs_to_root_span(
-        root_span_dict=root_span_dict, span_name=dag_root_span_name, 
should_succeed=True
-    )
-
-    # Check direct children of the root span.
-    assert_parent_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        parent_name=dag_root_span_name,
-        children_names=dag_root_span_children_names,
-    )
-
-    # Use a span name that exists, but it's not a direct child.
-    assert_span_not_in_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        span_dict=span_dict,
-        parent_name=dag_root_span_name,
-        child_name=f"{task1_id}_sub_span1",
-        span_exists=True,
-    )
-
-    # Use a span name that doesn't exist at all.
-    assert_span_not_in_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        span_dict=span_dict,
-        parent_name=dag_root_span_name,
-        child_name=f"{task1_id}_non_existent",
-        span_exists=False,
-    )
-
-    if check_t1_sub_spans:
-        # Check children of the task1 span.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}{recreated_suffix}",
-            children_names=task1_span_children_names,
-        )
-
-        # Check children of task1 sub span1.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}_sub_span1",
-            children_names=task1_sub_span1_children_span_names,
-        )
-
-        # Check children of task1 sub span2.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}_sub_span2",
-            children_names=task1_sub_span2_children_span_names,
-        )
-
-
-def check_spans_for_paused_dag(
-    output: str, dag: DAG, is_recreated: bool = False, check_t1_sub_spans: 
bool = True
-):
-    recreated_suffix = "_recreated" if is_recreated else ""
-
-    # Get a list of lines from the captured output.
-    output_lines = output.splitlines()
-
-    # Filter the output, create a json obj for each span and then store them 
into dictionaries.
-    # One dictionary with only the root spans, and one with all the captured 
spans (root and otherwise).
-    root_span_dict, span_dict = extract_spans_from_output(output_lines)
-    # Generate a dictionary with parent child relationships.
-    # This is done by comparing the span_id of each root span with the 
parent_id of each non-root span.
-    parent_child_dict = get_parent_child_dict(root_span_dict, span_dict)
-
-    # Any spans generated under a task, are children of the task span.
-    # The span hierarchy for dag 'otel_test_dag_with_pause_between_tasks' is
-    # dag span
-    #   |_ task1 span
-    #       |_ sub_span_1
-    #           |_ sub_span_2
-    #               |_ sub_span_3
-    #       |_ sub_span_4
-    #   |_ paused_task span
-    #   |_ task2 span
-    #
-    # In case task1 has finished running and the span is recreated,
-    # the sub spans are lost and can't be recreated. The span hierarchy will be
-    # dag span
-    #   |_ task1 span
-    #   |_ paused_task span
-    #   |_ task2 span
-
-    dag_id = dag.dag_id
-
-    task_instance_ids = dag.task_ids
-    task1_id = task_instance_ids[0]
-    paused_task_id = task_instance_ids[1]
-    task2_id = task_instance_ids[2]
-
-    # Based on the current tests, only the root span and the task1 span will 
be recreated.
-    # TODO: Adjust accordingly, if there are more tests in the future
-    #  that require other spans to be recreated as well.
-    dag_root_span_name = f"{dag_id}{recreated_suffix}"
-
-    dag_root_span_children_names = [
-        f"{task1_id}{recreated_suffix}",
-        f"{paused_task_id}{recreated_suffix}",
-        f"{task2_id}",
-    ]
-
-    task1_span_children_names = [
-        f"{task1_id}_sub_span1",
-        f"{task1_id}_sub_span4",
-    ]
-
-    # Single element lists.
-    task1_sub_span1_children_span_names = [f"{task1_id}_sub_span2"]
-    task1_sub_span2_children_span_names = [f"{task1_id}_sub_span3"]
-
-    assert_span_name_belongs_to_root_span(
-        root_span_dict=root_span_dict, span_name=dag_root_span_name, 
should_succeed=True
-    )
-
-    # Check direct children of the root span.
-    assert_parent_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        parent_name=dag_root_span_name,
-        children_names=dag_root_span_children_names,
-    )
-
-    # Use a span name that exists, but it's not a direct child.
-    assert_span_not_in_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        span_dict=span_dict,
-        parent_name=dag_root_span_name,
-        child_name=f"{task1_id}_sub_span1",
-        span_exists=True,
-    )
-
-    # Use a span name that doesn't exist at all.
-    assert_span_not_in_children_spans(
-        parent_child_dict=parent_child_dict,
-        root_span_dict=root_span_dict,
-        span_dict=span_dict,
-        parent_name=dag_root_span_name,
-        child_name=f"{task1_id}_non_existent",
-        span_exists=False,
-    )
-
-    if check_t1_sub_spans:
-        # Check children of the task1 span.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}{recreated_suffix}",
-            children_names=task1_span_children_names,
-        )
-
-        # Check children of task1 sub span1.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}_sub_span1",
-            children_names=task1_sub_span1_children_span_names,
-        )
-
-        # Check children of task1 sub span2.
-        assert_parent_children_spans_for_non_root(
-            span_dict=span_dict,
-            parent_name=f"{task1_id}_sub_span2",
-            children_names=task1_sub_span2_children_span_names,
-        )
+    return dag_run_state
 
 
 def print_ti_output_for_dag_run(dag_id: str, run_id: str):
@@ -641,7 +161,7 @@ def print_ti_output_for_dag_run(dag_id: str, run_id: str):
                 print("\n===== END =====\n")
 
 
[email protected]("redis")
[email protected]("otel")
 @pytest.mark.backend("postgres")
 class TestOtelIntegration:
     """
@@ -660,24 +180,10 @@ class TestOtelIntegration:
 
     test_dir = os.path.dirname(os.path.abspath(__file__))
     dag_folder = os.path.join(test_dir, "dags")
-    control_file = os.path.join(dag_folder, "dag_control.txt")
-
-    max_wait_seconds_for_pause = 180
 
     use_otel = os.getenv("use_otel", default="false")
     log_level = os.getenv("log_level", default="none")
 
-    celery_command_args = [
-        "celery",
-        "--app",
-        "airflow.providers.celery.executors.celery_executor.app",
-        "worker",
-        "--concurrency",
-        "1",
-        "--loglevel",
-        "INFO",
-    ]
-
     scheduler_command_args = [
         "airflow",
         "scheduler",
@@ -706,8 +212,6 @@ class TestOtelIntegration:
         os.environ["AIRFLOW__TRACES__OTEL_ON"] = "True"
         os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"
         os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = 
"http://breeze-otel-collector:4318/v1/traces";
-        if cls.use_otel != "true":
-            os.environ["OTEL_TRACES_EXPORTER"] = "console"
 
         os.environ["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] = "False"
         os.environ["AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL"] = "2"
@@ -799,28 +303,6 @@ class TestOtelIntegration:
 
         return dag_dict
 
-    @pytest.fixture
-    def celery_worker_env_vars(self, monkeypatch):
-        os.environ["AIRFLOW__CORE__EXECUTOR"] = "CeleryExecutor"
-        executor_name = ExecutorName(
-            
module_path="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
-            alias="CeleryExecutor",
-        )
-        monkeypatch.setattr(
-            executor_loader, "_alias_to_executors_per_team", {None: 
{"CeleryExecutor": executor_name}}
-        )
-
-    @pytest.fixture(autouse=True)
-    def cleanup_control_file_if_needed(self):
-        # Don't do anything before yield.
-        # This will run after each test and clean up the control file in case 
of failure.
-        yield
-        try:
-            if os.path.exists(self.control_file):
-                os.remove(self.control_file)
-        except Exception as ex:
-            log.error("Could not delete leftover control file '%s', error: 
'%s'.", self.control_file, ex)
-
     def dag_execution_for_testing_metrics(self, capfd):
         # Metrics.
         os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True"
@@ -830,13 +312,12 @@ class TestOtelIntegration:
         if self.use_otel != "true":
             os.environ["OTEL_METRICS_EXPORTER"] = "console"
 
-        celery_worker_process = None
         scheduler_process = None
         apiserver_process = None
         try:
             # Start the processes here and not as fixtures or in a common 
setup,
             # so that the test can capture their output.
-            celery_worker_process, scheduler_process, apiserver_process = 
self.start_worker_and_scheduler1()
+            scheduler_process, apiserver_process = 
self.start_worker_and_scheduler()
 
             dag_id = "otel_test_dag"
 
@@ -847,10 +328,8 @@ class TestOtelIntegration:
 
             run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
 
-            # Skip the span_status check.
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=90, 
span_status=None
-            )
+            state = wait_for_dag_run(dag_id=dag_id, run_id=run_id, 
max_wait_time=90)
+            assert state == State.SUCCESS, f"Dag run did not complete 
successfully. Final state: {state}."
 
             # The ti span_status is updated while processing the executor 
events,
             # which is after the dag_run state has been updated.
@@ -860,21 +339,13 @@ class TestOtelIntegration:
             task_dict_ids = task_dict.keys()
 
             for task_id in task_dict_ids:
-                # Skip the span_status check.
-                check_ti_state_and_span_status(
-                    task_id=task_id, run_id=run_id, state=State.SUCCESS, 
span_status=None
-                )
+                ti = self._get_ti(dag_id, run_id, task_id)
+                assert ti is not None
+                assert ti.state == State.SUCCESS
 
             print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
         finally:
             # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            celery_status = celery_worker_process.poll()
-            assert celery_status is not None, (
-                "The celery worker process status is None, which means that it 
hasn't terminated as expected."
-            )
 
             scheduler_process.terminate()
             scheduler_process.wait()
@@ -898,6 +369,17 @@ class TestOtelIntegration:
 
         return out, dag
 
+    def _get_ti(self, dag_id: str, run_id: str, task_id: str) -> Any | None:
+        with create_session() as session:
+            ti = session.scalar(
+                select(TaskInstance).where(
+                    TaskInstance.task_id == task_id,
+                    TaskInstance.dag_id == dag_id,
+                    TaskInstance.run_id == run_id,
+                )
+            )
+        return ti
+
     @pytest.mark.parametrize(
         ("legacy_names_on_bool", "legacy_names_exported"),
         [
@@ -905,22 +387,35 @@ class TestOtelIntegration:
             pytest.param(False, False, id="dont_export_legacy_names"),
         ],
     )
-    def test_export_legacy_metric_names(
-        self, legacy_names_on_bool, legacy_names_exported, monkeypatch, 
celery_worker_env_vars, capfd, session
-    ):
+    def test_export_legacy_metric_names(self, legacy_names_on_bool, 
legacy_names_exported, capfd):
         assert isinstance(legacy_names_on_bool, bool)
         os.environ["AIRFLOW__METRICS__LEGACY_NAMES_ON"] = 
str(legacy_names_on_bool)
-
         out, dag = self.dag_execution_for_testing_metrics(capfd)
-
         if self.use_otel != "true":
-            # Test the metrics from the output.
             assert isinstance(legacy_names_exported, bool)
-            check_legacy_metrics(output=out, dag=dag, 
legacy_metrics_on=legacy_names_exported)
+            output_lines = out.splitlines()
+            metrics_dict = extract_metrics_from_output(output_lines)
+
+            legacy_metric_names = [
+                f"airflow.dagrun.dependency-check.{dag.dag_id}",
+                f"airflow.dagrun.duration.success.{dag.dag_id}",
+            ]
+
+            for task_id in dag.task_ids:
+                
legacy_metric_names.append(f"airflow.dag.{dag.dag_id}.{task_id}.scheduled_duration")
+
+            new_metric_names = [
+                "airflow.dagrun.dependency-check",
+                "airflow.dagrun.duration.success",
+                "airflow.task.scheduled_duration",
+            ]
 
-    def test_export_metrics_during_process_shutdown(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
+            assert set(new_metric_names).issubset(metrics_dict.keys())
+
+            if legacy_names_exported:
+                assert set(legacy_metric_names).issubset(metrics_dict.keys())
+
+    def test_export_metrics_during_process_shutdown(self, capfd):
         out, dag = self.dag_execution_for_testing_metrics(capfd)
 
         if self.use_otel != "true":
@@ -932,92 +427,21 @@ class TestOtelIntegration:
                 "airflow.executor.queued_tasks",
                 "airflow.executor.open_slots",
             ]
-            check_metrics_exist(output=out, metrics_to_check=metrics_to_check)
-
-    @pytest.mark.execution_timeout(90)
-    def test_dag_execution_succeeds(self, monkeypatch, celery_worker_env_vars, 
capfd, session):
-        """The same scheduler will start and finish the dag processing."""
-        celery_worker_process = None
-        scheduler_process = None
-        apiserver_process = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag"
-
-            assert len(self.dags) > 0
-            dag = self.dags[dag_id]
-
-            assert dag is not None
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            # Skip the span_status check.
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=90, 
span_status=None
-            )
-
-            # The ti span_status is updated while processing the executor 
events,
-            # which is after the dag_run state has been updated.
-            time.sleep(10)
-
-            with create_session() as session:
-                task_ids = 
session.scalars(select(TaskInstance.task_id).where(TaskInstance.dag_id == 
dag_id))
-                for task_id in task_ids:
-                    # Skip the span_status check.
-                    check_ti_state_and_span_status(
-                        task_id=task_id, run_id=run_id, state=State.SUCCESS, 
span_status=None
-                    )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            celery_status = celery_worker_process.poll()
-            assert celery_status is not None, (
-                "The celery worker process status is None, which means that it 
hasn't terminated as expected."
-            )
+            output_lines = out.splitlines()
 
-            scheduler_process.terminate()
-            scheduler_process.wait()
-
-            scheduler_status = scheduler_process.poll()
-            assert scheduler_status is not None, (
-                "The scheduler_1 process status is None, which means that it 
hasn't terminated as expected."
-            )
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            apiserver_status = apiserver_process.poll()
-            assert apiserver_status is not None, (
-                "The apiserver process status is None, which means that it 
hasn't terminated as expected."
-            )
+            metrics_dict = extract_metrics_from_output(output_lines)
 
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
+            assert set(metrics_to_check).issubset(metrics_dict.keys())
 
     @pytest.mark.execution_timeout(90)
-    def test_same_scheduler_processing_the_entire_dag(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
+    def test_dag_execution_succeeds(self, capfd):
         """The same scheduler will start and finish the dag processing."""
-        celery_worker_process = None
         scheduler_process = None
         apiserver_process = None
         try:
             # Start the processes here and not as fixtures or in a common 
setup,
             # so that the test can capture their output.
-            celery_worker_process, scheduler_process, apiserver_process = 
self.start_worker_and_scheduler1()
+            scheduler_process, apiserver_process = 
self.start_worker_and_scheduler()
 
             dag_id = "otel_test_dag"
 
@@ -1028,20 +452,13 @@ class TestOtelIntegration:
 
             run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
 
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=90, 
span_status=SpanStatus.ENDED
-            )
+            # Skip the span_status check.
+            wait_for_dag_run(dag_id=dag_id, run_id=run_id, max_wait_time=90)
 
             # The ti span_status is updated while processing the executor 
events,
             # which is after the dag_run state has been updated.
             time.sleep(10)
 
-            with create_session() as session:
-                for ti in 
session.scalars(select(TaskInstance).where(TaskInstance.dag_id == dag.dag_id)):
-                    check_ti_state_and_span_status(
-                        task_id=ti.task_id, run_id=run_id, 
state=State.SUCCESS, span_status=SpanStatus.ENDED
-                    )
-
             print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
         finally:
             if self.log_level == "debug":
@@ -1049,14 +466,6 @@ class TestOtelIntegration:
                     dump_airflow_metadata_db(session)
 
             # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            celery_status = celery_worker_process.poll()
-            assert celery_status is not None, (
-                "The celery worker process status is None, which means that it 
hasn't terminated as expected."
-            )
-
             scheduler_process.terminate()
             scheduler_process.wait()
 
@@ -1077,435 +486,40 @@ class TestOtelIntegration:
         log.info("out-start --\n%s\n-- out-end", out)
         log.info("err-start --\n%s\n-- err-end", err)
 
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans from the output.
-            check_spans_without_continuance(output=out, dag=dag)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_change_after_the_first_task_finishes(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The scheduler thread will be paused after the first task ends and a 
new scheduler process
-        will handle the rest of the dag processing. The paused thread will be 
resumed afterwards.
-        """
-
-        # For this test, scheduler1 must be idle but still considered healthy 
by scheduler2.
-        # If scheduler2 marks the job as unhealthy, then it will recreate 
scheduler1's spans
-        # because it will consider them lost.
-        os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = 
"90"
-
-        celery_worker_process = None
-        scheduler_process_1 = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_between_tasks"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            with capfd.disabled():
-                # When the scheduler1 thread is paused, capfd keeps trying to 
read the
-                # file descriptors for the process and ends up freezing the 
test.
-                # Temporarily disable capfd to avoid that.
-                scheduler_process_1.send_signal(signal.SIGSTOP)
-
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Start the 2nd scheduler immediately without any delay to avoid 
having the 1st scheduler
-            # marked as unhealthy. If that happens, then the 2nd will recreate 
the spans that the
-            # 1st scheduler started.
-            # The scheduler would also be considered unhealthy in case it was 
paused
-            # and the dag run continued running.
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.SHOULD_END
-            )
-
-            # Stop scheduler2 in case it still has a db lock on the dag_run.
-            scheduler_process_2.terminate()
-            scheduler_process_1.send_signal(signal.SIGCONT)
-
-            # Wait for the scheduler to start again and continue running.
-            time.sleep(10)
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=30, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Reset for the rest of the tests.
-            os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] 
= "15"
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            scheduler_process_1.terminate()
-            scheduler_process_1.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_for_paused_dag(output=out, dag=dag, 
is_recreated=False, check_t1_sub_spans=False)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The scheduler that starts the dag run will be stopped, while the first 
task is executing,
-        and start a new scheduler will be started. That way, the new process 
will pick up the dag processing.
-        The initial scheduler will exit gracefully.
-        """
-
-        celery_worker_process = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_in_task"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            # Since, we are past the loop, then the file exists and the dag 
has been paused.
-            # Terminate scheduler1 and start scheduler2.
-            with capfd.disabled():
-                scheduler_process_1.terminate()
-
-            assert scheduler_process_1.wait() == 0
-
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.NEEDS_CONTINUANCE
-            )
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.terminate()
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_with_continuance(output=out, dag=dag)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_forcefully_in_the_middle_of_the_first_task(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The first scheduler will exit forcefully while the first task is 
running,
-        so that it won't have time end any active spans.
-        """
-
-        celery_worker_process = None
-        scheduler_process_2 = None
-        apiserver_process = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_in_task"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            # Since, we are past the loop, then the file exists and the dag 
has been paused.
-            # Kill scheduler1 and start scheduler2.
-            with capfd.disabled():
-                scheduler_process_1.send_signal(signal.SIGKILL)
-
-            # The process shouldn't have changed the span_status.
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Wait so that the health threshold passes and scheduler1 is 
considered unhealthy.
-            time.sleep(15)
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Wait for scheduler2 to be up and running.
-            time.sleep(10)
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.terminate()
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_without_continuance(output=out, dag=dag, 
is_recreated=True, check_t1_sub_spans=False)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_forcefully_after_the_first_task_finishes(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The first scheduler will exit forcefully after the first task finishes,
-        so that it won't have time to end any active spans.
-        In this scenario, the sub-spans for the first task will be lost.
-        The only way to retrieve them, would be to re-run the task.
-        """
-
-        celery_worker_process = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_between_tasks"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            # Since, we are past the loop, then the file exists and the dag 
has been paused.
-            # Kill scheduler1 and start scheduler2.
-            with capfd.disabled():
-                scheduler_process_1.send_signal(signal.SIGKILL)
-
-            # The process shouldn't have changed the span_status.
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            time.sleep(15)
-            # The task should be adopted.
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.terminate()
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_for_paused_dag(output=out, dag=dag, is_recreated=True, 
check_t1_sub_spans=False)
-
-    def start_worker_and_scheduler1(self):
-        celery_worker_process = subprocess.Popen(
-            self.celery_command_args,
-            env=os.environ.copy(),
-            stdout=None,
-            stderr=None,
-        )
-
+        # host = "host.docker.internal"
+        host = "jaeger"
+        service_name = os.environ.get("OTEL_SERVICE_NAME", "test")
+        # service_name ``= "my-service-name"
+        r = 
requests.get(f"http://{host}:16686/api/traces?service={service_name}";)
+        data = r.json()
+
+        trace = data["data"][-1]
+        spans = trace["spans"]
+
+        def get_span_hierarchy():
+            spans_dict = {x["spanID"]: x for x in spans}
+
+            def get_parent_span_id(span):
+                parents = [x["spanID"] for x in span["references"] if 
x["refType"] == "CHILD_OF"]
+                if parents:
+                    parent_id = parents[0]
+                    return spans_dict[parent_id]["operationName"]
+
+            nested = {x["operationName"]: get_parent_span_id(x) for x in spans}
+            return nested
+
+        nested = get_span_hierarchy()
+        assert nested == {
+            "otel_test_dag": None,
+            "task1": None,
+            "task1_sub_span1": None,
+            "task1_sub_span2": None,
+            "task1_sub_span3": "task1_sub_span2",
+            "task1_sub_span4": None,
+            "task2": None,
+        }
+
+    def start_worker_and_scheduler(self):
         scheduler_process = subprocess.Popen(
             self.scheduler_command_args,
             env=os.environ.copy(),
@@ -1523,4 +537,4 @@ class TestOtelIntegration:
         # Wait to ensure both processes have started.
         time.sleep(10)
 
-        return celery_worker_process, scheduler_process, apiserver_process
+        return scheduler_process, apiserver_process
diff --git a/dev/breeze/src/airflow_breeze/global_constants.py 
b/dev/breeze/src/airflow_breeze/global_constants.py
index 386928e5265..dd140c02b0a 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -69,7 +69,7 @@ NONE_BACKEND = "none"
 ALLOWED_BACKENDS = [SQLITE_BACKEND, MYSQL_BACKEND, POSTGRES_BACKEND, 
NONE_BACKEND]
 ALLOWED_PROD_BACKENDS = [MYSQL_BACKEND, POSTGRES_BACKEND]
 DEFAULT_BACKEND = ALLOWED_BACKENDS[0]
-TESTABLE_CORE_INTEGRATIONS = ["kerberos", "redis"]
+TESTABLE_CORE_INTEGRATIONS = ["kerberos", "otel", "redis"]
 TESTABLE_PROVIDERS_INTEGRATIONS = [
     "celery",
     "cassandra",
@@ -103,7 +103,7 @@ KEYCLOAK_INTEGRATION = "keycloak"
 STATSD_INTEGRATION = "statsd"
 OTEL_INTEGRATION = "otel"
 OPENLINEAGE_INTEGRATION = "openlineage"
-OTHER_CORE_INTEGRATIONS = [STATSD_INTEGRATION, OTEL_INTEGRATION, 
KEYCLOAK_INTEGRATION]
+OTHER_CORE_INTEGRATIONS = [STATSD_INTEGRATION, KEYCLOAK_INTEGRATION]
 OTHER_PROVIDERS_INTEGRATIONS = [OPENLINEAGE_INTEGRATION]
 ALLOWED_DEBIAN_VERSIONS = ["bookworm"]
 ALL_CORE_INTEGRATIONS = sorted(
diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index 054c8346a8e..220e8114155 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -1125,7 +1125,7 @@ def assert_outputs_are_printed(expected_outputs: 
dict[str, str], stderr: str):
                     "upgrade-to-newer-dependencies": "false",
                     "core-test-types-list-as-strings-in-json": 
ALL_CI_SELECTIVE_TEST_TYPES_AS_JSON,
                     "providers-test-types-list-as-strings-in-json": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES_AS_JSON,
-                    "testable-core-integrations": "['kerberos', 'redis']",
+                    "testable-core-integrations": "['kerberos', 'otel', 
'redis']",
                     "testable-providers-integrations": "['celery', 
'cassandra', 'drill', 'tinkerpop', 'kafka', "
                     "'mongo', 'pinot', 'qdrant', 'redis', 'trino', 'ydb']",
                     "run-mypy": "true",
diff --git a/devel-common/src/tests_common/test_utils/otel_utils.py 
b/devel-common/src/tests_common/test_utils/otel_utils.py
index b93d57467e0..e5406160922 100644
--- a/devel-common/src/tests_common/test_utils/otel_utils.py
+++ b/devel-common/src/tests_common/test_utils/otel_utils.py
@@ -243,95 +243,3 @@ def get_child_list_for_non_root(span_dict: dict, 
span_name: str):
             child_span_list.append(span)
 
     return child_span_list
-
-
-def assert_parent_name_and_get_id(root_span_dict: dict, span_name: str):
-    parent_id = get_id_for_a_given_name(root_span_dict, span_name)
-
-    assert parent_id is not None, f"Parent span '{span_name}' wasn't found."
-
-    return parent_id
-
-
-def assert_span_name_belongs_to_root_span(root_span_dict: dict, span_name: 
str, should_succeed: bool):
-    """Check that a given span name belongs to a root span."""
-    log.info("Checking that '%s' is a root span.", span_name)
-    # Check if any root span has the specified span_name
-    name_exists = any(root_span.get("name", None) == span_name for root_span 
in root_span_dict.values())
-
-    # Assert based on the should_succeed flag
-    if should_succeed:
-        assert name_exists, f"Expected span '{span_name}' to belong to a root 
span, but it does not."
-        log.info("Span '%s' belongs to a root span, as expected.", span_name)
-    else:
-        assert not name_exists, f"Expected span '{span_name}' not to belong to 
a root span, but it does."
-        log.info("Span '%s' doesn't belong to a root span, as expected.", 
span_name)
-
-
-def assert_parent_children_spans(
-    parent_child_dict: dict, root_span_dict: dict, parent_name: str, 
children_names: list[str]
-):
-    """Check that all spans in a given list are children of a given root span 
name."""
-    log.info("Checking that spans '%s' are children of root span '%s'.", 
children_names, parent_name)
-    # Iterate the root_span_dict, to get the span_id for the parent_name.
-    parent_id = assert_parent_name_and_get_id(root_span_dict=root_span_dict, 
span_name=parent_name)
-
-    # Use the root span_id to get the children ids.
-    child_span_list = parent_child_dict[parent_id]
-
-    # For each children id, get the entry from the span_dict.
-    names_from_dict = []
-    for child_span in child_span_list:
-        name = child_span["name"]
-        names_from_dict.append(name)
-
-    # Assert that all given children names match the names from the dictionary.
-    for name in children_names:
-        assert name in names_from_dict, (
-            f"Span name '{name}' wasn't found in children span names. It's not 
a child of span '{parent_name}'."
-        )
-
-
-def assert_parent_children_spans_for_non_root(span_dict: dict, parent_name: 
str, children_names: list[str]):
-    """Check that all spans in a given list are children of a given non-root 
span name."""
-    log.info("Checking that spans '%s' are children of span '%s'.", 
children_names, parent_name)
-    child_span_list = get_child_list_for_non_root(span_dict=span_dict, 
span_name=parent_name)
-
-    # For each children id, get the entry from the span_dict.
-    names_from_dict = []
-    for child_span in child_span_list:
-        name = child_span["name"]
-        names_from_dict.append(name)
-
-    # Assert that all given children names match the names from the dictionary.
-    for name in children_names:
-        assert name in names_from_dict, (
-            f"Span name '{name}' wasn't found in children span names. It's not 
a child of span '{parent_name}'."
-        )
-
-
-def assert_span_not_in_children_spans(
-    parent_child_dict: dict,
-    root_span_dict: dict,
-    span_dict: dict,
-    parent_name: str,
-    child_name: str,
-    span_exists: bool,
-):
-    """Check that a span for a given name, doesn't belong to the children of a 
given root span name."""
-    log.info("Checking that span '%s' is not a child of span '%s'.", 
child_name, parent_name)
-    # Iterate the root_span_dict, to get the span_id for the parent_name.
-    parent_id = assert_parent_name_and_get_id(root_span_dict=root_span_dict, 
span_name=parent_name)
-
-    # Use the root span_id to get the children ids.
-    child_span_id_list = parent_child_dict[parent_id]
-
-    child_id = get_id_for_a_given_name(span_dict=span_dict, 
span_name=child_name)
-
-    if span_exists:
-        assert child_id is not None, f"Span '{child_name}' should exist but it 
doesn't."
-        assert child_id not in child_span_id_list, (
-            f"Span '{child_name}' shouldn't be a child of span 
'{parent_name}', but it is."
-        )
-    else:
-        assert child_id is None, f"Span '{child_name}' shouldn't exist but it 
does."
diff --git a/scripts/in_container/check_environment.sh 
b/scripts/in_container/check_environment.sh
index 77dcb636283..39751c6e797 100755
--- a/scripts/in_container/check_environment.sh
+++ b/scripts/in_container/check_environment.sh
@@ -149,6 +149,9 @@ fi
 if [[ ${INTEGRATION_REDIS} == "true" ]]; then
     check_service "Redis" "run_nc redis 6379" 50
 fi
+if [[ ${INTEGRATION_OTEL} == "true" ]]; then
+    check_service "Jaeger" "run_nc jaeger 16686" 50
+fi
 if [[ ${INTEGRATION_CELERY} == "true" ]]; then
     check_service "Redis" "run_nc redis 6379" 50
     check_service "RabbitMQ" "run_nc rabbitmq 5672" 50

Reply via email to