This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 32aff26a95b [yaml] - add yaml expand_pipeline ut tests (#36087)
32aff26a95b is described below
commit 32aff26a95be138c7c13a305ef42b0c85dc6251b
Author: Derrick Williams <[email protected]>
AuthorDate: Mon Sep 8 16:18:36 2025 -0400
[yaml] - add yaml expand_pipeline ut tests (#36087)
* rebase
* add more tests
* fix lint issue
---
.../apache_beam/yaml/yaml_transform_unit_test.py | 64 ++++++++++++++++++++++
1 file changed, 64 insertions(+)
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
index c967a4664e3..14bd758ebae 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -28,6 +28,7 @@ from apache_beam.yaml.yaml_transform import chain_as_composite
from apache_beam.yaml.yaml_transform import ensure_errors_consumed
from apache_beam.yaml.yaml_transform import ensure_transforms_have_types
from apache_beam.yaml.yaml_transform import expand_composite_transform
+from apache_beam.yaml.yaml_transform import expand_pipeline
from apache_beam.yaml.yaml_transform import extract_name
from apache_beam.yaml.yaml_transform import get_main_output_key
from apache_beam.yaml.yaml_transform import identify_object
@@ -1035,6 +1036,69 @@ class YamlTransformTest(unittest.TestCase):
self.assertEqual(result._spec['type'], "composite") # preprocessed spec
+class ExpandPipelineTest(unittest.TestCase):
+ def test_expand_pipeline_with_pipeline_key_only(self):
+ spec = '''
+ pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements: [1,2,3]
+ - type: LogForTesting
+ '''
+ with new_pipeline() as p:
+ expand_pipeline(p, spec, validate_schema=None)
+
+ def test_expand_pipeline_with_pipeline_and_option_keys(self):
+ spec = '''
+ pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements: [1,2,3]
+ - type: LogForTesting
+ options:
+ streaming: false
+ '''
+ with new_pipeline() as p:
+ expand_pipeline(p, spec, validate_schema=None)
+
+ def test_expand_pipeline_with_extra_top_level_keys(self):
+ spec = '''
+ template:
+ version: "1.0"
+ author: "test_user"
+
+ pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements: [1,2,3]
+ - type: LogForTesting
+
+ other_metadata: "This is an ignored comment."
+ '''
+ with new_pipeline() as p:
+ expand_pipeline(p, spec, validate_schema=None)
+
+ def test_expand_pipeline_with_incorrect_pipelines_key_fails(self):
+ spec = '''
+ pipelines:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements: [1,2,3]
+ - type: LogForTesting
+ '''
+ with new_pipeline() as p:
+ with self.assertRaises(KeyError):
+ expand_pipeline(p, spec, validate_schema=None)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()