mwojtyczka commented on code in PR #63775:
URL: https://github.com/apache/airflow/pull/63775#discussion_r2980145373
##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -163,6 +165,19 @@ def my_after_func(retry_state):
def databricks_conn(self) -> Connection:
return self.get_connection(self.databricks_conn_id) # type: ignore[return-value]
+ async def a_databricks_conn(self) -> Any:
+ """
+ Get the connection object asynchronously.
+
+ :return: Connection object (either airflow.models.Connection or airflow.sdk.definitions.connection.Connection)
+ """
+ if "databricks_conn" in self.__dict__:
+ return self.__dict__["databricks_conn"]
+
+ conn = await get_async_connection(self.databricks_conn_id)
+ self.__dict__["databricks_conn"] = conn
+ return conn
Review Comment:
The return type is `Any`, but it could be typed more precisely. Since the
connection may be either the Airflow 2 or the Airflow 3 class, a `TypeAlias`
or `Union` of the two would be cleaner and would help static analysis:
```python
from airflow.models import Connection as Airflow2Connection
from airflow.sdk.definitions.connection import Connection as Airflow3Connection
AnyConnection = Airflow2Connection | Airflow3Connection
async def a_databricks_conn(self) -> AnyConnection: ...
```
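If importing both classes at module level is a problem (assuming
`airflow.sdk` is not importable on Airflow 2), the alias could live behind a
`TYPE_CHECKING` guard instead. A minimal sketch, where `HookSketch` is a
hypothetical stand-in for the hook class and not the provider's actual code:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by type checkers, so neither import runs at runtime.
    from typing import TypeAlias

    from airflow.models import Connection as Airflow2Connection
    from airflow.sdk.definitions.connection import Connection as Airflow3Connection

    AnyConnection: TypeAlias = Airflow2Connection | Airflow3Connection


class HookSketch:
    """Hypothetical stand-in for the hook class; only the signature matters here."""

    async def a_databricks_conn(self) -> "AnyConnection":
        # Quoted annotation: the alias does not exist at runtime.
        raise NotImplementedError
```

The quoted return annotation keeps the module importable even when one of the
two packages is missing, at the cost of the alias being unavailable for
runtime checks such as `isinstance`.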
##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -163,6 +165,19 @@ def my_after_func(retry_state):
def databricks_conn(self) -> Connection:
return self.get_connection(self.databricks_conn_id) # type: ignore[return-value]
+ async def a_databricks_conn(self) -> Any:
+ """
+ Get the connection object asynchronously.
+
+ :return: Connection object (either airflow.models.Connection or airflow.sdk.definitions.connection.Connection)
+ """
+ if "databricks_conn" in self.__dict__:
+ return self.__dict__["databricks_conn"]
+
+ conn = await get_async_connection(self.databricks_conn_id)
+ self.__dict__["databricks_conn"] = conn
+ return conn
+
def get_conn(self) -> Connection:
Review Comment:
This correctly populates the `@cached_property` cache by writing directly to
the instance `__dict__`. One concern: if two coroutines both pass the
`"databricks_conn" in self.__dict__` check before either sets the value,
`get_async_connection` will be called twice. In practice triggers are
single-task, so this is unlikely, but it is worth a comment explaining the
intent:
```python
# Populate the @cached_property cache so subsequent sync access
# (e.g. _endpoint_url) does not fall through to the blocking
# get_connection() path.
self.__dict__["databricks_conn"] = conn
```
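If the double fetch ever did matter, a lock with a re-check would close the
gap. A minimal, self-contained sketch of the idea follows. `HookSketch`,
`_fetch`, `_conn_lock` and the call counters are hypothetical names used only
for illustration; `_fetch` stands in for `get_async_connection`, and a plain
dict stands in for the connection object:

```python
import asyncio
from functools import cached_property


class HookSketch:
    """Hypothetical stand-in for the hook; not the provider's real class."""

    def __init__(self) -> None:
        self.sync_calls = 0
        self.async_calls = 0
        self._conn_lock = asyncio.Lock()

    @cached_property
    def databricks_conn(self) -> dict:
        # Blocking path: only runs if the cache has not been populated.
        self.sync_calls += 1
        return {"host": "example.invalid"}

    async def _fetch(self) -> dict:
        # Stand-in for `await get_async_connection(...)`.
        self.async_calls += 1
        await asyncio.sleep(0)  # yield so other coroutines can interleave
        return {"host": "example.invalid"}

    async def a_databricks_conn(self) -> dict:
        if "databricks_conn" in self.__dict__:
            return self.__dict__["databricks_conn"]
        async with self._conn_lock:
            # Re-check: another coroutine may have filled the cache
            # while this one was waiting for the lock.
            if "databricks_conn" not in self.__dict__:
                # Same key that @cached_property uses for its cache.
                self.__dict__["databricks_conn"] = await self._fetch()
        return self.__dict__["databricks_conn"]


async def main() -> HookSketch:
    hook = HookSketch()  # created inside the running event loop
    await asyncio.gather(hook.a_databricks_conn(), hook.a_databricks_conn())
    _ = hook.databricks_conn  # sync access is served from the cache
    return hook


hook = asyncio.run(main())
```

With the lock, the two concurrent callers trigger a single fetch, and the
later sync access never reaches the blocking property body. For a single-task
trigger the explanatory comment alone is probably enough; the lock is only
worth adding if concurrent access becomes realistic.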