Hi Jaehyeon,

for exceptions that happen at pipeline construction, it should be possible
to use unittest.assertRaises.

For errors at runtime, it should be possible to detect that the pipeline
has finished in a 'FAILED` state. You can capture the state by running the
pipeline without `with` decorator and capturing pipeline_result =
p.run().wait_until_finish(), and analyzing it, or by trying
out PipelineStateMatcher, from a quick look, we have some tests that use
it:
https://github.com/apache/beam/blob/3ed91c880f85d09a45039e70d5136f1c2324916d/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py#L68
,
https://github.com/apache/beam/blob/3ed91c880f85d09a45039e70d5136f1c2324916d/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py#L80
, and it should be possible to create a matcher for a failed state.


On Tue, Jun 18, 2024 at 12:22 PM Jaehyeon Kim <[email protected]> wrote:

> Hello,
>
> I have a unit testing case and the pipeline raises TypeError. How is it
> possible to test? (I don't find unittest assertRaises method in the beam
> testing util)
>
> Cheers,
> Jaehyeon
>
>     def test_write_to_firehose_with_unsupported_types(self):
>         with TestPipeline(options=self.pipeline_opts) as p:
>             output = (
>                 p
>                 | beam.Create(["one", "two", "three", "four"])
>                 | "WriteToFirehose" >> WriteToFirehose(self.
> delivery_stream_name, True)
>                 | "CollectResponse" >> beam.Map(lambda e: e[
> "FailedPutCount"])
>             )
>

Reply via email to