SameerMesiah97 commented on code in PR #63020:
URL: https://github.com/apache/airflow/pull/63020#discussion_r2900993795
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
config_file.write(config_text)
config_file.flush()
yield config_file.name
+
+ def generate_config_dict_for_deferral(
+ self,
+ eks_cluster_name: str,
+ pod_namespace: str | None,
+ ) -> dict:
+ """
+ Generate a kubeconfig dict with embedded token for use in deferrable
mode.
+
+ This method generates a kubeconfig that uses a pre-fetched bearer
token instead of
+ an exec credential plugin. This is necessary for deferrable mode
because:
+ 1. The exec plugin references temp files that only exist on the worker
+ 2. The triggerer runs on a different host where those temp files don't
exist
+ 3. By embedding the token directly, the config can be serialized and
used anywhere
+
+ Note: The token has a limited lifetime (typically 14 minutes). The
triggerer should
+ complete its work within this window, or the trigger_reentry will
fetch fresh credentials.
+
+ :param eks_cluster_name: The name of the cluster to generate
kubeconfig for.
+ :param pod_namespace: The namespace to run within kubernetes.
+ :return: A kubeconfig dict with embedded bearer token.
+ """
+ from botocore.exceptions import ClientError
+
+ from airflow.providers.amazon.aws.utils.eks_get_token import
fetch_access_token_for_cluster
+
+ # Get cluster details
+ eks_client = self.conn
+ session = self.get_session()
+
+ try:
+ cluster = eks_client.describe_cluster(name=eks_cluster_name)
+ except ClientError as e:
+ raise ValueError(
+ f"Failed to describe EKS cluster '{eks_cluster_name}':
{e.response['Error']['Message']}"
+ ) from e
+
+ cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+ cluster_ep = cluster["cluster"]["endpoint"]
+
+ # Generate the STS URL for token generation
+ os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
+ try:
+ sts_url =
f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
+ finally:
+ del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
+
Review Comment:
Modifying environment variables is not acceptable here. `os.environ` is
process-global, so setting and deleting `AWS_STS_REGIONAL_ENDPOINTS` like this
could interfere with other tasks running in the same worker process. It could
also remove a value that was already set in the environment.
I am also not sure why you need to manipulate environment variables at all: my
understanding is that the URL construction would already default to 'regional'
without explicitly setting `AWS_STS_REGIONAL_ENDPOINTS`.
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
config_file.write(config_text)
config_file.flush()
yield config_file.name
+
+ def generate_config_dict_for_deferral(
+ self,
+ eks_cluster_name: str,
+ pod_namespace: str | None,
+ ) -> dict:
+ """
+ Generate a kubeconfig dict with embedded token for use in deferrable
mode.
+
+ This method generates a kubeconfig that uses a pre-fetched bearer
token instead of
+ an exec credential plugin. This is necessary for deferrable mode
because:
+ 1. The exec plugin references temp files that only exist on the worker
+ 2. The triggerer runs on a different host where those temp files don't
exist
+ 3. By embedding the token directly, the config can be serialized and
used anywhere
+
+ Note: The token has a limited lifetime (typically 14 minutes). The
triggerer should
+ complete its work within this window, or the trigger_reentry will
fetch fresh credentials.
+
+ :param eks_cluster_name: The name of the cluster to generate
kubeconfig for.
+ :param pod_namespace: The namespace to run within kubernetes.
+ :return: A kubeconfig dict with embedded bearer token.
+ """
+ from botocore.exceptions import ClientError
+
+ from airflow.providers.amazon.aws.utils.eks_get_token import
fetch_access_token_for_cluster
+
+ # Get cluster details
+ eks_client = self.conn
+ session = self.get_session()
+
+ try:
+ cluster = eks_client.describe_cluster(name=eks_cluster_name)
+ except ClientError as e:
+ raise ValueError(
+ f"Failed to describe EKS cluster '{eks_cluster_name}':
{e.response['Error']['Message']}"
+ ) from e
+
+ cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+ cluster_ep = cluster["cluster"]["endpoint"]
+
+ # Generate the STS URL for token generation
+ os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
+ try:
+ sts_url =
f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
+ finally:
+ del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
+
+ # Fetch the access token directly
+ try:
+ access_token = fetch_access_token_for_cluster(
+ eks_cluster_name=eks_cluster_name,
+ sts_url=sts_url,
+ region_name=session.region_name,
+ )
+ except Exception as e:
Review Comment:
The `except Exception` here is too broad. Can you narrow it to the specific
errors you would expect `fetch_access_token_for_cluster` to raise?
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
config_file.write(config_text)
config_file.flush()
yield config_file.name
+
+ def generate_config_dict_for_deferral(
+ self,
+ eks_cluster_name: str,
+ pod_namespace: str | None,
+ ) -> dict:
+ """
+ Generate a kubeconfig dict with embedded token for use in deferrable
mode.
+
+ This method generates a kubeconfig that uses a pre-fetched bearer
token instead of
+ an exec credential plugin. This is necessary for deferrable mode
because:
+ 1. The exec plugin references temp files that only exist on the worker
+ 2. The triggerer runs on a different host where those temp files don't
exist
+ 3. By embedding the token directly, the config can be serialized and
used anywhere
+
+ Note: The token has a limited lifetime (typically 14 minutes). The
triggerer should
+ complete its work within this window, or the trigger_reentry will
fetch fresh credentials.
+
+ :param eks_cluster_name: The name of the cluster to generate
kubeconfig for.
+ :param pod_namespace: The namespace to run within kubernetes.
+ :return: A kubeconfig dict with embedded bearer token.
Review Comment:
This docstring is too long. The emphasis should be on description, not
justification. Something like this would be better:
```
"""
Generate a kubeconfig dict with an embedded bearer token for deferrable
execution.
The token-based config avoids the exec credential plugin so it can be safely
serialized and used by the triggerer process.
:param eks_cluster_name: The name of the EKS cluster.
:param pod_namespace: The Kubernetes namespace.
:return: Kubeconfig dictionary with embedded bearer token.
"""
```
The additional content where you explain why this function exists might be
better as a comment.
##########
providers/amazon/tests/unit/amazon/aws/hooks/test_eks.py:
##########
@@ -1273,6 +1273,126 @@ def test_generate_config_file(self, mock_conn,
aws_conn_id, region_name, expecte
if expected_region_args:
assert expected_region_args in command_arg
+ @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
+
@mock.patch("airflow.providers.amazon.aws.utils.eks_get_token.fetch_access_token_for_cluster")
+ def test_generate_config_dict_for_deferral(self, mock_fetch_token,
mock_conn):
+ """Test that generate_config_dict_for_deferral creates a config with
embedded token.
+
+ This test verifies that the method generates a kubeconfig dict with a
bearer token
+ embedded directly (instead of an exec block that references temp
files), allowing
+ the config to be serialized and used on the triggerer.
Review Comment:
Remove everything from this docstring except the first line.
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event:
dict[str, Any]) -> Any:
) as self.config_file:
return super().trigger_reentry(context, event)
+ def invoke_defer_method(
+ self, last_log_time: DateTime | None = None, context: Context | None =
None
+ ) -> None:
+ """
+ Override to generate a token-based kubeconfig for the triggerer.
+
+ The base KubernetesPodOperator.invoke_defer_method() calls
convert_config_file_to_dict()
+ which reads the kubeconfig file into a dict. For EKS, this kubeconfig
contains an exec
+ block that references a temp file with AWS credentials. This temp file
only exists on
+ the worker and is deleted when the context managers exit.
+
+ When the trigger is serialized and sent to the triggerer (which runs
on a different host),
+ the exec block tries to source a file that doesn't exist, causing 401
Unauthorized errors.
+
+ This override generates a kubeconfig with an embedded bearer token
instead of an exec
+ block, allowing the config to work on the triggerer without requiring
local temp files.
+ """
Review Comment:
This docstring is too long as well. I would suggest the below:
```
"""
Override to generate a token-based kubeconfig for the triggerer.
EKS kubeconfigs use an exec credential plugin that references temporary
files created on the worker. These files are not available on the triggerer,
so this override embeds a bearer token instead.
"""
```
Same as above: I would move the removed explanatory content into a code
comment instead.
##########
providers/amazon/tests/unit/amazon/aws/operators/test_eks.py:
##########
@@ -1116,3 +1116,92 @@ def
test_refresh_cached_properties_raises_when_no_credentials(
# Verify super()._refresh_cached_properties() was NOT called since we
raised
mock_super_refresh.assert_not_called()
+
+
@mock.patch("airflow.providers.amazon.aws.operators.eks.EksPodOperator.defer")
+
@mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.generate_config_dict_for_deferral")
+ @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__",
return_value=None)
+ def test_invoke_defer_method_generates_token_based_config(
+ self,
+ mock_eks_hook,
+ mock_generate_config_dict,
+ mock_defer,
+ ):
+ """Test that invoke_defer_method generates a token-based config dict
for the triggerer.
+
+ This test verifies that EksPodOperator.invoke_defer_method() generates
a kubeconfig
+ with an embedded bearer token (instead of an exec block with temp file
references)
+ so that the triggerer can authenticate without requiring files that
only exist on the worker.
+ """
Review Comment:
Remove everything from this docstring except the first line.
##########
providers/amazon/newsfragments/61736.bugfix.rst:
##########
@@ -0,0 +1 @@
+Fix EksPodOperator deferrable mode failing on remote triggerers with 401
Unauthorized by embedding bearer token in kubeconfig instead of using exec
block with temp file references
Review Comment:
I am not sure whether a news fragment is necessary for this, but let's see
what a committer/maintainer has to say.
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event:
dict[str, Any]) -> Any:
) as self.config_file:
return super().trigger_reentry(context, event)
+ def invoke_defer_method(
+ self, last_log_time: DateTime | None = None, context: Context | None =
None
+ ) -> None:
+ """
+ Override to generate a token-based kubeconfig for the triggerer.
+
+ The base KubernetesPodOperator.invoke_defer_method() calls
convert_config_file_to_dict()
+ which reads the kubeconfig file into a dict. For EKS, this kubeconfig
contains an exec
+ block that references a temp file with AWS credentials. This temp file
only exists on
+ the worker and is deleted when the context managers exit.
+
+ When the trigger is serialized and sent to the triggerer (which runs
on a different host),
+ the exec block tries to source a file that doesn't exist, causing 401
Unauthorized errors.
+
+ This override generates a kubeconfig with an embedded bearer token
instead of an exec
+ block, allowing the config to work on the triggerer without requiring
local temp files.
+ """
+ eks_hook = EksHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region,
+ )
+
+ # Generate a kubeconfig dict with an embedded token (no exec block)
+ self._config_dict = eks_hook.generate_config_dict_for_deferral(
+ eks_cluster_name=self.cluster_name,
+ pod_namespace=self.namespace,
+ )
+
+ # Now call the parent's invoke_defer_method, but skip
convert_config_file_to_dict
+ # since we've already set self._config_dict
+ # We need to replicate the parent logic but use our config_dict
+ import datetime
+
+ from airflow.providers.cncf.kubernetes.triggers.pod import
ContainerState, KubernetesPodTrigger
+ from airflow.providers.common.compat.sdk import
AirflowNotFoundException, BaseHook
Review Comment:
Why are these imports inside the function rather than at module level? Not
necessarily an issue, but can you explain the reasoning?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]