Hello Robert, could you point me to a test sample where a 'mock' sink is used? Do you have a testing package which provides an in-memory sink where, for example, I can dump the result of my pipeline (as opposed to writing to a file)? Additionally, what is the best way to test writing to BigQuery? I have seen this file https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py but it appears it writes to real BigQuery?
kind regards
Marco

On Fri, Jul 17, 2020 at 11:05 PM Robert Bradshaw <[email protected]> wrote:
> If you want a full end-to-end integration test of your pipeline, what you can do is:
>
> 1) Write your input data to temporary files.
> 2) Run your pipeline, which writes its output somewhere (ideally a temp location as well).
> 3) Open up the outputs and see if they are as expected.
>
> This is similar to the test at
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py
> but a bit heavyweight.
>
> Another way to validate your pipeline is to refactor the code so the inputs and outputs are pluggable. For example, you could write
>
>     def run_my_pipeline(input):
>       [all your pipeline logic goes here]
>       [this could also be wrapped up as a PTransform]
>       return result1, result2
>
>     def main(...):
>       with beam.Pipeline(...) as p:
>         input = p | beam.io.ReadFromText(...)
>         result1, result2 = run_my_pipeline(input)
>         result1 | beam.io.WriteToSomewhere(...)
>         result2 | beam.io.WriteToSomewhereElse(...)
>
>     def test():
>       with beam.Pipeline(...) as p:
>         input = p | beam.Create(...)
>         result1, result2 = run_my_pipeline(input)
>         assert_that(result1, equal_to([...]))
>         assert_that(result2, any_callable_that_validates_result2, label="Check2")
>
> You could also parameterize things on your sinks and sources, e.g.
>
>     def run_my_pipeline(source, sink1, sink2):
>       with beam.Pipeline(...) as p:
>         input = p | source
>         ...
>         result1 | sink1
>         result2 | sink2
>
>     def main(...):
>       run_my_pipeline(
>           beam.io.ReadFromText(...),
>           beam.io.WriteToSomewhere(...),
>           beam.io.WriteToSomewhereElse(...))
>
>     def test():
>       class Check(beam.PTransform):
>         def __init__(self, checker):
>           self._checker = checker
>         def expand(self, pcoll):
>           return assert_that(pcoll, self._checker)
>
>       run_my_pipeline(
>           beam.Create([...]),
>           Check(equal_to([...])),
>           Check(any_callable_that_validates_result2))
>
> or various permutations thereof.
>
> Is that more what you're looking for?
>
> On Fri, Jul 17, 2020 at 2:46 PM Sofia’s World <[email protected]> wrote:
> > Hello Robert
> > thanks, but I think I am either missing the point or not expressing my goal clearly.
> > I had a look at util_test.py, and I see that in those tests pipelines are created as part of the tests, and what is being tested are Beam functions, e.g. beam.Map etc.
> > I am after testing a pipeline as a whole. Taking this example,
> >
> >     p = beam.Pipeline(options=pipeline_options)
> >     lines = (p
> >              | 'Get List of Tickers' >> ReadFromText(input_file)
> >              | 'Split fields' >> beam.Map(split_fields)
> >              | 'Map to String' >> beam.Map(add_year))
> >
> > what I am trying to do is to test a full pipeline run, like in the test example below
> >
> >     from mypackage.email_pipeline import run
> >
> >     @patch('testing.email_pipeline.ReadFromText')
> >     def test_create_pipeline(self, mock_read_from_text):
> >       mock_read_from_text.return_value = ['One',
> >                                           'Two',
> >                                           'Three']
> >
> >       test_pipeline = TestPipeline(is_integration_test=True)
> >       pipeline_verifiers = [
> >           PipelineStateMatcher(),
> >       ]
> >       extra_opts = {
> >           'input_table': 'testtable',
> >           'num_records': 1,
> >           'beam_bq_source': 'source',
> >           'on_success_matcher': all_of(*pipeline_verifiers)
> >       }
> >       result = run(
> >           test_pipeline.get_full_options_as_args(**extra_opts))
> >
> >       print(result)
> >
> > Basically, I would expect a PCollection as the result of the pipeline, and I would be testing the content of the PCollection.
> >
> > Running this results in this message:
> >
> >     IT is skipped because --test-pipeline-options is not specified
> >
> > Would you be able to advise on this?
> >
> > kind regards
> > Marco
> >
> > On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]> wrote:
> >> You can use apache_beam.testing.util.assert_that to write tests of
> >> Beam pipelines. This is what Beam uses for its tests, e.g.
> >>
> >> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
> >>
> >> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]> wrote:
> >> > Hi all
> >> > I was wondering if anyone could provide pointers on how to test a pipeline in Python.
> >> > I have the following pipeline:
> >> >
> >> >     lines = (p
> >> >              | 'Get List of Tickers' >> beam.Map(get_tickers)
> >> >              | 'Split fields' >> beam.Map(split_fields)
> >> >              | 'Map to String' >> beam.Map(add_year))
> >> >     result = p.run()
> >> >
> >> > Now I can easily test each individual function for each step (get_tickers, split_fields, add_year),
> >> > but is there a way to test the pipeline 'as a whole'?
> >> >
> >> > Could anyone point me to some examples?
> >> >
> >> > kind regards
