If you want a full end-to-end integration test of your pipeline, you can:
1) Write your input data to temporary files.
2) Run your pipeline, which writes its output somewhere (ideally a temp
   location as well).
3) Open the outputs and check that they are as expected.

This is similar to the test at
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py ,
but it is a bit heavyweight.

Another way to validate your pipeline is to refactor the code so that the
inputs and outputs are pluggable. For example, you could write

    import apache_beam as beam
    from apache_beam.testing.util import assert_that, equal_to

    def run_my_pipeline(input):
        [all your pipeline logic goes here]
        [this could also be wrapped up as a PTransform]
        return result1, result2

    def main(...):
        with beam.Pipeline(...) as p:
            input = p | beam.io.ReadFromText(...)
            result1, result2 = run_my_pipeline(input)
            result1 | beam.io.WriteToSomewhere(...)
            result2 | beam.io.WriteToSomewhereElse(...)

    def test():
        with beam.Pipeline(...) as p:
            input = p | beam.Create(...)
            result1, result2 = run_my_pipeline(input)
            assert_that(result1, equal_to([...]))
            # A second assert_that in the same pipeline needs its own label.
            assert_that(result2, any_callable_that_validates_result2, label="Check2")

You could also parameterize things on your sinks and sources, e.g.

    def run_my_pipeline(source, sink1, sink2):
        with beam.Pipeline(...) as p:
            input = p | source
            ...
            result1 | sink1
            result2 | sink2

    def main(...):
        run_my_pipeline(
            beam.io.ReadFromText(...),
            beam.io.WriteToSomewhere(...),
            beam.io.WriteToSomewhereElse(...))

    def test():
        class Check(beam.PTransform):
            def __init__(self, checker):
                super().__init__()
                self._checker = checker

            def expand(self, pcoll):
                assert_that(pcoll, self._checker)

        # Distinct labels keep the two Check transforms from colliding.
        run_my_pipeline(
            beam.Create([...]),
            'Check1' >> Check(equal_to([...])),
            'Check2' >> Check(any_callable_that_validates_result2))

or various permutations thereof.

Is that more what you're looking for?

On Fri, Jul 17, 2020 at 2:46 PM Sofia’s World <[email protected]> wrote:
>
> Hello Robert
> thanks but i think i am either missing the point or not expressing clearly
> my goal.
> I had a look at the util_test.py, and i see that in those tests pipelines are
> being created as part of tests., and in these tests what are being tested
> are beam functions - eg beam.Map etc.
> I am after testing a pipeline as a whole. Taking this example,
>
> p = beam.Pipeline(options=pipeline_options)
> lines = (p
>          | 'Get List of Tickers' >> ReadFromText(input_file)
>          | 'Split fields' >> beam.Map(split_fields)
>          | 'Map to String' >> beam.Map(add_year)
>
> what i am trying to do is to test a full pipeline run, like in the test
> example below
>
> from mypackage.email_pipeline import run
>
> @patch('testing.email_pipeline.ReadFromText')
> def test_create_pipelne(self, mock_read_from_text):
>     mock_read_from_text.return_value = ['One',
>                                         'Two',
>                                         'Three']
>
>     test_pipeline = TestPipeline(is_integration_test=True)
>     pipeline_verifiers = [
>         PipelineStateMatcher(),
>     ]
>     extra_opts = {
>         'input_table': 'testtable',
>         'num_records': 1,
>         'beam_bq_source': 'source',
>         'on_success_matcher': all_of(*pipeline_verifiers)
>     }
>     result = run(
>         test_pipeline.get_full_options_as_args(**extra_opts))
>
>     print(result)
>
> Basically, i would expect a PCollection as result of the pipeline, and i
> would be testing the content of the PCollection
>
> Running this results in this messsage
>
> IT is skipped because --test-pipeline-options is not specified
>
> Would you be able to advise on this?
>
> kind regards
>
> Marco
>
> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]> wrote:
>>
>> You can use apache_beam.testing.util.assert_that to write tests of
>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>
>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]> wrote:
>> >
>> > Hi all
>> > i was wondering if anyone could provide pointers on how to test a
>> > pipeline in python.
>> > I have the following pipeline
>> >
>> > lines = (p
>> >          | 'Get List of Tickers' >> beam.Map(get_tickers)
>> >          | 'Split fields' >> beam.Map(split_fields)
>> >          | 'Map to String' >> beam.Map(add_year)
>> >          )
>> > result = p.run()
>> >
>> > Now i can easily test each individual function for each step (get_tickers,
>> > split_fields, add_year)
>> >
>> > but is there a way to test the pipeline 'as a whole' ?#
>> >
>> > Could anyone point me to some examples?
>> >
>> > kind regards
>> >
