Oops, that was dumb. I'm starting to see value in using typed languages. Thanks a lot, Robert!
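
For anyone reading the archive later, here is a minimal sketch of the sieve logic on its own, without Beam, in case it helps. It also sidesteps two things in my quoted snippet that look wrong to me: `inner_process` is never defined inside the DoFn, and `traceback.extract_stack()` captures the current call stack rather than the exception's traceback. `sieve_process`, `Tagged` and `add_one` are made-up names for illustration; `Tagged` only stands in for `beam.pvalue.TaggedOutput` so that this runs without a pipeline.

```python
import traceback
from collections import namedtuple

# Hypothetical stand-in for beam.pvalue.TaggedOutput, so the sketch runs without Beam.
Tagged = namedtuple("Tagged", ["tag", "value"])


def sieve_process(process, element, save_exceptions=False, save_stacktraces=False):
    """Yield the outputs of process(element), or one 'errors' record if it raises."""
    try:
        raw = process(element)
        # process() may return None or a lazy iterable; materialize it inside the
        # try block so that errors raised during iteration are caught as well.
        outputs = [] if raw is None else list(raw)
    except Exception as exc:
        record = [element]
        if save_exceptions:
            record.append(exc)
        if save_stacktraces:
            # format_exc() describes the exception currently being handled.
            record.append(traceback.format_exc())
        yield Tagged("errors", record)
        return
    for out in outputs:
        yield out


def add_one(x):
    # Toy process function: raises TypeError on the string "3".
    yield x + 1


results = [out
           for element in [1, 2, "3", 4]
           for out in sieve_process(add_one, element, save_exceptions=True)]
```

Inside a real DoFn I'd expect the same body to go into `process`, yielding `beam.pvalue.TaggedOutput('errors', record)` instead of `Tagged`. Note that this buffers each element's outputs, so an element that fails halfway through emits only the error record.
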

On Fri, Jul 21, 2017 at 5:09 PM, Robert Bradshaw <[email protected]> wrote:

> The line
>
>     return pcoll.pipeline | beam.ParDo(dofn)
>
> should read
>
>     return pcoll | beam.ParDo(dofn)
>
>
> On Fri, Jul 21, 2017 at 3:25 PM, Dmitry Demeshchuk <[email protected]> wrote:
>
>> Yep, that's what I ended up doing.
>>
>> Here's the full code:
>>
>> class ErrorSieve(beam.PTransform):
>>     class ErrorSieveDoFn(beam.DoFn):
>>         def __init__(self, dofn, save_exceptions=False,
>>                      save_stacktraces=False):
>>             self.dofn = dofn
>>             self.save_exceptions = save_exceptions
>>             self.save_stacktraces = save_stacktraces
>>
>>         def process(self, *args, **kwargs):
>>             try:
>>                 yield inner_process(*args, **kwargs)
>>             except Exception as e:
>>                 result = [args, kwargs]
>>                 if self.save_exceptions:
>>                     result = result + [e]
>>                 if self.save_stacktraces:
>>                     result = result + [traceback.extract_stack()]
>>                 yield beam.pvalue.TaggedOutput('errors', result)
>>
>>     def __init__(self, pardo, save_exceptions=False, save_stacktraces=False):
>>         self.fn = pardo.fn
>>         self.save_exceptions = save_exceptions
>>         self.save_stacktraces = save_stacktraces
>>
>>     def expand(self, pcoll):
>>         dofn = ErrorSieve.ErrorSieveDoFn(self.fn, self.save_exceptions,
>>                                          self.save_stacktraces)
>>         return pcoll.pipeline | beam.ParDo(dofn)
>>
>>
>> However, when I try to use it with a TestPipeline() object, I get the
>> following exception:
>>
>> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:183: in __exit__
>>     self.run().wait_until_finish()
>> ../venv/lib/python2.7/site-packages/apache_beam/testing/test_pipeline.py:96: in run
>>     result = super(TestPipeline, self).run()
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:167: in run
>>     self.to_runner_api(), self.runner, self._options).run(False)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:347: in to_runner_api
>>     root_transform_id = context.transforms.get_id(self._root_transform())
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in to_runner_api
>>     subtransforms=[context.transforms.get_id(part) for part in self.parts],
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in to_runner_api
>>     subtransforms=[context.transforms.get_id(part) for part in self.parts],
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:544: in to_runner_api
>>     for tag, out in self.named_outputs().items()},
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:543: in <dictcomp>
>>     outputs={str(tag): context.pcollections.get_id(out)
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:139: in to_runner_api
>>     self.windowing))
>> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:121: in windowing
>>     self.producer.inputs)
>> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>>
>> self = <ParDo(PTransform) label=[ParDo(ErrorSieveDoFn)] at 0x10b750590>
>> inputs = (<PBegin[None.None] at 0x10b7504d0>,)
>>
>>     def get_windowing(self, inputs):
>>       """Returns the window function to be associated with transform's output.
>>
>>       By default most transforms just return the windowing function associated
>>       with the input PCollection (or the first input if several).
>>       """
>>       # TODO(robertwb): Assert all input WindowFns compatible.
>> >     return inputs[0].windowing
>> E     AttributeError: 'PBegin' object has no attribute 'windowing'
>>
>> ../venv/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py:343: AttributeError
>>
>> The test code goes like this:
>>
>> class ErrorSieveTest(unittest.TestCase):
>>     def test_with_data_only(self):
>>         with TestPipeline() as p:
>>             results = (p
>>                        | beam.Create([1, 2, "3", 4])
>>                        | ErrorSieve(beam.Map(lambda x: x + 1))
>>                        )
>>
>>
>> I don't know much about how windowing works under the hood, so will have
>> to do some digging.
>>
>> In the meantime, would appreciate any insight into this.
>>
>> Thanks!
>>
>> On Fri, Jul 21, 2017 at 3:07 PM, Robert Bradshaw <[email protected]> wrote:
>>
>>> Rather than mutating the DoFn itself, I would create a new ParDo that
>>> wraps and invokes the inner one, e.g.
>>>
>>> class ErrorSieveParDo(beam.DoFn):
>>>     def __init__(self, dofn):
>>>         self._dofn = dofn
>>>     def process(self, *args, **kwargs):
>>>         # note that raw_result is an *iterable* or None
>>>         raw_result = self._dofn.process(*args, **kwargs)
>>>         ...
>>>     # don't forget start/finish_bundle if needed
>>>
>>> output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))
>>>
>>> It's correct that you can't use this in Map/FlatMap. Alternatively, make
>>> a transform
>>>
>>> class ErrorSieve(PTransform):
>>>     def __init__(self, pardo):
>>>         self.fn = pardo.fn
>>>     def expand(self, input_pcoll):
>>>         return input_pcoll | ParDo(ErrorSieveParDo(self.fn))
>>>
>>> output_pcoll = input_pcoll | ErrorSieve(SomeDoFn())
>>>
>>> Even better, however, might be to do something like
>>>
>>> class ErrorSieve(PTransform):
>>>     def __init__(self, pardo):
>>>         self.fn = pardo.fn
>>>     def expand(self, pcoll_input):
>>>         return raw_results_pcoll | ParDo(self.fn) | ParDo(AnotherDoFn)
>>>
>>> Where AnotherDoFn filters/modifies the outputs (assuming it can be done
>>> elementwise).
>>>
>>>
>>> On Fri, Jul 21, 2017 at 2:46 PM, Dmitry Demeshchuk <[email protected]> wrote:
>>>
>>>> Hi Sourabh,
>>>>
>>>> Great call, thanks. I was thinking about a slightly different
>>>> interface, but this is exactly the direction I wanted to go. So, my
>>>> approach, I guess, would be something like that:
>>>>
>>>> class ErrorSieve(PTransform):
>>>>     def __init__(self, pardo):
>>>>         self.fn = pardo.fn
>>>>     def expand(self):
>>>>         inner_process = self.fn.process
>>>>         def process(self, *args, **kwargs):
>>>>             raw_result = inner_process(*args, **kwargs)
>>>>             result = ...
>>>>             yield result
>>>>         setattr(fn, 'process', process)
>>>>         return ParDo(self.fn)
>>>>
>>>>
>>>> I'll share a working snippet once it's done.
>>>>
>>>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is it possible to create
>>>>>
>>>>> class ErrorSieve(PTransform):
>>>>>     def __init__ (dofn):
>>>>>     def expand():
>>>>>         return ParDo(modifiedDoFn)
>>>>>
>>>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>>>> don't expose the ParDo to the user.
>>>>>
>>>>> Will this work for your usecase?
>>>>>
>>>>> -Sourabh
>>>>>
>>>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <[email protected]> wrote:
>>>>>
>>>>>> Hi list,
>>>>>>
>>>>>> I'm trying to make a transformation function (let's call it
>>>>>> ErrorSieve) that would take a ParDo object as input and modify its
>>>>>> underlying DoFn object, basically adding extra logic on top of an
>>>>>> underlying process() method.
>>>>>>
>>>>>> Ideally for me, the example usage would be:
>>>>>>
>>>>>> ```python
>>>>>> p | ErrorSieve(beam.ParDo(MyDoFn())
>>>>>>
>>>>>> or
>>>>>>
>>>>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>>>>> ```
>>>>>>
>>>>>> However, this would require me to butcher the internals of ParDo
>>>>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>>>>> its transformation. My other thinking was to make it a fair and square
>>>>>> DoFn:
>>>>>>
>>>>>> ```python
>>>>>> p | beam.ParDo(ErrorSieve(MyDoFn())
>>>>>> ```
>>>>>>
>>>>>> The only problem with this is that I can't use it with transforms
>>>>>> like FlatMap, which is a bit unfortunate.
>>>>>>
>>>>>> Do you think it's worth investigating how to implement the first
>>>>>> approach, or should I just instead settle with the second approach, using
>>>>>> only custom DoFns?
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Dmitry Demeshchuk.
>>>>
>>>> --
>>>> Best regards,
>>>> Dmitry Demeshchuk.
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.

--
Best regards,
Dmitry Demeshchuk.
