Hello Robert
 thanks but i think i am either missing the point or not expressing clearly
my goal.
I had a look at the util_test.py, and i see that in those tests pipelines
are being created as part of tests., and  in these tests what are being
tested are beam functions - eg beam.Map  etc.
I am after testing a pipeline as a whole. Taking this example,

p = beam.Pipeline(options=pipeline_options)
lines = (p
         | 'Get List of Tickers' >> ReadFromText(input_file)
         | 'Split fields'  >> beam.Map(split_fields)
         | 'Map to String' >> beam.Map(add_year)

what i am trying to do is to test a full pipeline run, like in the
test example below

from mypackage.email_pipeline import run

@patch('testing.email_pipeline.ReadFromText')
def test_create_pipelne(self, mock_read_from_text):
    mock_read_from_text.return_value = ['One',
                                        'Two',
                                        'Three']

    test_pipeline = TestPipeline(is_integration_test=True)
    pipeline_verifiers = [
        PipelineStateMatcher(),
    ]
    extra_opts = {
        'input_table': 'testtable',
        'num_records': 1,
        'beam_bq_source': 'source',
        'on_success_matcher': all_of(*pipeline_verifiers)
    }
    result = run(
                    test_pipeline.get_full_options_as_args(**extra_opts))

    print(result)

Basically, i would expect a PCollection as result of the pipeline, and
i would be testing the content of the PCollection

Running this results in this messsage

IT is skipped because --test-pipeline-options is not specified

Would you be able to advise on this?

kind regards

 Marco












On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]>
wrote:

> You can use apache_beam.testing.util.assert_that to write tests of
> Beam pipelines. This is what Beam uses for its tests, e.g.
>
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>
> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]> wrote:
> >
> > Hi all
> >  i was wondering if anyone could provide pointers on how  to test a
> pipeline in python.
> > I have the following pipeline
> >
> > lines = (p
> >          | 'Get List of Tickers' >> beam.Map(get_tickers)
> >          | 'Split fields'  >> beam.Map(split_fields)
> >          | 'Map to String' >> beam.Map(add_year)
> >          )
> > result = p.run()
> >
> > Now i can easily test each individual function for each step
> (get_tickers, split_fields, add_year)
> >
> > but is there a way to test the pipeline 'as a whole' ?#
> >
> > Could anyone point me to some examples?
> >
> > kind regards
> >
> >
> >
> >
> >
>

Reply via email to