SameerMesiah97 commented on code in PR #63020:
URL: https://github.com/apache/airflow/pull/63020#discussion_r2900993795


##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
             config_file.write(config_text)
             config_file.flush()
             yield config_file.name
+
+    def generate_config_dict_for_deferral(
+        self,
+        eks_cluster_name: str,
+        pod_namespace: str | None,
+    ) -> dict:
+        """
+        Generate a kubeconfig dict with embedded token for use in deferrable mode.
+
+        This method generates a kubeconfig that uses a pre-fetched bearer token instead of
+        an exec credential plugin. This is necessary for deferrable mode because:
+        1. The exec plugin references temp files that only exist on the worker
+        2. The triggerer runs on a different host where those temp files don't exist
+        3. By embedding the token directly, the config can be serialized and used anywhere
+
+        Note: The token has a limited lifetime (typically 14 minutes). The triggerer should
+        complete its work within this window, or the trigger_reentry will fetch fresh credentials.
+
+        :param eks_cluster_name: The name of the cluster to generate kubeconfig for.
+        :param pod_namespace: The namespace to run within kubernetes.
+        :return: A kubeconfig dict with embedded bearer token.
+        """
+        from botocore.exceptions import ClientError
+
+        from airflow.providers.amazon.aws.utils.eks_get_token import fetch_access_token_for_cluster
+
+        # Get cluster details
+        eks_client = self.conn
+        session = self.get_session()
+
+        try:
+            cluster = eks_client.describe_cluster(name=eks_cluster_name)
+        except ClientError as e:
+            raise ValueError(
+                f"Failed to describe EKS cluster '{eks_cluster_name}': {e.response['Error']['Message']}"
+            ) from e
+
+        cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+        cluster_ep = cluster["cluster"]["endpoint"]
+
+        # Generate the STS URL for token generation
+        os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
+        try:
+            sts_url = f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
+        finally:
+            del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
+

Review Comment:
   Modifying environment variables is not acceptable here. `os.environ` is process-global, so setting and deleting `AWS_STS_REGIONAL_ENDPOINTS` like this could interfere with other tasks running in the same worker process, and the `del` could also remove a value that was already set in the environment.
   
   I am also not sure why you need to manipulate environment variables at all: my understanding is that the URL construction would default to 'regional' without explicitly setting `AWS_STS_REGIONAL_ENDPOINTS`.
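   For illustration, one way to avoid touching `os.environ` entirely is to construct the regional endpoint URL from the session region. This is only a sketch under the assumption that the standard `sts.<region>.amazonaws.com` naming applies (partitions such as China or GovCloud use different suffixes, which is why relying on botocore's own endpoint resolution is preferable in real code):
   
   ```python
   def build_regional_sts_url(region_name: str) -> str:
       # Hypothetical helper: assumes the standard regional STS endpoint
       # naming scheme, i.e. what botocore resolves to when
       # sts_regional_endpoints is "regional".
       return (
           f"https://sts.{region_name}.amazonaws.com"
           "/?Action=GetCallerIdentity&Version=2011-06-15"
       )
   ```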



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
             config_file.write(config_text)
             config_file.flush()
             yield config_file.name
+
+    def generate_config_dict_for_deferral(
+        self,
+        eks_cluster_name: str,
+        pod_namespace: str | None,
+    ) -> dict:
+        """
+        Generate a kubeconfig dict with embedded token for use in deferrable mode.
+
+        This method generates a kubeconfig that uses a pre-fetched bearer token instead of
+        an exec credential plugin. This is necessary for deferrable mode because:
+        1. The exec plugin references temp files that only exist on the worker
+        2. The triggerer runs on a different host where those temp files don't exist
+        3. By embedding the token directly, the config can be serialized and used anywhere
+
+        Note: The token has a limited lifetime (typically 14 minutes). The triggerer should
+        complete its work within this window, or the trigger_reentry will fetch fresh credentials.
+
+        :param eks_cluster_name: The name of the cluster to generate kubeconfig for.
+        :param pod_namespace: The namespace to run within kubernetes.
+        :return: A kubeconfig dict with embedded bearer token.
+        """
+        from botocore.exceptions import ClientError
+
+        from airflow.providers.amazon.aws.utils.eks_get_token import fetch_access_token_for_cluster
+
+        # Get cluster details
+        eks_client = self.conn
+        session = self.get_session()
+
+        try:
+            cluster = eks_client.describe_cluster(name=eks_cluster_name)
+        except ClientError as e:
+            raise ValueError(
+                f"Failed to describe EKS cluster '{eks_cluster_name}': {e.response['Error']['Message']}"
+            ) from e
+
+        cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+        cluster_ep = cluster["cluster"]["endpoint"]
+
+        # Generate the STS URL for token generation
+        os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
+        try:
+            sts_url = f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
+        finally:
+            del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
+
+        # Fetch the access token directly
+        try:
+            access_token = fetch_access_token_for_cluster(
+                eks_cluster_name=eks_cluster_name,
+                sts_url=sts_url,
+                region_name=session.region_name,
+            )
+        except Exception as e:

Review Comment:
   The `Exception` here is too broad. Could you narrow it to the errors you would actually expect when invoking `fetch_access_token_for_cluster`?
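   A minimal sketch of the narrowed pattern, using stand-in exception classes since the exact errors depend on how `fetch_access_token_for_cluster` is implemented (in practice presumably `botocore.exceptions.ClientError` and `BotoCoreError`):
   
   ```python
   class BotoCoreError(Exception):
       """Stand-in for botocore.exceptions.BotoCoreError."""
   
   class ClientError(Exception):
       """Stand-in for botocore.exceptions.ClientError."""
   
   def fetch_token_or_raise(fetch):
       # Catch only the expected AWS-side failures; anything else
       # (e.g. a programming bug such as TypeError) propagates unchanged.
       try:
           return fetch()
       except (ClientError, BotoCoreError) as e:
           raise ValueError(f"Failed to fetch EKS access token: {e}") from e
   ```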



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
             config_file.write(config_text)
             config_file.flush()
             yield config_file.name
+
+    def generate_config_dict_for_deferral(
+        self,
+        eks_cluster_name: str,
+        pod_namespace: str | None,
+    ) -> dict:
+        """
+        Generate a kubeconfig dict with embedded token for use in deferrable mode.
+
+        This method generates a kubeconfig that uses a pre-fetched bearer token instead of
+        an exec credential plugin. This is necessary for deferrable mode because:
+        1. The exec plugin references temp files that only exist on the worker
+        2. The triggerer runs on a different host where those temp files don't exist
+        3. By embedding the token directly, the config can be serialized and used anywhere
+
+        Note: The token has a limited lifetime (typically 14 minutes). The triggerer should
+        complete its work within this window, or the trigger_reentry will fetch fresh credentials.
+
+        :param eks_cluster_name: The name of the cluster to generate kubeconfig for.
+        :param pod_namespace: The namespace to run within kubernetes.
+        :return: A kubeconfig dict with embedded bearer token.

Review Comment:
   This docstring is too long. The emphasis should be on description, not justification. Something like this would be better:
   
   ```
   """
   Generate a kubeconfig dict with an embedded bearer token for deferrable execution.
   
   The token-based config avoids the exec credential plugin so it can be safely
   serialized and used by the triggerer process.
   
   :param eks_cluster_name: The name of the EKS cluster.
   :param pod_namespace: The Kubernetes namespace.
   :return: Kubeconfig dictionary with embedded bearer token.
   """
   ```
   The additional content explaining why this function exists might be better placed in a comment.



##########
providers/amazon/tests/unit/amazon/aws/hooks/test_eks.py:
##########
@@ -1273,6 +1273,126 @@ def test_generate_config_file(self, mock_conn, aws_conn_id, region_name, expecte
             if expected_region_args:
                 assert expected_region_args in command_arg
 
+    @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
+    @mock.patch("airflow.providers.amazon.aws.utils.eks_get_token.fetch_access_token_for_cluster")
+    def test_generate_config_dict_for_deferral(self, mock_fetch_token, mock_conn):
+        """Test that generate_config_dict_for_deferral creates a config with embedded token.
+
+        This test verifies that the method generates a kubeconfig dict with a bearer token
+        embedded directly (instead of an exec block that references temp files), allowing
+        the config to be serialized and used on the triggerer.

Review Comment:
   Remove everything from this docstring except the first line.



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             ) as self.config_file:
                 return super().trigger_reentry(context, event)
 
+    def invoke_defer_method(
+        self, last_log_time: DateTime | None = None, context: Context | None = None
+    ) -> None:
+        """
+        Override to generate a token-based kubeconfig for the triggerer.
+
+        The base KubernetesPodOperator.invoke_defer_method() calls convert_config_file_to_dict()
+        which reads the kubeconfig file into a dict. For EKS, this kubeconfig contains an exec
+        block that references a temp file with AWS credentials. This temp file only exists on
+        the worker and is deleted when the context managers exit.
+
+        When the trigger is serialized and sent to the triggerer (which runs on a different host),
+        the exec block tries to source a file that doesn't exist, causing 401 Unauthorized errors.
+
+        This override generates a kubeconfig with an embedded bearer token instead of an exec
+        block, allowing the config to work on the triggerer without requiring local temp files.
+        """

Review Comment:
   This docstring is too long as well. I would suggest the below:
   ```
   """
   Override to generate a token-based kubeconfig for the triggerer.
   
   EKS kubeconfigs use an exec credential plugin that references temporary
   files created on the worker. These files are not available on the triggerer,
   so this override embeds a bearer token instead.
   """
   ```
   
   Same as above, I would move the explanatory content into a comment instead.



##########
providers/amazon/tests/unit/amazon/aws/operators/test_eks.py:
##########
@@ -1116,3 +1116,92 @@ def test_refresh_cached_properties_raises_when_no_credentials(
 
         # Verify super()._refresh_cached_properties() was NOT called since we raised
         mock_super_refresh.assert_not_called()
+
+    @mock.patch("airflow.providers.amazon.aws.operators.eks.EksPodOperator.defer")
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.generate_config_dict_for_deferral")
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__", return_value=None)
+    def test_invoke_defer_method_generates_token_based_config(
+        self,
+        mock_eks_hook,
+        mock_generate_config_dict,
+        mock_defer,
+    ):
+        """Test that invoke_defer_method generates a token-based config dict for the triggerer.
+
+        This test verifies that EksPodOperator.invoke_defer_method() generates a kubeconfig
+        with an embedded bearer token (instead of an exec block with temp file references)
+        so that the triggerer can authenticate without requiring files that only exist on the worker.
+        """

Review Comment:
   Remove everything from this docstring except the first line.



##########
providers/amazon/newsfragments/61736.bugfix.rst:
##########
@@ -0,0 +1 @@
+Fix EksPodOperator deferrable mode failing on remote triggerers with 401 Unauthorized by embedding bearer token in kubeconfig instead of using exec block with temp file references

Review Comment:
   I am not sure a news fragment is necessary for this, but let's see what a committer/maintainer has to say.



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             ) as self.config_file:
                 return super().trigger_reentry(context, event)
 
+    def invoke_defer_method(
+        self, last_log_time: DateTime | None = None, context: Context | None = None
+    ) -> None:
+        """
+        Override to generate a token-based kubeconfig for the triggerer.
+
+        The base KubernetesPodOperator.invoke_defer_method() calls convert_config_file_to_dict()
+        which reads the kubeconfig file into a dict. For EKS, this kubeconfig contains an exec
+        block that references a temp file with AWS credentials. This temp file only exists on
+        the worker and is deleted when the context managers exit.
+
+        When the trigger is serialized and sent to the triggerer (which runs on a different host),
+        the exec block tries to source a file that doesn't exist, causing 401 Unauthorized errors.
+
+        This override generates a kubeconfig with an embedded bearer token instead of an exec
+        block, allowing the config to work on the triggerer without requiring local temp files.
+        """
+        eks_hook = EksHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region,
+        )
+
+        # Generate a kubeconfig dict with an embedded token (no exec block)
+        self._config_dict = eks_hook.generate_config_dict_for_deferral(
+            eks_cluster_name=self.cluster_name,
+            pod_namespace=self.namespace,
+        )
+
+        # Now call the parent's invoke_defer_method, but skip convert_config_file_to_dict
+        # since we've already set self._config_dict
+        # We need to replicate the parent logic but use our config_dict
+        import datetime
+
+        from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
+        from airflow.providers.common.compat.sdk import AirflowNotFoundException, BaseHook

Review Comment:
   Why are these imports inside the function? Not necessarily an issue, but can you explain the reasoning?
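   For context, function-level imports are usually justified only for breaking circular imports or deferring heavy/optional dependencies until the code path that needs them actually runs. An illustrative sketch (not Airflow code; `json` stands in for such a dependency):
   
   ```python
   def export_report(data: dict) -> str:
       # Deferred import: the dependency is only needed on this code path,
       # so importing this module stays cheap and the optional dependency
       # is not required up front.
       try:
           import json  # stand-in for a heavy or optional dependency
       except ImportError as exc:
           raise RuntimeError("optional dependency not installed") from exc
       return json.dumps(data)
   ```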



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
