Thanks Kyle / Robert... your answer provide me exact information i was after
kind regards
  Marco

On Fri, Jul 17, 2020 at 11:05 PM Kyle Weaver <[email protected]> wrote:

> Correction: you need to use `with` to actually run your pipeline:
>
> with beam.Pipeline(options=pipeline_options) as p:
>   lines = (p
>          | 'Get List of Tickers' >> ReadFromText(input_file)
>          | 'Split fields'  >> beam.Map(split_fields)
>          | 'Map to String' >> beam.Map(add_year)
>
>   assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))
>
>
>
> On Fri, Jul 17, 2020 at 3:02 PM Kyle Weaver <[email protected]> wrote:
>
>> > I had a look at the util_test.py, and i see that in those tests
>> pipelines are being created as part of tests., and  in these tests what are
>> being tested are beam functions - eg beam.Map  etc.
>>
>> assert_that checks the results of an entire pipeline, not individual
>> transforms. You should be able to apply assert_that to your example:
>>
>> p = beam.Pipeline(options=pipeline_options)
>> lines = (p
>>          | 'Get List of Tickers' >> ReadFromText(input_file)
>>          | 'Split fields'  >> beam.Map(split_fields)
>>          | 'Map to String' >> beam.Map(add_year)
>>
>> assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))
>>
>>
>>
>> On Fri, Jul 17, 2020 at 2:53 PM Sofia’s World <[email protected]>
>> wrote:
>>
>>> Hello Robert
>>>  thanks but i think i am either missing the point or not expressing
>>> clearly my goal.
>>> I had a look at the util_test.py, and i see that in those tests
>>> pipelines are being created as part of tests., and  in these tests what are
>>> being tested are beam functions - eg beam.Map  etc.
>>> I am after testing a pipeline as a whole. Taking this example,
>>>
>>> p = beam.Pipeline(options=pipeline_options)
>>> lines = (p
>>>          | 'Get List of Tickers' >> ReadFromText(input_file)
>>>          | 'Split fields'  >> beam.Map(split_fields)
>>>          | 'Map to String' >> beam.Map(add_year)
>>>
>>> what i am trying to do is to test a full pipeline run, like in the test 
>>> example below
>>>
>>> from mypackage.email_pipeline import run
>>>
>>> @patch('testing.email_pipeline.ReadFromText')
>>> def test_create_pipelne(self, mock_read_from_text):
>>>     mock_read_from_text.return_value = ['One',
>>>                                         'Two',
>>>                                         'Three']
>>>
>>>     test_pipeline = TestPipeline(is_integration_test=True)
>>>     pipeline_verifiers = [
>>>         PipelineStateMatcher(),
>>>     ]
>>>     extra_opts = {
>>>         'input_table': 'testtable',
>>>         'num_records': 1,
>>>         'beam_bq_source': 'source',
>>>         'on_success_matcher': all_of(*pipeline_verifiers)
>>>     }
>>>     result = run(
>>>                     test_pipeline.get_full_options_as_args(**extra_opts))
>>>
>>>     print(result)
>>>
>>> Basically, i would expect a PCollection as result of the pipeline, and i 
>>> would be testing the content of the PCollection
>>>
>>> Running this results in this messsage
>>>
>>> IT is skipped because --test-pipeline-options is not specified
>>>
>>> Would you be able to advise on this?
>>>
>>> kind regards
>>>
>>>  Marco
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> You can use apache_beam.testing.util.assert_that to write tests of
>>>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>>>
>>>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <[email protected]>
>>>> wrote:
>>>> >
>>>> > Hi all
>>>> >  i was wondering if anyone could provide pointers on how  to test a
>>>> pipeline in python.
>>>> > I have the following pipeline
>>>> >
>>>> > lines = (p
>>>> >          | 'Get List of Tickers' >> beam.Map(get_tickers)
>>>> >          | 'Split fields'  >> beam.Map(split_fields)
>>>> >          | 'Map to String' >> beam.Map(add_year)
>>>> >          )
>>>> > result = p.run()
>>>> >
>>>> > Now i can easily test each individual function for each step
>>>> (get_tickers, split_fields, add_year)
>>>> >
>>>> > but is there a way to test the pipeline 'as a whole' ?#
>>>> >
>>>> > Could anyone point me to some examples?
>>>> >
>>>> > kind regards
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>>
>>>

Reply via email to