The line

        return pcoll.pipeline | beam.ParDo(dofn)

should read

        return pcoll| beam.ParDo(dofn)


On Fri, Jul 21, 2017 at 3:25 PM, Dmitry Demeshchuk <[email protected]>
wrote:

> Yep, that's what I ended up doing.
>
> Here's the full code:
>
> class ErrorSieve(beam.PTransform):
>     class ErrorSieveDoFn(beam.DoFn):
>         def __init__(self, dofn, save_exceptions=False, 
> save_stacktraces=False):
>             self.dofn = dofn
>             self.save_exceptions = save_exceptions
>             self.save_stacktraces = save_stacktraces
>
>         def process(self, *args, **kwargs):
>             try:
>                 yield inner_process(*args, **kwargs)
>             except Exception as e:
>                 result = [args, kwargs]
>                 if self.save_exceptions:
>                     result = result + [e]
>                 if self.save_stacktraces:
>                     result = result + [traceback.extract_stack()]
>                 yield beam.pvalue.TaggedOutput('errors', result)
>
>     def __init__(self, pardo, save_exceptions=False, save_stacktraces=False):
>         self.fn = pardo.fn
>         self.save_exceptions = save_exceptions
>         self.save_stacktraces = save_stacktraces
>
>     def expand(self, pcoll):
>         dofn = ErrorSieve.ErrorSieveDoFn(self.fn, self.save_exceptions, 
> self.save_stacktraces)
>         return pcoll.pipeline | beam.ParDo(dofn)
>
> ​
>
> However, when I try to use it with a TestPipeline() object, I get the
> following exception:
>
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:183: in
> __exit__
>     self.run().wait_until_finish()
> ../venv/lib/python2.7/site-packages/apache_beam/testing/test_pipeline.py:96:
> in run
>     result = super(TestPipeline, self).run()
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:167: in run
>     self.to_runner_api(), self.runner, self._options).run(False)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:347: in
> to_runner_api
>     root_transform_id = context.transforms.get_id(self._root_transform())
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
> in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in
> to_runner_api
>     subtransforms=[context.transforms.get_id(part) for part in
> self.parts],
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
> in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in
> to_runner_api
>     subtransforms=[context.transforms.get_id(part) for part in
> self.parts],
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
> in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:544: in
> to_runner_api
>     for tag, out in self.named_outputs().items()},
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:543: in
> <dictcomp>
>     outputs={str(tag): context.pcollections.get_id(out)
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
> in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:139: in
> to_runner_api
>     self.windowing))
> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:121: in
> windowing
>     self.producer.inputs)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _
>
> self = <ParDo(PTransform) label=[ParDo(ErrorSieveDoFn)] at 0x10b750590>
> inputs = (<PBegin[None.None] at 0x10b7504d0>,)
>
>     def get_windowing(self, inputs):
>       """Returns the window function to be associated with transform's
> output.
>
>         By default most transforms just return the windowing function
> associated
>         with the input PCollection (or the first input if several).
>         """
>       # TODO(robertwb): Assert all input WindowFns compatible.
> >     return inputs[0].windowing
> E     AttributeError: 'PBegin' object has no attribute 'windowing'
>
> ../venv/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py:343:
> AttributeError
>
> The test code goes like this:
>
> class ErrorSieveTest(unittest.TestCase):
>     def test_with_data_only(self):
>         with TestPipeline() as p:
>             results = (p
>                     | beam.Create([1, 2, "3", 4])
>                     | ErrorSieve(beam.Map(lambda x: x + 1))
>             )
>
> ​
> I don't know much about how windowing works under the hood, so will have
> to do some digging.
>
> In the meantime, would appreciate any insight into this.
>
> Thanks!
>
> On Fri, Jul 21, 2017 at 3:07 PM, Robert Bradshaw <[email protected]>
> wrote:
>
>> Rather than mutating the DoFn itself, I would create a new ParDo that
>> wraps and invokes the inner one, e.g.
>>
>> class ErrorSieveParDo(beam.DoFn):
>>   def __init__(self, dofn):
>>     self._dofn = dofn
>>   def process(self, *args, **kwargs):
>>       # note that raw_result is an *iterable* or None
>>       raw_result = self._dofn.process(*args, **kwargs)
>>       ...
>>   # don't forget start/finish_bundle if needed
>>
>> output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))
>>
>> It's correct that you can't use this in Map/FlatMap. Alternatively, make
>> a transform
>>
>> class ErrorSieve(PTransform):
>>   def __init__(self, pardo):
>>     self.fn = pardo.fn
>>   def expand(self, input_pcoll):
>>     return input_pcoll | ParDo(ErrorSieveParDo(self.fn))
>>
>> output_pcoll = input_pcoll | ErrorSieve(SomeDoFn())
>>
>> Even better, however, might be to do something like
>>
>> class ErrorSieve(PTransform):
>>   def __init__(self, pardo):
>>     self.fn = pardo.fn
>>   def expand(self, pcoll_input):
>>     return raw_results_pcoll | ParDo(self.fn) | ParDo(AnotherDoFn)
>>
>> Where AnotherDoFn filters/modifies the outputs (assuming it can be done
>> elementwise).
>>
>>
>> On Fri, Jul 21, 2017 at 2:46 PM, Dmitry Demeshchuk <[email protected]>
>> wrote:
>>
>>> Hi Sourabh,
>>>
>>> Great call, thanks. I was thinking about a slightly different interface,
>>> but this is exactly the direction I wanted to go. So, my approach, I guess,
>>> would be something like that:
>>>
>>> class ErrorSieve(PTransform):
>>>   def __init__(self, pardo):
>>>     self.fn = pardo.fn
>>>   def expand(self):
>>>     inner_process = self.fn.process
>>>     def process(self, *args, **kwargs):
>>>       raw_result = inner_process(*args, **kwargs)
>>>       result = ...
>>>       yield result
>>>     setattr(fn, 'process', process)
>>>     return ParDo(self.fn)
>>>
>>> ​
>>>
>>> I'll share a working snippet once it's done.
>>>
>>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is it possible to create
>>>>
>>>> class ErrorSieve(PTransform):
>>>>    def __init__ (dofn):
>>>>    def expand():
>>>>          return ParDo(modifiedDoFn)
>>>>
>>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>>> don't expose the ParDo to the user.
>>>>
>>>> Will this work for your usecase?
>>>>
>>>> -Sourabh
>>>>
>>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> I'm trying to make a transformation function (let's call it
>>>>> ErrorSieve) that would take a ParDo object as input and modify its
>>>>> underlying DoFn object, basically adding extra logic on top of an
>>>>> underlying process() method.
>>>>>
>>>>> Ideally for me, the example usage would be:
>>>>>
>>>>> ```python
>>>>> p | ErrorSieve(beam.ParDo(MyDoFn())
>>>>>
>>>>> or
>>>>>
>>>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>>>> ```
>>>>>
>>>>> However, this would require me to butcher the internals of ParDo
>>>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>>>> its transformation. My other thinking was to make it a fair and square 
>>>>> DoFn:
>>>>>
>>>>> ```python
>>>>> p | beam.ParDo(ErrorSieve(MyDoFn())
>>>>> ```
>>>>>
>>>>> The only problem with this is that I can't use it with transforms like
>>>>> FlatMap, which is a bit unfortunate.
>>>>>
>>>>> Do you think it's worth investigating how to implement the first
>>>>> approach, or should I just instead settle with the second approach, using
>>>>> only custom DoFns?
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>

Reply via email to