This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 32aff26a95b [yaml] - add yaml expand_pipeline ut tests (#36087)
32aff26a95b is described below

commit 32aff26a95be138c7c13a305ef42b0c85dc6251b
Author: Derrick Williams <[email protected]>
AuthorDate: Mon Sep 8 16:18:36 2025 -0400

    [yaml] - add yaml expand_pipeline ut tests (#36087)
    
    * rebase
    
    * add more tests
    
    * fix lint issue
---
 .../apache_beam/yaml/yaml_transform_unit_test.py   | 64 ++++++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py 
b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
index c967a4664e3..14bd758ebae 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -28,6 +28,7 @@ from apache_beam.yaml.yaml_transform import chain_as_composite
 from apache_beam.yaml.yaml_transform import ensure_errors_consumed
 from apache_beam.yaml.yaml_transform import ensure_transforms_have_types
 from apache_beam.yaml.yaml_transform import expand_composite_transform
+from apache_beam.yaml.yaml_transform import expand_pipeline
 from apache_beam.yaml.yaml_transform import extract_name
 from apache_beam.yaml.yaml_transform import get_main_output_key
 from apache_beam.yaml.yaml_transform import identify_object
@@ -1035,6 +1036,69 @@ class YamlTransformTest(unittest.TestCase):
     self.assertEqual(result._spec['type'], "composite")  # preprocessed spec
 
 
+class ExpandPipelineTest(unittest.TestCase):
+  def test_expand_pipeline_with_pipeline_key_only(self):
+    spec = '''
+      pipeline:
+        type: chain
+        transforms:
+          - type: Create
+            config:
+              elements: [1,2,3]
+          - type: LogForTesting
+    '''
+    with new_pipeline() as p:
+      expand_pipeline(p, spec, validate_schema=None)
+
+  def test_expand_pipeline_with_pipeline_and_option_keys(self):
+    spec = '''
+      pipeline:
+        type: chain
+        transforms:
+          - type: Create
+            config:
+              elements: [1,2,3]
+          - type: LogForTesting
+      options:
+        streaming: false
+    '''
+    with new_pipeline() as p:
+      expand_pipeline(p, spec, validate_schema=None)
+
+  def test_expand_pipeline_with_extra_top_level_keys(self):
+    spec = '''
+      template:
+        version: "1.0"
+        author: "test_user"
+
+      pipeline:
+        type: chain
+        transforms:
+          - type: Create
+            config:
+              elements: [1,2,3]
+          - type: LogForTesting
+
+      other_metadata: "This is an ignored comment."
+    '''
+    with new_pipeline() as p:
+      expand_pipeline(p, spec, validate_schema=None)
+
+  def test_expand_pipeline_with_incorrect_pipelines_key_fails(self):
+    spec = '''
+      pipelines:
+        type: chain
+        transforms:
+          - type: Create
+            config:
+              elements: [1,2,3]
+          - type: LogForTesting
+    '''
+    with new_pipeline() as p:
+      with self.assertRaises(KeyError):
+        expand_pipeline(p, spec, validate_schema=None)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to