Copilot commented on code in PR #62696:
URL: https://github.com/apache/airflow/pull/62696#discussion_r2973215166
##########
task-sdk/src/airflow/sdk/providers_manager_runtime.py:
##########
@@ -218,6 +220,22 @@ def initialize_providers_taskflow_decorator(self):
self.initialize_providers_list()
self._discover_taskflow_decorators()
+ @provider_info_cache("provider_configs")
+ def initialize_provider_configs(self):
+ """Lazy initialization of provider configuration metadata and merge it into SDK ``conf``."""
+ self.initialize_providers_list()
+ self._discover_config()
+ # Now update conf with the new provider configuration from providers
+ from airflow.sdk.configuration import conf
+
+ conf.load_providers_configuration()
Review Comment:
There is a new import inside `initialize_provider_configs()` (`from
airflow.sdk.configuration import conf`). If this is only for convenience,
prefer a module-level import; if it's required to avoid circular imports /
early SDK conf initialization, please add a brief comment explaining that
rationale.
##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -241,7 +275,85 @@ def _lookup_sequence(self) -> list[Callable]:
self._get_option_from_commands,
self._get_option_from_secrets,
self._get_option_from_defaults,
+ self._get_option_from_provider_cfg_config_fallbacks,
+ self._get_option_from_provider_metadata_config_fallbacks,
+ ]
+
+ def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
+ """Override the base method to add provider fallbacks when providers are loaded."""
+ sources: list[tuple[str, ConfigParser]] = [
+ ("default", self._default_values),
+ ("airflow.cfg", self),
]
+ if self._providers_configuration_loaded:
+ sources.insert(
+ 0,
+ (
+ "provider-metadata-fallback-defaults",
+ self._provider_metadata_config_fallback_default_values,
+ ),
+ )
+ sources.insert(
+ 0,
+ ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values),
+ )
+ return sources
+
+ def _get_option_from_provider_cfg_config_fallbacks(
+ self,
+ deprecated_key: str | None,
+ deprecated_section: str | None,
+ key: str,
+ section: str,
+ issue_warning: bool = True,
+ extra_stacklevel: int = 0,
+ **kwargs,
+ ) -> str | ValueNotFound:
+ """Get config option from provider fallback defaults."""
+ value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs)
+ if value is not None:
+ return value
+ return VALUE_NOT_FOUND_SENTINEL
+
+ def _get_option_from_provider_metadata_config_fallbacks(
+ self,
+ deprecated_key: str | None,
+ deprecated_section: str | None,
+ key: str,
+ section: str,
+ issue_warning: bool = True,
+ extra_stacklevel: int = 0,
+ **kwargs,
+ ) -> str | ValueNotFound:
+ """Get config option from provider metadata fallback defaults."""
+ value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
+ if value is not None:
+ return value
+ return VALUE_NOT_FOUND_SENTINEL
Review Comment:
New provider-default lookup paths were added via
`_get_option_from_provider_cfg_config_fallbacks` /
`_get_option_from_provider_metadata_config_fallbacks`, but there doesn’t appear
to be a unit test that asserts `get()` returns a provider-metadata default
*before* `load_providers_configuration()` (i.e. via the fallback parser).
Adding a focused test would help prevent regressions in the lookup order and
the lazy-loading behavior.
##########
airflow-core/src/airflow/providers_manager.py:
##########
@@ -1491,6 +1478,12 @@ def _cleanup(self):
self._executor_without_check_set.clear()
self._queue_class_name_set.clear()
self._provider_configs.clear()
+
+ # clear provider config cache in conf as well
+ from airflow.configuration import conf
+
+ conf.invalidate_cache()
Review Comment:
There is a new import inside `_cleanup()` (`from airflow.configuration
import conf`). If this is not required for circular-import avoidance, it should
be moved to module scope; otherwise, please document why it must remain a local
import so future refactors don't accidentally reintroduce cycles.
##########
task-sdk/src/airflow/sdk/configuration.py:
##########
@@ -125,11 +125,21 @@ def __init__(
*args,
**kwargs,
):
Review Comment:
`AirflowSDKConfigParser.__init__` now performs a local import of
`ProvidersManagerTaskRuntime`. If this is to avoid a circular import between
configuration and providers runtime, please add a short comment explaining
that; otherwise prefer a module-level import to keep imports predictable and
avoid repeated work on parser instantiation.
```suggestion
):
# Imported locally to avoid a circular import between configuration
# and providers runtime.
```
##########
task-sdk/src/airflow/sdk/providers_manager_runtime.py:
##########
@@ -608,6 +635,12 @@ def _cleanup(self):
self._asset_uri_handlers.clear()
self._asset_factories.clear()
self._asset_to_openlineage_converters.clear()
+ self._provider_configs.clear()
+
+ # clear provider config cache in conf as well
+ from airflow.sdk.configuration import conf
+
+ conf.invalidate_cache()
Review Comment:
There is a new import inside `_cleanup()` (`from airflow.sdk.configuration
import conf`). If this is needed for circular-import avoidance / to prevent SDK
conf from being imported at module import time, please add a short comment
clarifying; otherwise consider moving it to module scope for consistency and to
avoid hidden import costs during cleanup.
##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -241,7 +275,85 @@ def _lookup_sequence(self) -> list[Callable]:
self._get_option_from_commands,
self._get_option_from_secrets,
self._get_option_from_defaults,
+ self._get_option_from_provider_cfg_config_fallbacks,
+ self._get_option_from_provider_metadata_config_fallbacks,
+ ]
+
+ def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
+ """Override the base method to add provider fallbacks when providers are loaded."""
+ sources: list[tuple[str, ConfigParser]] = [
+ ("default", self._default_values),
+ ("airflow.cfg", self),
]
+ if self._providers_configuration_loaded:
+ sources.insert(
+ 0,
+ (
+ "provider-metadata-fallback-defaults",
+ self._provider_metadata_config_fallback_default_values,
+ ),
+ )
+ sources.insert(
+ 0,
+ ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values),
+ )
+ return sources
+
+ def _get_option_from_provider_cfg_config_fallbacks(
+ self,
+ deprecated_key: str | None,
+ deprecated_section: str | None,
+ key: str,
+ section: str,
+ issue_warning: bool = True,
+ extra_stacklevel: int = 0,
+ **kwargs,
+ ) -> str | ValueNotFound:
+ """Get config option from provider fallback defaults."""
+ value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs)
+ if value is not None:
+ return value
+ return VALUE_NOT_FOUND_SENTINEL
+
+ def _get_option_from_provider_metadata_config_fallbacks(
+ self,
+ deprecated_key: str | None,
+ deprecated_section: str | None,
+ key: str,
+ section: str,
+ issue_warning: bool = True,
+ extra_stacklevel: int = 0,
+ **kwargs,
+ ) -> str | ValueNotFound:
+ """Get config option from provider metadata fallback defaults."""
+ value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
+ if value is not None:
+ return value
+ return VALUE_NOT_FOUND_SENTINEL
+
+ def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
+ """Get provider config fallback default values."""
+ raw = kwargs.get("raw", False)
+ vars_ = kwargs.get("vars")
+ return self._provider_cfg_config_fallback_default_values.get(
+ section, key, fallback=None, raw=raw, vars=vars_
+ )
+
+ @functools.cached_property
+ def _provider_metadata_config_fallback_default_values(self) -> ConfigParser:
+ """Return Provider metadata config fallback default values."""
+ base_configuration_description: dict[str, dict[str, Any]] = {}
+ for _, config in self._provider_manager_type().provider_configs:
Review Comment:
`_provider_metadata_config_fallback_default_values` builds
`base_configuration_description` via `dict.update()`, which will silently
overwrite earlier provider sections if two providers contribute the same
section key. That would mask conflicts that `load_providers_configuration()`
explicitly treats as an error. Consider detecting duplicate sections here too
(and raising `AirflowConfigException`) to avoid non-deterministic defaults
depending on provider iteration order.
```suggestion
for _, config in self._provider_manager_type().provider_configs:
duplicate_sections = base_configuration_description.keys() & config.keys()
if duplicate_sections:
raise AirflowConfigException(
"Duplicate configuration sections found in provider metadata fallback defaults: "
f"{', '.join(sorted(duplicate_sections))}"
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]