Hello Robert
Thanks, but I think I am either missing the point or not expressing my goal
clearly.
I had a look at util_test.py, and I see that in those tests pipelines
are created as part of each test, and what is being tested are Beam
transforms, e.g. beam.Map.
I am after testing a pipeline as a whole. Take this example:
    p = beam.Pipeline(options=pipeline_options)
    lines = (p
             | 'Get List of Tickers' >> ReadFromText(input_file)
             | 'Split fields' >> beam.Map(split_fields)
             | 'Map to String' >> beam.Map(add_year)
             )
What I am trying to do is test a full pipeline run, as in the
test example below:
    from mypackage.email_pipeline import run

    @patch('testing.email_pipeline.ReadFromText')
    def test_create_pipelne(self, mock_read_from_text):
        mock_read_from_text.return_value = ['One',
                                            'Two',
                                            'Three']
        test_pipeline = TestPipeline(is_integration_test=True)
        pipeline_verifiers = [
            PipelineStateMatcher(),
        ]
        extra_opts = {
            'input_table': 'testtable',
            'num_records': 1,
            'beam_bq_source': 'source',
            'on_success_matcher': all_of(*pipeline_verifiers)
        }
        result = run(
            test_pipeline.get_full_options_as_args(**extra_opts))
        print(result)
Basically, I would expect a PCollection as the result of the pipeline, and
I would be testing the content of that PCollection.
Running the test above results in this message:
IT is skipped because --test-pipeline-options is not specified
Would you be able to advise on this?
kind regards
Marco
On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]>
wrote:
> You can use apache_beam.testing.util.assert_that to write tests of
> Beam pipelines. This is what Beam uses for its tests, e.g.
>
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>
> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]> wrote:
> >
> > Hi all
> > I was wondering if anyone could provide pointers on how to test a
> > pipeline in Python.
> > I have the following pipeline
> >
> > lines = (p
> > | 'Get List of Tickers' >> beam.Map(get_tickers)
> > | 'Split fields' >> beam.Map(split_fields)
> > | 'Map to String' >> beam.Map(add_year)
> > )
> > result = p.run()
> >
> > Now I can easily test each individual function for each step
> > (get_tickers, split_fields, add_year)
> >
> > but is there a way to test the pipeline 'as a whole'?
> >
> > Could anyone point me to some examples?
> >
> > kind regards