This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bf46f68a7c1 feat(datasets): make strict_dataset_uri_validation default to True (#41814)
bf46f68a7c1 is described below

commit bf46f68a7c1dedb6b5abc8dbb410af7e8cd88d55
Author: Wei Lee <weilee...@gmail.com>
AuthorDate: Wed Oct 9 18:52:10 2024 +0800

    feat(datasets): make strict_dataset_uri_validation default to True (#41814)
    
    * feat(datasets): make strict_dataset_uri_validation default to True
    
    * docs(assets): add uri scheme restrict instruction for AIP-60
    
    * feat(assets): add an error logging to URI that is not AIP-60 compliant
    
    * Better test string
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 airflow/assets/__init__.py                         |  20 ++--
 airflow/config_templates/config.yml                |   4 +-
 .../authoring-and-scheduling/assets.rst            | 109 ++++++++++++---------
 newsfragments/41814.significant.rst                |   1 +
 tests/assets/tests_asset.py                        |  11 ---
 tests/io/test_path.py                              |   2 +-
 6 files changed, 77 insertions(+), 70 deletions(-)

diff --git a/airflow/assets/__init__.py b/airflow/assets/__init__.py
index deb9aa593de..e11b9c49df3 100644
--- a/airflow/assets/__init__.py
+++ b/airflow/assets/__init__.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+import logging
 import os
 import urllib.parse
 import warnings
@@ -41,6 +42,9 @@ from airflow.configuration import conf
 __all__ = ["Asset", "AssetAll", "AssetAny"]
 
 
+log = logging.getLogger(__name__)
+
+
 def normalize_noop(parts: SplitResult) -> SplitResult:
     """
     Place-hold a :class:`~urllib.parse.SplitResult`` normalizer.
@@ -109,14 +113,16 @@ def _sanitize_uri(uri: str) -> str:
         try:
             parsed = normalizer(parsed)
         except ValueError as exception:
-            if conf.getboolean("core", "strict_asset_uri_validation", fallback=False):
+            if conf.getboolean("core", "strict_asset_uri_validation", fallback=True):
+                log.error(
+                    (
+                        "The Asset URI %s is not AIP-60 compliant: %s. "
+                        "Please check https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/assets.html"
+                    ),
+                    uri,
+                    exception,
+                )
                 raise
-            warnings.warn(
-                f"The Asset URI {uri} is not AIP-60 compliant: {exception}. "
-                f"In Airflow 3, this will raise an exception.",
-                UserWarning,
-                stacklevel=3,
-            )
     return urllib.parse.urlunsplit(parsed)
 
 
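In practical terms, the hunk above flips the default from warn-and-continue to
fail-fast: a URI that a registered normalizer rejects is now logged as an error
and the ValueError is re-raised, where Airflow 2.x previously downgraded it to
a UserWarning. A minimal sketch of the new behaviour, assuming a provider that
registers a normalizer for the s3 scheme (for example the Amazon provider) is
installed; the bucket-less URI is the same invalid example used in the docs
change further below:

    from airflow.assets import Asset

    # With strict_asset_uri_validation defaulting to True, the normalizer's
    # ValueError now propagates out of the Asset constructor instead of being
    # reduced to a warning.
    try:
        Asset("s3://")  # an s3 asset URI must contain a bucket name
    except ValueError as err:
        print(f"Not AIP-60 compliant: {err}")
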
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index b96c07f2373..0be77a3b682 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -486,9 +486,7 @@ core:
     strict_asset_uri_validation:
       description: |
         Asset URI validation should raise an exception if it is not compliant with AIP-60.
-        By default this configuration is false, meaning that Airflow 2.x only warns the user.
-        In Airflow 3, this configuration will be enabled by default.
-      default: "False"
+      default: "True"
       example: ~
       version_added: 2.9.2
       type: boolean
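
Deployments that still want the old lenient behaviour can opt out explicitly.
A sketch of the override in airflow.cfg (the equivalent environment variable
would be AIRFLOW__CORE__STRICT_ASSET_URI_VALIDATION):

    [core]
    strict_asset_uri_validation = False
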
diff --git a/docs/apache-airflow/authoring-and-scheduling/assets.rst b/docs/apache-airflow/authoring-and-scheduling/assets.rst
index d37143367fa..7940a905167 100644
--- a/docs/apache-airflow/authoring-and-scheduling/assets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/assets.rst
@@ -23,23 +23,23 @@ Data-aware scheduling
 Quickstart
 ----------
 
-In addition to scheduling DAGs based on time, you can also schedule DAGs to run based on when a task updates a asset.
+In addition to scheduling DAGs based on time, you can also schedule DAGs to run based on when a task updates an asset.
 
 .. code-block:: python
 
-    from airflow.assets import asset
+    from airflow.assets import Asset
 
     with DAG(...):
         MyOperator(
             # this task updates example.csv
-            outlets=[asset("s3://asset-bucket/example.csv")],
+            outlets=[Asset("s3://asset-bucket/example.csv")],
             ...,
         )
 
 
     with DAG(
         # this DAG should be run when example.csv is updated (by dag1)
-        schedule=[asset("s3://asset-bucket/example.csv")],
+        schedule=[Asset("s3://asset-bucket/example.csv")],
         ...,
     ):
         ...
@@ -48,7 +48,7 @@ In addition to scheduling DAGs based on time, you can also schedule DAGs to run
 .. image:: /img/asset-scheduled-dags.png
 
 
-What is a "asset"?
+What is an "Asset"?
 --------------------
 
 An Airflow asset is a logical grouping of data. Upstream producer tasks can update assets, and asset updates contribute to scheduling downstream consumer DAGs.
@@ -57,13 +57,19 @@ An Airflow asset is a logical grouping of data. Upstream producer tasks can upda
 
 .. code-block:: python
 
-    from airflow.assets import asset
+    from airflow.assets import Asset
 
-    example_asset = asset("s3://asset-bucket/example.csv")
+    example_asset = Asset("s3://asset-bucket/example.csv")
 
 Airflow makes no assumptions about the content or location of the data represented by the URI, and treats the URI like a string. This means that Airflow treats any regular expressions, like ``input_\d+.csv``, or file glob patterns, such as ``input_2022*.csv``, as an attempt to create multiple assets from one declaration, and they will not work.
 
-You must create assets with a valid URI. Airflow core and providers define various URI schemes that you can use, such as ``file`` (core), ``postgres`` (by the Postgres provider), and ``s3`` (by the Amazon provider). Third-party providers and plugins might also provide their own schemes. These pre-defined schemes have individual semantics that are expected to be followed.
+You must create assets with a valid URI. Airflow core and providers define various URI schemes that you can use, such as ``file`` (core), ``postgres`` (by the Postgres provider), and ``s3`` (by the Amazon provider). Third-party providers and plugins might also provide their own schemes. These pre-defined schemes have individual semantics that are expected to be followed. You can use the optional ``name`` argument to provide a more human-readable identifier for the asset.
+
+.. code-block:: python
+
+    from airflow.assets import Asset
+
+    example_asset = Asset(uri="s3://asset-bucket/example.csv", name="bucket-1")
 
 What is valid URI?
 ------------------
@@ -72,6 +78,13 @@ Technically, the URI must conform to the valid character set in RFC 3986, which
 
 The URI is also case sensitive, so ``s3://example/asset`` and ``s3://Example/asset`` are considered different. Note that the *host* part of the URI is also case sensitive, which differs from RFC 3986.
 
+For pre-defined schemes (e.g., ``file``, ``postgres``, and ``s3``), you must provide a meaningful URI. If you can't provide one, use another scheme altogether that doesn't have these semantic restrictions. Airflow never requires semantics for user-defined URI schemes (with the prefix ``x-``), so those can be a good alternative. If you have a URI that can only be obtained later (e.g., during task execution), consider using ``AssetAlias`` instead and update the URI later.
+
+.. code-block:: python
+
+    # invalid asset:
+    must_contain_bucket_name = Asset("s3://")
+
 Do not use the ``airflow`` scheme, which is reserved for Airflow's internals.
 
 Airflow always prefers using lower cases in schemes, and case sensitivity is needed in the host part of the URI to correctly distinguish between resources.
@@ -79,65 +92,65 @@ Airflow always prefers using lower cases in schemes, and case sensitivity is nee
 .. code-block:: python
 
     # invalid assets:
-    reserved = asset("airflow://example_asset")
-    not_ascii = asset("èxample_datašet")
+    reserved = Asset("airflow://example_asset")
+    not_ascii = Asset("èxample_datašet")
 
 If you want to define assets with a scheme that doesn't include additional semantic constraints, use a scheme with the prefix ``x-``. Airflow skips any semantic validation on URIs with these schemes.
 
 .. code-block:: python
 
     # valid asset, treated as a plain string
-    my_ds = asset("x-my-thing://foobarbaz")
+    my_ds = Asset("x-my-thing://foobarbaz")
 
 The identifier does not have to be absolute; it can be a scheme-less, relative URI, or even just a simple path or string:
 
 .. code-block:: python
 
     # valid assets:
-    schemeless = asset("//example/asset")
-    csv_file = asset("example_asset")
+    schemeless = Asset("//example/asset")
+    csv_file = Asset("example_asset")
 
 Non-absolute identifiers are considered plain strings that do not carry any semantic meanings to Airflow.
 
 Extra information on asset
 ----------------------------
 
-If needed, you can include an extra dictionary in a asset:
+If needed, you can include an extra dictionary in an asset:
 
 .. code-block:: python
 
-    example_asset = asset(
+    example_asset = Asset(
         "s3://asset/example.csv",
         extra={"team": "trainees"},
     )
 
-This can be used to supply custom description to the asset, such as who has ownership to the target file, or what the file is for. The extra information does not affect a asset's identity. This means a DAG will be triggered by a asset with an identical URI, even if the extra dict is different:
+This can be used to supply custom description to the asset, such as who has ownership to the target file, or what the file is for. The extra information does not affect an asset's identity. This means a DAG will be triggered by an asset with an identical URI, even if the extra dict is different:
 
 .. code-block:: python
 
     with DAG(
         dag_id="consumer",
-        schedule=[asset("s3://asset/example.csv", extra={"different": "extras"})],
+        schedule=[Asset("s3://asset/example.csv", extra={"different": "extras"})],
     ):
         ...
 
     with DAG(dag_id="producer", ...):
         MyOperator(
             # triggers "consumer" with the given extra!
-            outlets=[asset("s3://asset/example.csv", extra={"team": "trainees"})],
+            outlets=[Asset("s3://asset/example.csv", extra={"team": "trainees"})],
             ...,
         )
 
-.. note:: **Security Note:** asset URI and extra fields are not encrypted, they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in either asset URIs or extra key values!
+.. note:: **Security Note:** Asset URI and extra fields are not encrypted, they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in either asset URIs or extra key values!
 
 How to use assets in your DAGs
 --------------------------------
 
-You can use assets to specify data dependencies in your DAGs. The following example shows how after the ``producer`` task in the ``producer`` DAG successfully completes, Airflow schedules the ``consumer`` DAG. Airflow marks a asset as ``updated`` only if the task completes successfully. If the task fails or if it is skipped, no update occurs, and Airflow doesn't schedule the ``consumer`` DAG.
+You can use assets to specify data dependencies in your DAGs. The following example shows how after the ``producer`` task in the ``producer`` DAG successfully completes, Airflow schedules the ``consumer`` DAG. Airflow marks an asset as ``updated`` only if the task completes successfully. If the task fails or if it is skipped, no update occurs, and Airflow doesn't schedule the ``consumer`` DAG.
 
 .. code-block:: python
 
-    example_asset = asset("s3://asset/example.csv")
+    example_asset = Asset("s3://asset/example.csv")
 
     with DAG(dag_id="producer", ...):
         BashOperator(task_id="producer", outlets=[example_asset], ...)
@@ -147,7 +160,7 @@ You can use assets to specify data dependencies in your DAGs. The following exam
 
 
 You can find a listing of the relationships between assets and DAGs in the
-:ref:`assets View<ui:assets-view>`
+:ref:`Assets View<ui:assets-view>`
 
 Multiple assets
 -----------------
@@ -228,17 +241,17 @@ Attaching extra information to an emitting asset event
 
 .. versionadded:: 2.10.0
 
-A task with a asset outlet can optionally attach extra information before it emits a asset event. This is different
-from `Extra information on asset`_. Extra information on a asset statically describes the entity pointed to by the asset URI; extra information on the *asset event* instead should be used to annotate the triggering data change, such as how many rows in the database are changed by the update, or the date range covered by it.
+A task with an asset outlet can optionally attach extra information before it emits an asset event. This is different
+from `Extra information on asset`_. Extra information on an asset statically describes the entity pointed to by the asset URI; extra information on the *asset event* instead should be used to annotate the triggering data change, such as how many rows in the database are changed by the update, or the date range covered by it.
 
 The easiest way to attach extra information to the asset event is by ``yield``-ing a ``Metadata`` object from a task:
 
 .. code-block:: python
 
-    from airflow.assets import asset
+    from airflow.assets import Asset
     from airflow.assets.metadata import Metadata
 
-    example_s3_asset = asset("s3://asset/example.csv")
+    example_s3_asset = Asset("s3://asset/example.csv")
 
 
     @task(outlets=[example_s3_asset])
@@ -268,7 +281,7 @@ Fetching information from previously emitted asset events
 
 .. versionadded:: 2.10.0
 
-Events of a asset defined in a task's ``outlets``, as described in the previous section, can be read by a task that declares the same asset in its ``inlets``. A asset event entry contains ``extra`` (see previous section for details), ``timestamp`` indicating when the event was emitted from a task, and ``source_task_instance`` linking the event back to its source.
+Events of an asset defined in a task's ``outlets``, as described in the previous section, can be read by a task that declares the same asset in its ``inlets``. An asset event entry contains ``extra`` (see previous section for details), ``timestamp`` indicating when the event was emitted from a task, and ``source_task_instance`` linking the event back to its source.
 
 Inlet asset events can be read with the ``inlet_events`` accessor in the execution context. Continuing from the ``write_to_s3`` task in the previous section:
 
@@ -291,7 +304,7 @@ Example:
 
 .. code-block:: python
 
-    example_snowflake_asset = asset("snowflake://my_db/my_schema/my_table")
+    example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
 
     with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
         SQLExecuteQueryOperator(
@@ -332,7 +345,7 @@ In this example, the DAG ``waiting_for_asset_1_and_2`` will be triggered when ta
 
     with DAG(
         dag_id="waiting_for_asset_1_and_2",
-        schedule=[asset("asset-1"), asset("asset-2")],
+        schedule=[Asset("asset-1"), Asset("asset-2")],
         ...,
     ):
         ...
@@ -344,8 +357,8 @@ In this example, the DAG ``waiting_for_asset_1_and_2`` will be triggered when ta
 * Get queued asset events for a DAG: ``/dags/{dag_id}/assets/queuedEvent``
 * Delete a queued asset event for a DAG: ``/assets/queuedEvent/{uri}``
 * Delete queued asset events for a DAG: ``/dags/{dag_id}/assets/queuedEvent``
-* Get queued asset events for a asset: ``/dags/{dag_id}/assets/queuedEvent/{uri}``
-* Delete queued asset events for a asset: ``DELETE /dags/{dag_id}/assets/queuedEvent/{uri}``
+* Get queued asset events for an asset: ``/dags/{dag_id}/assets/queuedEvent/{uri}``
+* Delete queued asset events for an asset: ``DELETE /dags/{dag_id}/assets/queuedEvent/{uri}``
 
  For how to use REST API and the parameters needed for these endpoints, please refer to :doc:`Airflow API </stable-rest-api-ref>`.
 
@@ -373,8 +386,8 @@ To schedule a DAG to run only when two specific assets have both been updated, u
 
 .. code-block:: python
 
-    dag1_asset = asset("s3://dag1/output_1.txt")
-    dag2_asset = asset("s3://dag2/output_1.txt")
+    dag1_asset = Asset("s3://dag1/output_1.txt")
+    dag2_asset = Asset("s3://dag2/output_1.txt")
 
     with DAG(
         # Consume asset 1 and 2 with asset expressions
@@ -402,7 +415,7 @@ For scenarios requiring more intricate conditions, such as triggering a DAG when
 
 .. code-block:: python
 
-    dag3_asset = asset("s3://dag3/output_3.txt")
+    dag3_asset = Asset("s3://dag3/output_3.txt")
 
     with DAG(
         # Consume asset 1 or both 2 and 3 with asset expressions
@@ -421,9 +434,9 @@ How to use AssetAlias
 
 ``AssetAlias`` has one single argument ``name`` that uniquely identifies the asset. The task must first declare the alias as an outlet, and use ``outlet_events`` or yield ``Metadata`` to add events to it.
 
-The following example creates a asset event against the S3 URI ``f"s3://bucket/my-task"``  with optional extra information ``extra``. If the asset does not exist, Airflow will dynamically create it and log a warning message.
+The following example creates an asset event against the S3 URI ``f"s3://bucket/my-task"``  with optional extra information ``extra``. If the asset does not exist, Airflow will dynamically create it and log a warning message.
 
-**Emit a asset event during task execution through outlet_events**
+**Emit an asset event during task execution through outlet_events**
 
 .. code-block:: python
 
@@ -432,10 +445,10 @@ The following example creates a asset event against the S3 URI ``f"s3://bucket/m
 
     @task(outlets=[AssetAlias("my-task-outputs")])
     def my_task_with_outlet_events(*, outlet_events):
-        outlet_events["my-task-outputs"].add(asset("s3://bucket/my-task"), extra={"k": "v"})
+        outlet_events["my-task-outputs"].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
 
 
-**Emit a asset event during task execution through yielding Metadata**
+**Emit an asset event during task execution through yielding Metadata**
 
 .. code-block:: python
 
@@ -444,7 +457,7 @@ The following example creates a asset event against the S3 URI ``f"s3://bucket/m
 
     @task(outlets=[AssetAlias("my-task-outputs")])
     def my_task_with_metadata():
-        s3_asset = asset("s3://bucket/my-task")
+        s3_asset = Asset("s3://bucket/my-task")
         yield Metadata(s3_asset, extra={"k": "v"}, alias="my-task-outputs")
 
 Only one asset event is emitted for an added asset, even if it is added to the alias multiple times, or added to multiple aliases. However, if different ``extra`` values are passed, it can emit multiple asset events. In the following example, two asset events will be emitted.
@@ -462,15 +475,15 @@ Only one asset event is emitted for an added asset, even if it is added to the a
         ]
     )
     def my_task_with_outlet_events(*, outlet_events):
-        outlet_events["my-task-outputs-1"].add(asset("s3://bucket/my-task"), extra={"k": "v"})
+        outlet_events["my-task-outputs-1"].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
         # This line won't emit an additional asset event as the asset and extra are the same as the previous line.
-        outlet_events["my-task-outputs-2"].add(asset("s3://bucket/my-task"), extra={"k": "v"})
+        outlet_events["my-task-outputs-2"].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
         # This line will emit an additional asset event as the extra is different.
-        outlet_events["my-task-outputs-3"].add(asset("s3://bucket/my-task"), extra={"k2": "v2"})
+        outlet_events["my-task-outputs-3"].add(Asset("s3://bucket/my-task"), extra={"k2": "v2"})
 
 Scheduling based on asset aliases
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Since asset events added to an alias are just simple asset events, a downstream DAG depending on the actual asset can read asset events of it normally, without considering the associated aliases. A downstream DAG can also depend on an asset alias. The authoring syntax is referencing the ``AssetAlias`` by name, and the associated asset events are picked up for scheduling. Note that a DAG can be triggered by a task with ``outlets=AssetAlias("xxx")`` if and only if the alias is resolved int [...]
+Since asset events added to an alias are just simple asset events, a downstream DAG depending on the actual asset can read asset events of it normally, without considering the associated aliases. A downstream DAG can also depend on an asset alias. The authoring syntax is referencing the ``AssetAlias`` by name, and the associated asset events are picked up for scheduling. Note that a DAG can be triggered by a task with ``outlets=AssetAlias("xxx")`` if and only if the alias is resolved int [...]
 
 The asset alias is resolved to the assets during DAG parsing. Thus, if the "min_file_process_interval" configuration is set to a high value, there is a possibility that the asset alias may not be resolved. To resolve this issue, you can trigger DAG parsing.
 
@@ -478,7 +491,7 @@ The asset alias is resolved to the assets during DAG parsing. Thus, if the "min_
 
     with DAG(dag_id="asset-producer"):
 
-        @task(outlets=[asset("example-alias")])
+        @task(outlets=[Asset("example-alias")])
         def produce_asset_events():
             pass
 
@@ -487,17 +500,17 @@ The asset alias is resolved to the assets during DAG parsing. Thus, if the "min_
 
         @task(outlets=[AssetAlias("example-alias")])
         def produce_asset_events(*, outlet_events):
-            outlet_events["example-alias"].add(asset("s3://bucket/my-task"))
+            outlet_events["example-alias"].add(Asset("s3://bucket/my-task"))
 
 
-    with DAG(dag_id="asset-consumer", schedule=asset("s3://bucket/my-task")):
+    with DAG(dag_id="asset-consumer", schedule=Asset("s3://bucket/my-task")):
         ...
 
     with DAG(dag_id="asset-alias-consumer", schedule=AssetAlias("example-alias")):
         ...
 
 
-In the example provided, once the DAG ``asset-alias-producer`` is executed, the asset alias ``AssetAlias("example-alias")`` will be resolved to ``asset("s3://bucket/my-task")``. However, the DAG ``asset-alias-consumer`` will have to wait for the next DAG re-parsing to update its schedule. To address this, Airflow will re-parse the DAGs relying on the asset alias ``AssetAlias("example-alias")`` when it's resolved into assets that these DAGs did not previously depend on. As a result, both  [...]
+In the example provided, once the DAG ``asset-alias-producer`` is executed, the asset alias ``AssetAlias("example-alias")`` will be resolved to ``Asset("s3://bucket/my-task")``. However, the DAG ``asset-alias-consumer`` will have to wait for the next DAG re-parsing to update its schedule. To address this, Airflow will re-parse the DAGs relying on the asset alias ``AssetAlias("example-alias")`` when it's resolved into assets that these DAGs did not previously depend on. As a result, both  [...]
 
 
 Fetching information from previously emitted asset events through resolved asset aliases
@@ -511,7 +524,7 @@ As mentioned in :ref:`Fetching information from previously emitted asset events<
 
         @task(outlets=[AssetAlias("example-alias")])
         def produce_asset_events(*, outlet_events):
-            outlet_events["example-alias"].add(asset("s3://bucket/my-task"), extra={"row_count": 1})
+            outlet_events["example-alias"].add(Asset("s3://bucket/my-task"), extra={"row_count": 1})
 
 
     with DAG(dag_id="asset-alias-consumer", schedule=None):
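
The conditional-scheduling examples earlier in this file rely on the | and &
operators on assets; the AssetAll and AssetAny classes listed in __all__ in
airflow/assets/__init__.py above express the same conditions as plain
constructors. A sketch, assuming they accept asset objects and nested
conditions as positional arguments:

    from airflow.assets import Asset, AssetAll, AssetAny

    dag1_asset = Asset("s3://dag1/output_1.txt")
    dag2_asset = Asset("s3://dag2/output_1.txt")
    dag3_asset = Asset("s3://dag3/output_3.txt")

    # Intended to be equivalent to schedule=(dag1_asset | (dag2_asset & dag3_asset))
    schedule = AssetAny(dag1_asset, AssetAll(dag2_asset, dag3_asset))
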
diff --git a/newsfragments/41814.significant.rst b/newsfragments/41814.significant.rst
new file mode 100644
index 00000000000..f3b6003a5c3
--- /dev/null
+++ b/newsfragments/41814.significant.rst
@@ -0,0 +1 @@
+Change the default value of ``strict_dataset_uri_validation`` to True.
diff --git a/tests/assets/tests_asset.py b/tests/assets/tests_asset.py
index da6ef8ee79e..afbb46827f3 100644
--- a/tests/assets/tests_asset.py
+++ b/tests/assets/tests_asset.py
@@ -39,7 +39,6 @@ from airflow.models.asset import AssetAliasModel, AssetDagRunQueue, AssetModel
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.empty import EmptyOperator
 from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG
-from tests.test_utils.config import conf_vars
 
 
 @pytest.fixture
@@ -492,16 +491,6 @@ def _mock_get_uri_normalizer_noop(normalized_scheme):
 
 
 @patch("airflow.assets._get_uri_normalizer", _mock_get_uri_normalizer_raising_error)
-@patch("airflow.assets.warnings.warn")
-def test_sanitize_uri_raises_warning(mock_warn):
-    _sanitize_uri("postgres://localhost:5432/database.schema.table")
-    msg = mock_warn.call_args.args[0]
-    assert "The Asset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant" in msg
-    assert "In Airflow 3, this will raise an exception." in msg
-
-
-@patch("airflow.assets._get_uri_normalizer", _mock_get_uri_normalizer_raising_error)
-@conf_vars({("core", "strict_asset_uri_validation"): "True"})
 def test_sanitize_uri_raises_exception():
     with pytest.raises(ValueError) as e_info:
         _sanitize_uri("postgres://localhost:5432/database.schema.table")
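
With the warning gone, the removed test has no behaviour left to assert. If
coverage of the non-strict path is still wanted, a hypothetical sketch along
these lines could take its place (it re-imports the conf_vars helper that this
commit drops and defines its own raising normalizer mock, mirroring the one
already in the module):

    from unittest.mock import patch

    from airflow.assets import _sanitize_uri
    from tests.test_utils.config import conf_vars


    def _raising_normalizer(normalized_scheme):
        def normalizer(parts):
            raise ValueError("Incorrect URI format")

        return normalizer


    @patch("airflow.assets._get_uri_normalizer", _raising_normalizer)
    @conf_vars({("core", "strict_asset_uri_validation"): "False"})
    def test_sanitize_uri_does_not_raise_when_not_strict():
        # With strict validation disabled, the normalizer's ValueError is
        # swallowed instead of propagating.
        assert _sanitize_uri("postgres://localhost:5432/database.schema.table")
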
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index 0e504b586b3..9d944c5f51c 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -406,7 +406,7 @@ class TestFs:
         attach("s3", fs=FakeRemoteFileSystem())
 
         p = "s3"
-        f = "/tmp/foo"
+        f = "bucket/object"
         i = Asset(uri=f"{p}://{f}", extra={"foo": "bar"})
         o = ObjectStoragePath(i)
         assert o.protocol == p
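
The fixture change in this last hunk follows from the same stricter default:
the old value produced an s3 URI with an empty netloc, i.e. no bucket, which a
registered s3 normalizer (such as the Amazon provider's) now rejects instead
of merely warning. A quick standard-library illustration of the difference:

    import urllib.parse

    # Old fixture value: the formatted URI "s3:///tmp/foo" has no bucket.
    print(urllib.parse.urlsplit("s3:///tmp/foo").netloc)       # ''
    # New fixture value: the bucket ends up in the netloc as expected.
    print(urllib.parse.urlsplit("s3://bucket/object").netloc)  # 'bucket'
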
