Hi Jaehyeon, for exceptions that happen at pipeline construction, it should be possible to use unittest.assertRaises.
For errors at runtime, it should be possible to detect that the pipeline has finished in a 'FAILED` state. You can capture the state by running the pipeline without `with` decorator and capturing pipeline_result = p.run().wait_until_finish(), and analyzing it, or by trying out PipelineStateMatcher, from a quick look, we have some tests that use it: https://github.com/apache/beam/blob/3ed91c880f85d09a45039e70d5136f1c2324916d/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py#L68 , https://github.com/apache/beam/blob/3ed91c880f85d09a45039e70d5136f1c2324916d/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py#L80 , and it should be possible to create a matcher for a failed state. On Tue, Jun 18, 2024 at 12:22 PM Jaehyeon Kim <[email protected]> wrote: > Hello, > > I have a unit testing case and the pipeline raises TypeError. How is it > possible to test? (I don't find unittest assertRaises method in the beam > testing util) > > Cheers, > Jaehyeon > > def test_write_to_firehose_with_unsupported_types(self): > with TestPipeline(options=self.pipeline_opts) as p: > output = ( > p > | beam.Create(["one", "two", "three", "four"]) > | "WriteToFirehose" >> WriteToFirehose(self. > delivery_stream_name, True) > | "CollectResponse" >> beam.Map(lambda e: e[ > "FailedPutCount"]) > ) >
