If you want a full end-to-end integration test of your pipeline, you can:

1) Write your input data to temporary files.
2) Run your pipeline, which writes its output somewhere (ideally a
temp location as well).
3) Open the outputs and check that they match what you expected.

This is similar to the test at
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py
but it is a bit heavyweight.

Another way to validate your pipeline is to refactor the code so the
inputs and outputs are pluggable. For example, you could write

import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

def run_my_pipeline(lines):
  # [all your pipeline logic goes here]
  # [this could also be wrapped up as a PTransform]
  return result1, result2

def main(...):
  with beam.Pipeline(...) as p:
    lines = p | beam.io.ReadFromText(...)
    result1, result2 = run_my_pipeline(lines)
    result1 | beam.io.WriteToSomewhere(...)
    result2 | beam.io.WriteToSomewhereElse(...)

def test():
  with beam.Pipeline(...) as p:
    lines = p | beam.Create(...)
    result1, result2 = run_my_pipeline(lines)
    # Each assert_that in the same pipeline needs a distinct label.
    assert_that(result1, equal_to([...]))
    assert_that(result2, any_callable_that_validates_result2, label="Check2")

You could also parameterize things on your sinks and sources, e.g.

def run_my_pipeline(source, sink1, sink2):
  with beam.Pipeline(...) as p:
    lines = p | source
    ...
    result1 | sink1
    result2 | sink2

def main(...):
  run_my_pipeline(
      beam.io.ReadFromText(...),
      beam.io.WriteToSomewhere(...),
      beam.io.WriteToSomewhereElse(...))

def test():

  class Check(beam.PTransform):
    def __init__(self, checker):
      super().__init__()
      self._checker = checker
    def expand(self, pcoll):
      assert_that(pcoll, self._checker)

  run_my_pipeline(
      beam.Create([...]),
      Check(equal_to([...])),
      Check(any_callable_that_validates_result2))

or various permutations thereof.

Is that more what you're looking for?



On Fri, Jul 17, 2020 at 2:46 PM Sofia’s World <[email protected]> wrote:
>
> Hello Robert,
> Thanks, but I think I am either missing the point or not expressing my goal
> clearly.
> I had a look at util_test.py, and I see that in those tests pipelines are
> created as part of the tests, and that what is being tested are Beam
> functions, e.g. beam.Map etc.
> I am after testing a pipeline as a whole. Taking this example,
>
> p = beam.Pipeline(options=pipeline_options)
> lines = (p
>          | 'Get List of Tickers' >> ReadFromText(input_file)
>          | 'Split fields'  >> beam.Map(split_fields)
>          | 'Map to String' >> beam.Map(add_year)
>          )
>
> What I am trying to do is test a full pipeline run, as in the test
> example below:
>
> from mypackage.email_pipeline import run
>
> @patch('testing.email_pipeline.ReadFromText')
> def test_create_pipelne(self, mock_read_from_text):
>     mock_read_from_text.return_value = ['One',
>                                         'Two',
>                                         'Three']
>
>     test_pipeline = TestPipeline(is_integration_test=True)
>     pipeline_verifiers = [
>         PipelineStateMatcher(),
>     ]
>     extra_opts = {
>         'input_table': 'testtable',
>         'num_records': 1,
>         'beam_bq_source': 'source',
>         'on_success_matcher': all_of(*pipeline_verifiers)
>     }
>     result = run(
>                     test_pipeline.get_full_options_as_args(**extra_opts))
>
>     print(result)
>
> Basically, I would expect a PCollection as the result of the pipeline, and I
> would test the contents of that PCollection.
>
> Running this results in this message:
>
> IT is skipped because --test-pipeline-options is not specified
>
> Would you be able to advise on this?
>
> kind regards
>
>  Marco
>
> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]> wrote:
>>
>> You can use apache_beam.testing.util.assert_that to write tests of
>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>
>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]> wrote:
>> >
>> > Hi all,
>> > I was wondering if anyone could provide pointers on how to test a
>> > pipeline in Python.
>> > I have the following pipeline
>> >
>> > lines = (p
>> >          | 'Get List of Tickers' >> beam.Map(get_tickers)
>> >          | 'Split fields'  >> beam.Map(split_fields)
>> >          | 'Map to String' >> beam.Map(add_year)
>> >          )
>> > result = p.run()
>> >
>> > Now I can easily test each individual function for each step (get_tickers,
>> > split_fields, add_year),
>> >
>> > but is there a way to test the pipeline 'as a whole'?
>> >
>> > Could anyone point me to some examples?
>> >
>> > kind regards
>> >
