Hello Robert,
 could you point me to a test sample where a 'mock' sink is used?
Do you have a testing package which provides an in-memory sink where,
for example, I can dump the result of
my pipeline (as opposed to writing to a file)?
Additionally, what is the best way to test writing to BigQuery?
I have seen this file
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
but it appears to write to real BigQuery?

kind regards
 Marco

On Fri, Jul 17, 2020 at 11:05 PM Robert Bradshaw <[email protected]>
wrote:

> If you want a full end-to-end integration test of your pipeline, what you can
> do is:
>
> 1) Write your input data to temporary files.
> 2) Run your pipeline, which writes its output somewhere (ideally a
> temp location as well).
> 3) Open up the outputs and see if it was as expected.
>
> This is similar to the test at
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py
> , but a bit heavyweight.
>
> Another way to validate your pipeline is to refactor the code so the
> inputs and outputs are pluggable. For example, you could write
>
> def run_my_pipeline(input):
>    [all your pipeline logic goes here]
>    [this could also be wrapped up as a PTransform]
>    return result1, result2
>
> def main(...):
>   with beam.Pipeline(...) as p:
>     input = p | beam.io.ReadFromText(...)
>     result1, result2 = run_my_pipeline(input)
>     result1 | beam.io.WriteToSomewhere(...)
>     result2 | beam.io.WriteToSomewhereElse(...)
>
> def test():
>   with beam.Pipeline(...) as p:
>     input = p | beam.Create(...)
>     result1, result2 = run_my_pipeline(input)
>     assert_that(result1, equal_to([...]))
>     assert_that(result2, any_callable_that_validates_result2,
>                 label="Check2")
>
> You could also parameterize things on your sinks and sources, e.g.
>
> def run_my_pipeline(source, sink1, sink2):
>    with beam.Pipeline(...) as p:
>      input = p | source
>      ...
>      result1 | sink1
>      result2 | sink2
>
> def main(...):
>   run_my_pipeline(
>       beam.io.ReadFromText(...),
>       beam.io.WriteToSomewhere(...),
>       beam.io.WriteToSomewhereElse(...))
>
> def test():
>
>   # Stand-in sink: asserts on the PCollection instead of writing it.
>   class Check(beam.PTransform):
>     def __init__(self, checker):
>       super().__init__()
>       self._checker = checker
>     def expand(self, pcoll):
>       assert_that(pcoll, self._checker)
>
>   run_my_pipeline(
>       beam.Create([...]),
>       "Check1" >> Check(equal_to([...])),
>       "Check2" >> Check(any_callable_that_validates_result2))
>
> or various permutations thereof.
>
> Is that more what you're looking for?
>
>
>
> On Fri, Jul 17, 2020 at 2:46 PM Sofia’s World <[email protected]> wrote:
> >
> > Hello Robert
> >  thanks, but I think I am either missing the point or not expressing my goal clearly.
> > I had a look at util_test.py, and I see that in those tests pipelines are created as part of the tests, and what is being tested are Beam functions, e.g. beam.Map etc.
> > I am after testing a pipeline as a whole. Taking this example,
> >
> > p = beam.Pipeline(options=pipeline_options)
> > lines = (p
> >          | 'Get List of Tickers' >> ReadFromText(input_file)
> >          | 'Split fields'  >> beam.Map(split_fields)
> >          | 'Map to String' >> beam.Map(add_year)
> >          )
> >
> > What I am trying to do is to test a full pipeline run, like in the test example below
> >
> > from mypackage.email_pipeline import run
> >
> > @patch('testing.email_pipeline.ReadFromText')
> > def test_create_pipelne(self, mock_read_from_text):
> >     mock_read_from_text.return_value = ['One',
> >                                         'Two',
> >                                         'Three']
> >
> >     test_pipeline = TestPipeline(is_integration_test=True)
> >     pipeline_verifiers = [
> >         PipelineStateMatcher(),
> >     ]
> >     extra_opts = {
> >         'input_table': 'testtable',
> >         'num_records': 1,
> >         'beam_bq_source': 'source',
> >         'on_success_matcher': all_of(*pipeline_verifiers)
> >     }
> >     result = run(
> >         test_pipeline.get_full_options_as_args(**extra_opts))
> >
> >     print(result)
> >
> > Basically, I would expect a PCollection as the result of the pipeline, and I would be testing the content of the PCollection.
> >
> > Running this results in this message
> >
> > IT is skipped because --test-pipeline-options is not specified
> >
> > Would you be able to advise on this?
> >
> > kind regards
> >
> >  Marco
> >
> >
> > On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]> wrote:
> >>
> >> You can use apache_beam.testing.util.assert_that to write tests of
> >> Beam pipelines. This is what Beam uses for its tests, e.g.
> >>
> >>
> >> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
> >>
> >> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]> wrote:
> >> >
> >> > Hi all
> >> >  I was wondering if anyone could provide pointers on how to test a pipeline in Python.
> >> > I have the following pipeline
> >> >
> >> > lines = (p
> >> >          | 'Get List of Tickers' >> beam.Map(get_tickers)
> >> >          | 'Split fields'  >> beam.Map(split_fields)
> >> >          | 'Map to String' >> beam.Map(add_year)
> >> >          )
> >> > result = p.run()
> >> >
> >> > Now I can easily test each individual function for each step (get_tickers, split_fields, add_year),
> >> >
> >> > but is there a way to test the pipeline 'as a whole'?
> >> >
> >> > Could anyone point me to some examples?
> >> >
> >> > kind regards
> >> >
