> I had a look at util_test.py, and I see that in those tests pipelines
> are being created as part of the tests, and what is being tested are
> Beam functions, e.g. beam.Map etc.
assert_that checks the results of an entire pipeline, not individual
transforms. You should be able to apply assert_that to your example:
p = beam.Pipeline(options=pipeline_options)
lines = (p
         | 'Get List of Tickers' >> ReadFromText(input_file)
         | 'Split fields' >> beam.Map(split_fields)
         | 'Map to String' >> beam.Map(add_year))
assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))
On Fri, Jul 17, 2020 at 2:53 PM Sofia’s World <[email protected]> wrote:
> Hello Robert,
> thanks, but I think I am either missing the point or not expressing
> my goal clearly.
> I had a look at util_test.py, and I see that in those tests pipelines
> are being created as part of the tests, and what is being tested are
> Beam functions, e.g. beam.Map etc.
> I am after testing a pipeline as a whole. Taking this example,
>
> p = beam.Pipeline(options=pipeline_options)
> lines = (p
>          | 'Get List of Tickers' >> ReadFromText(input_file)
>          | 'Split fields' >> beam.Map(split_fields)
>          | 'Map to String' >> beam.Map(add_year))
>
> what I am trying to do is test a full pipeline run, as in the test
> example below:
>
> from mypackage.email_pipeline import run
>
> @patch('testing.email_pipeline.ReadFromText')
> def test_create_pipelne(self, mock_read_from_text):
>     mock_read_from_text.return_value = ['One',
>                                         'Two',
>                                         'Three']
>
>     test_pipeline = TestPipeline(is_integration_test=True)
>     pipeline_verifiers = [
>         PipelineStateMatcher(),
>     ]
>     extra_opts = {
>         'input_table': 'testtable',
>         'num_records': 1,
>         'beam_bq_source': 'source',
>         'on_success_matcher': all_of(*pipeline_verifiers)
>     }
>     result = run(
>         test_pipeline.get_full_options_as_args(**extra_opts))
>
>     print(result)
>
> Basically, I would expect a PCollection as the result of the pipeline, and
> I would be testing the content of that PCollection.
>
> Running this results in this message:
>
> IT is skipped because --test-pipeline-options is not specified
>
> Would you be able to advise on this?
>
> kind regards
>
> Marco
>
> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> You can use apache_beam.testing.util.assert_that to write tests of
>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>
>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]>
>> wrote:
>> >
>> > Hi all,
>> > I was wondering if anyone could provide pointers on how to test a
>> > pipeline in Python.
>> > I have the following pipeline:
>> >
>> > lines = (p
>> > | 'Get List of Tickers' >> beam.Map(get_tickers)
>> > | 'Split fields' >> beam.Map(split_fields)
>> > | 'Map to String' >> beam.Map(add_year)
>> > )
>> > result = p.run()
>> >
>> > Now I can easily test each individual function for each step
>> > (get_tickers, split_fields, add_year),
>> >
>> > but is there a way to test the pipeline 'as a whole'?
>> >
>> > Could anyone point me to some examples?
>> >
>> > kind regards
>> >
>>
>