This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bf1e5fda206 add application name as env var to driver and executor
pods for SparkKubernetesOperator (#64015)
bf1e5fda206 is described below
commit bf1e5fda206acde8e3c13e4c4be9c488c65eeb4f
Author: Shrividya Hegde <[email protected]>
AuthorDate: Sun Mar 22 19:10:28 2026 -0400
add application name as env var to driver and executor pods for
SparkKubernetesOperator (#64015)
* add application name as env var to driver and executor pods for
SparkKubernetesOperator
- added spark application name to both driver and executor components in
template body
- custom object launcher already had a value for the env under
spec[item]. The application name is added along with the k8s_spec env vars.
- tests added accordingly
- changelog.rst updated.
* reverting the changelog
* simplification of logic
* add application name as env var to driver and executor pods for
SparkKubernetesOperator
- added spark application name to both driver and executor components in
template body
- custom object launcher already had a value for the env under
spec[item]. The application name is added along with the k8s_spec env vars.
- tests added accordingly
- changelog.rst updated.
* reverting the changelog
* simplification of logic
* Refactor expected_dict assignment for readability
* Add mock patching to spark application name tests
---
.../kubernetes/operators/custom_object_launcher.py | 3 +-
.../cncf/kubernetes/operators/spark_kubernetes.py | 6 ++
.../kubernetes/operators/test_spark_kubernetes.py | 78 ++++++++++++++++++++--
3 files changed, 82 insertions(+), 5 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
index bbe507716f9..39c9269f154 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
@@ -248,7 +248,8 @@ class CustomObjectLauncher(LoggingMixin):
self.body.spec["imagePullSecrets"] =
k8s_spec.image_pull_secrets
for item in ["driver", "executor"]:
# Env List
- self.body.spec[item]["env"] = k8s_spec.env_vars
+ existing_env = self.body.spec[item].get("env") or []
+ self.body.spec[item]["env"] = existing_env + k8s_spec.env_vars
self.body.spec[item]["envFrom"] = k8s_spec.env_from
# Volumes
self.body.spec[item]["volumeMounts"] = k8s_spec.volume_mounts
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index dfaac1ecd04..0d77361199d 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -396,6 +396,12 @@ class SparkKubernetesOperator(KubernetesPodOperator):
spec_dict[component]["labels"].update(task_context_labels)
+ spec_dict = template_body.setdefault("spark", {}).setdefault("spec",
{})
+ for component in ["driver", "executor"]:
+ env_list = spec_dict.setdefault(component, {}).setdefault("env",
[])
+ if not any(e.get("name") == "SPARK_APPLICATION_NAME" for e in
env_list):
+ env_list.append({"name": "SPARK_APPLICATION_NAME", "value":
self.name})
+
self.log.info("Creating sparkApplication.")
self.launcher = CustomObjectLauncher(
name=self.name,
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
index 56f3132d7d0..b5f278ad49d 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -166,11 +166,12 @@ def
_get_expected_application_dict_with_labels(task_name="default_yaml"):
}
-def
_get_expected_application_dict_without_task_context_labels(task_name="default_yaml"):
+def
_get_expected_application_dict_without_task_context_labels(task_name="default_yaml",
app_name=None):
"""Create expected application dict without task context labels (only
original file labels)."""
original_file_labels = {
"version": "2.4.5",
}
+ app_name = app_name or task_name
return {
"apiVersion": "sparkoperator.k8s.io/v1beta2",
@@ -193,6 +194,7 @@ def
_get_expected_application_dict_without_task_context_labels(task_name="defaul
"labels": original_file_labels.copy(),
"serviceAccount": "spark",
"volumeMounts": [{"name": "test-volume", "mountPath": "/tmp"}],
+ "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}],
},
"executor": {
"cores": 1,
@@ -200,6 +202,7 @@ def
_get_expected_application_dict_without_task_context_labels(task_name="defaul
"memory": "512m",
"labels": original_file_labels.copy(),
"volumeMounts": [{"name": "test-volume", "mountPath": "/tmp"}],
+ "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}],
},
},
}
@@ -378,7 +381,9 @@ class TestSparkKubernetesOperatorCreateApplication:
assert isinstance(done_op.name, str)
assert done_op.name != ""
- expected_dict =
_get_expected_application_dict_without_task_context_labels(task_name)
+ expected_dict =
_get_expected_application_dict_without_task_context_labels(
+ task_name, app_name=done_op.name
+ )
expected_dict["metadata"]["name"] = done_op.name
mock_create_namespaced_crd.assert_called_with(
body=expected_dict,
@@ -424,7 +429,9 @@ class TestSparkKubernetesOperatorCreateApplication:
else:
assert done_op.name == name_normalized
- expected_dict =
_get_expected_application_dict_without_task_context_labels(task_name)
+ expected_dict =
_get_expected_application_dict_without_task_context_labels(
+ task_name, app_name=done_op.name
+ )
expected_dict["metadata"]["name"] = done_op.name
mock_create_namespaced_crd.assert_called_with(
body=expected_dict,
@@ -467,7 +474,9 @@ class TestSparkKubernetesOperatorCreateApplication:
else:
assert done_op.name == name_normalized
- expected_dict =
_get_expected_application_dict_without_task_context_labels(task_name)
+ expected_dict =
_get_expected_application_dict_without_task_context_labels(
+ task_name, app_name=done_op.name
+ )
expected_dict["metadata"]["name"] = done_op.name
mock_create_namespaced_crd.assert_called_with(
body=expected_dict,
@@ -504,6 +513,8 @@ class TestSparkKubernetesOperatorCreateApplication:
expected_dict = _get_expected_k8s_dict()
expected_dict["metadata"]["name"] = done_op.name
+ expected_dict["spec"]["driver"]["env"] = [{"name":
"SPARK_APPLICATION_NAME", "value": done_op.name}]
+ expected_dict["spec"]["executor"]["env"] = [{"name":
"SPARK_APPLICATION_NAME", "value": done_op.name}]
mock_create_namespaced_crd.assert_called_with(
body=expected_dict,
**self.call_commons,
@@ -540,6 +551,8 @@ class TestSparkKubernetesOperatorCreateApplication:
expected_dict = _get_expected_k8s_dict()
expected_dict["metadata"]["name"] = done_op.name
+ expected_dict["spec"]["driver"]["env"] = [{"name":
"SPARK_APPLICATION_NAME", "value": done_op.name}]
+ expected_dict["spec"]["executor"]["env"] = [{"name":
"SPARK_APPLICATION_NAME", "value": done_op.name}]
mock_create_namespaced_crd.assert_called_with(
body=expected_dict,
**self.call_commons,
@@ -625,9 +638,11 @@ class TestSparkKubernetesOperator:
task_name, mock_create_job_name, job_spec=job_spec,
mock_get_kube_client=mock_get_kube_client
)
assert op.launcher.body["spec"]["driver"]["env"] == [
+ {"name": "SPARK_APPLICATION_NAME", "value": "default_env"},
k8s.V1EnvVar(name="TEST_ENV_1", value="VALUE1"),
]
assert op.launcher.body["spec"]["executor"]["env"] == [
+ {"name": "SPARK_APPLICATION_NAME", "value": "default_env"},
k8s.V1EnvVar(name="TEST_ENV_1", value="VALUE1"),
]
@@ -1520,3 +1535,58 @@ class TestSparkKubernetesLifecycle:
# And verify delete works
op.on_kill()
mock_launcher_cls.return_value.delete_spark_job.assert_called()
+
+
@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client")
+ def test_spark_application_name_env_injected(self, mock_client):
+ op = SparkKubernetesOperator(
+ task_id="test_task",
+ namespace="default",
+ template_spec={
+ "apiVersion": "sparkoperator.k8s.io/v1beta2",
+ "kind": "SparkApplication",
+ "spec": {
+ "driver": {},
+ "executor": {},
+ },
+ },
+ reattach_on_restart=False,
+ )
+ op.name = "my-spark-app-abc123"
+
+ with mock.patch.object(op, "get_or_create_spark_crd",
return_value=mock.MagicMock()):
+ op._setup_spark_configuration(mock.MagicMock())
+
+ body = op.launcher.body
+ for component in ["driver", "executor"]:
+ env = body["spec"][component].get("env", [])
+ names = [e["name"] for e in env]
+ assert "SPARK_APPLICATION_NAME" in names
+ value = next(e["value"] for e in env if e["name"] ==
"SPARK_APPLICATION_NAME")
+ assert value == "my-spark-app-abc123"
+
+
@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client")
+ def test_spark_application_name_env_not_duplicated(self, mock_client):
+ op = SparkKubernetesOperator(
+ task_id="test_task",
+ namespace="default",
+ template_spec={
+ "apiVersion": "sparkoperator.k8s.io/v1beta2",
+ "kind": "SparkApplication",
+ "spec": {
+ "driver": {"env": [{"name": "SPARK_APPLICATION_NAME",
"value": "user-defined"}]},
+ "executor": {"env": [{"name": "SPARK_APPLICATION_NAME",
"value": "user-defined"}]},
+ },
+ },
+ reattach_on_restart=False,
+ )
+ op.name = "my-spark-app-abc123"
+
+ with mock.patch.object(op, "get_or_create_spark_crd",
return_value=mock.MagicMock()):
+ op._setup_spark_configuration(mock.MagicMock())
+
+ body = op.launcher.body
+ for component in ["driver", "executor"]:
+ env = body["spec"][component].get("env", [])
+ app_name_envs = [e for e in env if e["name"] ==
"SPARK_APPLICATION_NAME"]
+ assert len(app_name_envs) == 1 # not duplicated
+ assert app_name_envs[0]["value"] == "user-defined"