Correction: you need to use `with` to actually run your pipeline:
with beam.Pipeline(options=pipeline_options) as p:
lines = (p
| 'Get List of Tickers' >> ReadFromText(input_file)
| 'Split fields' >> beam.Map(split_fields)
| 'Map to String' >> beam.Map(add_year)
assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))
On Fri, Jul 17, 2020 at 3:02 PM Kyle Weaver <[email protected]> wrote:
> > I had a look at the util_test.py, and i see that in those tests
> pipelines are being created as part of tests., and in these tests what are
> being tested are beam functions - eg beam.Map etc.
>
> assert_that checks the results of an entire pipeline, not individual
> transforms. You should be able to apply assert_that to your example:
>
> p = beam.Pipeline(options=pipeline_options)
> lines = (p
> | 'Get List of Tickers' >> ReadFromText(input_file)
> | 'Split fields' >> beam.Map(split_fields)
> | 'Map to String' >> beam.Map(add_year)
>
> assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))
>
>
>
> On Fri, Jul 17, 2020 at 2:53 PM Sofia’s World <[email protected]> wrote:
>
>> Hello Robert
>> thanks but i think i am either missing the point or not expressing
>> clearly my goal.
>> I had a look at the util_test.py, and i see that in those tests pipelines
>> are being created as part of tests., and in these tests what are being
>> tested are beam functions - eg beam.Map etc.
>> I am after testing a pipeline as a whole. Taking this example,
>>
>> p = beam.Pipeline(options=pipeline_options)
>> lines = (p
>> | 'Get List of Tickers' >> ReadFromText(input_file)
>> | 'Split fields' >> beam.Map(split_fields)
>> | 'Map to String' >> beam.Map(add_year)
>>
>> what i am trying to do is to test a full pipeline run, like in the test
>> example below
>>
>> from mypackage.email_pipeline import run
>>
>> @patch('testing.email_pipeline.ReadFromText')
>> def test_create_pipelne(self, mock_read_from_text):
>> mock_read_from_text.return_value = ['One',
>> 'Two',
>> 'Three']
>>
>> test_pipeline = TestPipeline(is_integration_test=True)
>> pipeline_verifiers = [
>> PipelineStateMatcher(),
>> ]
>> extra_opts = {
>> 'input_table': 'testtable',
>> 'num_records': 1,
>> 'beam_bq_source': 'source',
>> 'on_success_matcher': all_of(*pipeline_verifiers)
>> }
>> result = run(
>> test_pipeline.get_full_options_as_args(**extra_opts))
>>
>> print(result)
>>
>> Basically, i would expect a PCollection as result of the pipeline, and i
>> would be testing the content of the PCollection
>>
>> Running this results in this messsage
>>
>> IT is skipped because --test-pipeline-options is not specified
>>
>> Would you be able to advise on this?
>>
>> kind regards
>>
>> Marco
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> You can use apache_beam.testing.util.assert_that to write tests of
>>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>>
>>>
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>>
>>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]>
>>> wrote:
>>> >
>>> > Hi all
>>> > i was wondering if anyone could provide pointers on how to test a
>>> pipeline in python.
>>> > I have the following pipeline
>>> >
>>> > lines = (p
>>> > | 'Get List of Tickers' >> beam.Map(get_tickers)
>>> > | 'Split fields' >> beam.Map(split_fields)
>>> > | 'Map to String' >> beam.Map(add_year)
>>> > )
>>> > result = p.run()
>>> >
>>> > Now i can easily test each individual function for each step
>>> (get_tickers, split_fields, add_year)
>>> >
>>> > but is there a way to test the pipeline 'as a whole' ?#
>>> >
>>> > Could anyone point me to some examples?
>>> >
>>> > kind regards
>>> >
>>> >
>>> >
>>> >
>>> >
>>>
>>