Copilot commented on code in PR #62696:
URL: https://github.com/apache/airflow/pull/62696#discussion_r2973215166


##########
task-sdk/src/airflow/sdk/providers_manager_runtime.py:
##########
@@ -218,6 +220,22 @@ def initialize_providers_taskflow_decorator(self):
         self.initialize_providers_list()
         self._discover_taskflow_decorators()
 
+    @provider_info_cache("provider_configs")
+    def initialize_provider_configs(self):
+        """Lazy initialization of provider configuration metadata and merge it into SDK ``conf``."""
+        self.initialize_providers_list()
+        self._discover_config()
+        # Now update conf with the new provider configuration from providers
+        from airflow.sdk.configuration import conf
+
+        conf.load_providers_configuration()

Review Comment:
   There is a new import inside `initialize_provider_configs()`
   (`from airflow.sdk.configuration import conf`). If this is only for
   convenience, prefer a module-level import; if it's required to avoid
   circular imports / early SDK conf initialization, please add a brief
   comment explaining that rationale.
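
A minimal sketch of the documented local-import pattern requested above. The function name is hypothetical, and `json` is only a stand-in for a dependency that would be cyclic or costly at module import time; it is not the actual Airflow import.

```python
def load_settings_lazily() -> dict:
    """Return parsed settings, importing the parser only when called."""
    # Imported locally on purpose: in the real code a module-level import
    # would create a circular import or initialize the dependency too early.
    # ``json`` is a harmless stand-in for that dependency in this sketch.
    import json

    return json.loads('{"loaded": true}')
```

The comment directly above the import is the part the review asks for: it records why the import must stay local so a later refactor does not hoist it.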



##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -241,7 +275,85 @@ def _lookup_sequence(self) -> list[Callable]:
             self._get_option_from_commands,
             self._get_option_from_secrets,
             self._get_option_from_defaults,
+            self._get_option_from_provider_cfg_config_fallbacks,
+            self._get_option_from_provider_metadata_config_fallbacks,
+        ]
+
+    def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
+        """Override the base method to add provider fallbacks when providers are loaded."""
+        sources: list[tuple[str, ConfigParser]] = [
+            ("default", self._default_values),
+            ("airflow.cfg", self),
         ]
+        if self._providers_configuration_loaded:
+            sources.insert(
+                0,
+                (
+                    "provider-metadata-fallback-defaults",
+                    self._provider_metadata_config_fallback_default_values,
+                ),
+            )
+            sources.insert(
+                0,
+                ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values),
+            )
+        return sources
+
+    def _get_option_from_provider_cfg_config_fallbacks(
+        self,
+        deprecated_key: str | None,
+        deprecated_section: str | None,
+        key: str,
+        section: str,
+        issue_warning: bool = True,
+        extra_stacklevel: int = 0,
+        **kwargs,
+    ) -> str | ValueNotFound:
+        """Get config option from provider fallback defaults."""
+        value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs)
+        if value is not None:
+            return value
+        return VALUE_NOT_FOUND_SENTINEL
+
+    def _get_option_from_provider_metadata_config_fallbacks(
+        self,
+        deprecated_key: str | None,
+        deprecated_section: str | None,
+        key: str,
+        section: str,
+        issue_warning: bool = True,
+        extra_stacklevel: int = 0,
+        **kwargs,
+    ) -> str | ValueNotFound:
+        """Get config option from provider metadata fallback defaults."""
+        value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
+        if value is not None:
+            return value
+        return VALUE_NOT_FOUND_SENTINEL

Review Comment:
   New provider-default lookup paths were added via 
`_get_option_from_provider_cfg_config_fallbacks` / 
`_get_option_from_provider_metadata_config_fallbacks`, but there doesn’t appear 
to be a unit test that asserts `get()` returns a provider-metadata default 
*before* `load_providers_configuration()` (i.e. via the fallback parser). 
Adding a focused test would help prevent regressions in the lookup order and 
the lazy-loading behavior.
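
A rough sketch of the kind of focused lookup-order test described above. It uses a toy parser rather than the real Airflow classes: `ToyParser`, `_NOT_FOUND`, and the section/key names are all hypothetical, and the real test would exercise `get()` on the actual parser instead.

```python
from typing import Callable

_NOT_FOUND = object()  # hypothetical stand-in for VALUE_NOT_FOUND_SENTINEL


class ToyParser:
    """Toy parser that resolves options through an ordered lookup sequence."""

    def __init__(self, defaults: dict, provider_cfg: dict, provider_metadata: dict):
        self._defaults = defaults
        self._provider_cfg = provider_cfg
        self._provider_metadata = provider_metadata

    def _lookup_sequence(self) -> list[Callable]:
        # Core defaults first, then the two provider fallbacks, in that order.
        return [
            self._from_defaults,
            self._from_provider_cfg,
            self._from_provider_metadata,
        ]

    def _from_defaults(self, section: str, key: str):
        return self._defaults.get((section, key), _NOT_FOUND)

    def _from_provider_cfg(self, section: str, key: str):
        return self._provider_cfg.get((section, key), _NOT_FOUND)

    def _from_provider_metadata(self, section: str, key: str):
        return self._provider_metadata.get((section, key), _NOT_FOUND)

    def get(self, section: str, key: str) -> str:
        # Return the first value found by walking the lookup sequence.
        for lookup in self._lookup_sequence():
            value = lookup(section, key)
            if value is not _NOT_FOUND:
                return value
        raise KeyError(f"{section}.{key}")


def test_lookup_order() -> None:
    parser = ToyParser(
        defaults={("core", "a"): "core-default"},
        provider_cfg={("core", "a"): "cfg-fallback", ("celery", "b"): "cfg-fallback"},
        provider_metadata={
            ("celery", "b"): "metadata-fallback",
            ("celery", "c"): "metadata-fallback",
        },
    )
    # An earlier source wins whenever more than one source defines the option.
    assert parser.get("core", "a") == "core-default"
    assert parser.get("celery", "b") == "cfg-fallback"
    # Only the last fallback defines this option, so it is the one returned.
    assert parser.get("celery", "c") == "metadata-fallback"
```

Each assertion pins one position in the sequence, so reordering the lookups would fail the test rather than silently change which default is returned.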



##########
airflow-core/src/airflow/providers_manager.py:
##########
@@ -1491,6 +1478,12 @@ def _cleanup(self):
         self._executor_without_check_set.clear()
         self._queue_class_name_set.clear()
         self._provider_configs.clear()
+
+        # clear provider config cache in conf as well
+        from airflow.configuration import conf
+
+        conf.invalidate_cache()

Review Comment:
   There is a new import inside `_cleanup()`
   (`from airflow.configuration import conf`). If this is not required for
   circular-import avoidance, it should be moved to module scope; otherwise,
   please document why it must remain a local import so future refactors
   don't accidentally reintroduce cycles.



##########
task-sdk/src/airflow/sdk/configuration.py:
##########
@@ -125,11 +125,21 @@ def __init__(
         *args,
         **kwargs,
     ):

Review Comment:
   `AirflowSDKConfigParser.__init__` now performs a local import of 
`ProvidersManagerTaskRuntime`. If this is to avoid a circular import between 
configuration and providers runtime, please add a short comment explaining 
that; otherwise prefer a module-level import to keep imports predictable and 
avoid repeated work on parser instantiation.
   ```suggestion
       ):
           # Imported locally to avoid a circular import between configuration and providers runtime.
   ```



##########
task-sdk/src/airflow/sdk/providers_manager_runtime.py:
##########
@@ -608,6 +635,12 @@ def _cleanup(self):
         self._asset_uri_handlers.clear()
         self._asset_factories.clear()
         self._asset_to_openlineage_converters.clear()
+        self._provider_configs.clear()
+
+        # clear provider config cache in conf as well
+        from airflow.sdk.configuration import conf
+
+        conf.invalidate_cache()

Review Comment:
   There is a new import inside `_cleanup()`
   (`from airflow.sdk.configuration import conf`). If this is needed for
   circular-import avoidance / to prevent SDK conf from being imported at
   module import time, please add a short comment clarifying; otherwise
   consider moving it to module scope for consistency and to avoid hidden
   import costs during cleanup.



##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -241,7 +275,85 @@ def _lookup_sequence(self) -> list[Callable]:
             self._get_option_from_commands,
             self._get_option_from_secrets,
             self._get_option_from_defaults,
+            self._get_option_from_provider_cfg_config_fallbacks,
+            self._get_option_from_provider_metadata_config_fallbacks,
+        ]
+
+    def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
+        """Override the base method to add provider fallbacks when providers are loaded."""
+        sources: list[tuple[str, ConfigParser]] = [
+            ("default", self._default_values),
+            ("airflow.cfg", self),
         ]
+        if self._providers_configuration_loaded:
+            sources.insert(
+                0,
+                (
+                    "provider-metadata-fallback-defaults",
+                    self._provider_metadata_config_fallback_default_values,
+                ),
+            )
+            sources.insert(
+                0,
+                ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values),
+            )
+        return sources
+
+    def _get_option_from_provider_cfg_config_fallbacks(
+        self,
+        deprecated_key: str | None,
+        deprecated_section: str | None,
+        key: str,
+        section: str,
+        issue_warning: bool = True,
+        extra_stacklevel: int = 0,
+        **kwargs,
+    ) -> str | ValueNotFound:
+        """Get config option from provider fallback defaults."""
+        value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs)
+        if value is not None:
+            return value
+        return VALUE_NOT_FOUND_SENTINEL
+
+    def _get_option_from_provider_metadata_config_fallbacks(
+        self,
+        deprecated_key: str | None,
+        deprecated_section: str | None,
+        key: str,
+        section: str,
+        issue_warning: bool = True,
+        extra_stacklevel: int = 0,
+        **kwargs,
+    ) -> str | ValueNotFound:
+        """Get config option from provider metadata fallback defaults."""
+        value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
+        if value is not None:
+            return value
+        return VALUE_NOT_FOUND_SENTINEL
+
+    def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
+        """Get provider config fallback default values."""
+        raw = kwargs.get("raw", False)
+        vars_ = kwargs.get("vars")
+        return self._provider_cfg_config_fallback_default_values.get(
+            section, key, fallback=None, raw=raw, vars=vars_
+        )
+
+    @functools.cached_property
+    def _provider_metadata_config_fallback_default_values(self) -> ConfigParser:
+        """Return Provider metadata config fallback default values."""
+        base_configuration_description: dict[str, dict[str, Any]] = {}
+        for _, config in self._provider_manager_type().provider_configs:

Review Comment:
   `_provider_metadata_config_fallback_default_values` builds 
`base_configuration_description` via `dict.update()`, which will silently 
overwrite earlier provider sections if two providers contribute the same 
section key. That would mask conflicts that `load_providers_configuration()` 
explicitly treats as an error. Consider detecting duplicate sections here too 
(and raising `AirflowConfigException`) to avoid non-deterministic defaults 
depending on provider iteration order.
   ```suggestion
           for _, config in self._provider_manager_type().provider_configs:
               duplicate_sections = base_configuration_description.keys() & config.keys()
               if duplicate_sections:
                   raise AirflowConfigException(
                       "Duplicate configuration sections found in provider metadata fallback defaults: "
                       f"{', '.join(sorted(duplicate_sections))}"
                   )
   ```
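
A self-contained sketch of the duplicate-section check suggested above, written against plain dictionaries. `DuplicateSectionError` is a hypothetical stand-in for `AirflowConfigException`, and `merge_provider_sections` and the sample provider names are illustrative only.

```python
from typing import Any


class DuplicateSectionError(Exception):
    """Hypothetical stand-in for AirflowConfigException."""


def merge_provider_sections(
    provider_configs: list[tuple[str, dict[str, Any]]],
) -> dict[str, Any]:
    """Merge per-provider config sections, failing on any section name clash."""
    merged: dict[str, Any] = {}
    for provider_name, config in provider_configs:
        # Dict key views support set intersection, so this finds any section
        # that an earlier provider has already contributed.
        duplicates = merged.keys() & config.keys()
        if duplicates:
            raise DuplicateSectionError(
                f"Duplicate configuration sections from {provider_name}: "
                f"{', '.join(sorted(duplicates))}"
            )
        merged.update(config)
    return merged


if __name__ == "__main__":
    configs = [
        ("provider-a", {"celery": {"workers": 4}}),
        ("provider-b", {"celery": {"workers": 8}}),
    ]
    try:
        merge_provider_sections(configs)
    except DuplicateSectionError as err:
        print(err)
```

Checking before `update()` means the result no longer depends on provider iteration order: a clash is reported instead of the later provider silently winning.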


