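The line
    return pcoll.pipeline | beam.ParDo(dofn)
should read
    return pcoll | beam.ParDo(dofn)
Applying the ParDo to pcoll.pipeline gives it a PBegin as its input instead of the incoming PCollection. A PBegin has no windowing, which is exactly the AttributeError in the traceback below.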
On Fri, Jul 21, 2017 at 3:25 PM, Dmitry Demeshchuk wrote:
> Yep, that's what I ended up doing.
>
> Here's the full code:
>
> import traceback
>
> import apache_beam as beam
>
>
> class ErrorSieve(beam.PTransform):
>     class ErrorSieveDoFn(beam.DoFn):
>         def __init__(self, dofn, save_exceptions=False,
>                      save_stacktraces=False):
>             self.dofn = dofn
>             self.save_exceptions = save_exceptions
>             self.save_stacktraces = save_stacktraces
>
>         def process(self, *args, **kwargs):
>             try:
>                 # The wrapped process() returns an iterable (or None), so
>                 # re-yield its outputs instead of yielding the iterable.
>                 for output in self.dofn.process(*args, **kwargs) or ():
>                     yield output
>             except Exception as e:
>                 result = [args, kwargs]
>                 if self.save_exceptions:
>                     result = result + [e]
>                 if self.save_stacktraces:
>                     # format_exc() describes the caught exception;
>                     # extract_stack() would only capture the current stack.
>                     result = result + [traceback.format_exc()]
>                 yield beam.pvalue.TaggedOutput('errors', result)
>
>     def __init__(self, pardo, save_exceptions=False, save_stacktraces=False):
>         super(ErrorSieve, self).__init__()
>         self.fn = pardo.fn
>         self.save_exceptions = save_exceptions
>         self.save_stacktraces = save_stacktraces
>
>     def expand(self, pcoll):
>         dofn = ErrorSieve.ErrorSieveDoFn(self.fn, self.save_exceptions,
>                                          self.save_stacktraces)
>         return pcoll.pipeline | beam.ParDo(dofn)
>
>
>
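The per-element logic that ErrorSieveDoFn.process is aiming for can be checked without a pipeline. Below is a minimal, Beam-free sketch, assuming the wrapped process behaves like DoFn.process (returns an iterable or None). `sieve_process` and `add_one` are hypothetical names, and plain ('main', value) / ('errors', record) tuples stand in for Beam's main output and beam.pvalue.TaggedOutput('errors', record).

```python
import traceback


def sieve_process(process, element, save_exceptions=False,
                  save_stacktraces=False):
    """Hypothetical helper: run a DoFn-style process() on one element.

    Yields ('main', output) for each output and, if process() raises,
    one ('errors', record) tuple instead of propagating the exception.
    """
    try:
        # process() returns an iterable (or None), so iterate it inside the
        # try block: a generator only raises once it is consumed.
        for output in process(element) or ():
            yield ('main', output)
    except Exception as e:
        record = [element]
        if save_exceptions:
            record.append(e)
        if save_stacktraces:
            # format_exc() formats the exception currently being handled;
            # traceback.extract_stack() would only describe the caller's stack.
            record.append(traceback.format_exc())
        yield ('errors', record)


def add_one(x):
    """Hypothetical user process(): fails for non-numeric input."""
    yield x + 1


tagged = [t for x in [1, 2, "3", 4] for t in sieve_process(add_one, x)]
main = [value for tag, value in tagged if tag == 'main']
errors = [value for tag, value in tagged if tag == 'errors']
print(main)    # [2, 3, 5]
print(errors)  # [['3']]
```

In the real DoFn the error branch yields beam.pvalue.TaggedOutput('errors', record); to read that output downstream, the ParDo would typically be declared with .with_outputs('errors', main='main'), after which the result's .main and .errors attributes are the two PCollections.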
> However, when I try to use it with a TestPipeline() object, I get the
> following exception:
>
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:183: in __exit__
>     self.run().wait_until_finish()
> ../venv/lib/python2.7/site-packages/apache_beam/testing/test_pipeline.py:96: in run
>     result = super(TestPipeline, self).run()
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:167: in run
>     self.to_runner_api(), self.runner, self._options).run(False)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:347: in to_runner_api
>     root_transform_id = context.transforms.get_id(self._root_transform())
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in to_runner_api
>     subtransforms=[context.transforms.get_id(part) for part in self.parts],
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in to_runner_api
>     subtransforms=[context.transforms.get_id(part) for part in self.parts],
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:544: in to_runner_api
>     for tag, out in self.named_outputs().items()},
> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:543: in <dictcomp>
>     outputs={str(tag): context.pcollections.get_id(out)
> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:139: in to_runner_api
>     self.windowing))
> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:121: in windowing
>     self.producer.inputs)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>
> self = <ParDo(PTransform) label=[ParDo(ErrorSieveDoFn)] at 0x10b750590>
> inputs = (<PBegin[None.None] at 0x10b7504d0>,)
>
>     def get_windowing(self, inputs):
>         """Returns the window function to be associated with transform's output.
>
>         By default most transforms just return the windowing function associated
>         with the input PCollection (or the first input if several).
>         """
>         # TODO(robertwb): Assert all input WindowFns compatible.
> >       return inputs[0].windowing
> E       AttributeError: 'PBegin' object has no attribute 'windowing'
>
> ../venv/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py:343: AttributeError
>
> The test code goes like this:
>
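> import unittest
>
> import apache_beam as beam
> from apache_beam.testing.test_pipeline import TestPipeline
>
>
> class ErrorSieveTest(unittest.TestCase):
>     def test_with_data_only(self):
>         with TestPipeline() as p:
>             results = (p
>                        | beam.Create([1, 2, "3", 4])
>                        | ErrorSieve(beam.Map(lambda x: x + 1)))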
>
>
> I don't know much about how windowing works under the hood, so I will
> have to do some digging.
>
> In the meantime, I would appreciate any insight into this.
>
> Thanks!
>
> On Fri, Jul 21, 2017 at 3:07 PM, Robert Bradshaw wrote:
>
>> Rather than mutating the DoFn itself, I would create a new ParDo that
>> wraps and invokes the inner one, e.g.
>>
>> class ErrorSieveParDo(beam.DoFn):
>>     def __init__(self, dofn):
>>         self._dofn = dofn
>>
>>     def process(self, *args, **kwargs):
>>         # note that raw_result is an *iterable* or None
>>         raw_result = self._dofn.process(*args, **kwargs)
>>         ...
>>
>>     # don't forget start/finish_bundle if needed
>>
>> output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))
>>
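Because raw_result can be an iterable or None, the elided part of process has to handle both. A minimal Beam-free sketch of that, plus forwarding a lifecycle hook; `SomeDoFn` here is a hypothetical plain-object stand-in (not a beam.DoFn) and `WrappingFn` is a hypothetical analogue of ErrorSieveParDo.

```python
class SomeDoFn(object):
    """Hypothetical stand-in for a user DoFn (plain object, not beam.DoFn)."""

    def process(self, element):
        if element % 2:
            return None  # no outputs for odd elements
        return [element, element * 10]


class WrappingFn(object):
    """Hypothetical Beam-free analogue of ErrorSieveParDo."""

    def __init__(self, dofn):
        self._dofn = dofn

    def start_bundle(self):
        # Forward the lifecycle hook only if the wrapped object defines it.
        hook = getattr(self._dofn, 'start_bundle', None)
        if hook is not None:
            hook()

    def process(self, *args, **kwargs):
        raw_result = self._dofn.process(*args, **kwargs)
        # raw_result is an iterable or None; treat None as "no outputs".
        if raw_result is None:
            return
        for output in raw_result:
            yield output


wrapper = WrappingFn(SomeDoFn())
wrapper.start_bundle()  # no-op here: SomeDoFn has no start_bundle
outputs = [out for x in [1, 2, 3, 4] for out in wrapper.process(x)]
print(outputs)  # [2, 20, 4, 40]
```

finish_bundle could be forwarded the same way as start_bundle.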
>> It's correct that you can't use this in Map/FlatMap. Alternatively, make
>> a transform:
>>
>> class ErrorSieve(PTransform):
>>     def __init__(self, pardo):
>>         self.fn = pardo.fn
>>
>>     def expand(self, input_pcoll):
>>         return input_pcoll | ParDo(ErrorSieveParDo(self.fn))
>>
>> output_pcoll = input_pcoll | ErrorSieve(ParDo(SomeDoFn()))
>>
>> Even better, however, might be to do something like
>>
>> class ErrorSieve(PTransform):
>>     def __init__(self, pardo):
>>         self.fn = pardo.fn
>>
>>     def expand(self, pcoll_input):
>>         return pcoll_input | ParDo(self.fn) | ParDo(AnotherDoFn())
>>
>> where AnotherDoFn filters/modifies the outputs (assuming that can be done
>> elementwise).
>>
>>
>> On Fri, Jul 21, 2017 at 2:46 PM, Dmitry Demeshchuk wrote:
>>
>>> Hi Sourabh,
>>>
>>> Great call, thanks. I was thinking about a slightly different interface,
>>> but this is exactly the direction I wanted to go. So my approach, I guess,
>>> would be something like this:
>>>
>>> class ErrorSieve(PTransform):
>>>     def __init__(self, pardo):
>>>         self.fn = pardo.fn
>>>
>>>     def expand(self, pcoll):
>>>         inner_process = self.fn.process
>>>
>>>         # Set on the instance below, so this is called without `self`.
>>>         def process(*args, **kwargs):
>>>             raw_result = inner_process(*args, **kwargs)
>>>             result = ...
>>>             yield result
>>>
>>>         setattr(self.fn, 'process', process)
>>>         return pcoll | ParDo(self.fn)
>>>
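One subtlety when patching process with setattr: a function stored as an instance attribute is not bound, so Python does not pass `self` to it, and the replacement process should therefore not declare a `self` parameter. A small plain-Python illustration; `MyDoFn` is a hypothetical stand-in class. Beam also serializes DoFns, and whether a patched closure survives that step is not covered by this sketch.

```python
class MyDoFn(object):
    """Hypothetical stand-in for a user DoFn."""

    def process(self, element):
        yield element + 1


fn = MyDoFn()
inner_process = fn.process  # bound method captured before patching


def process(*args, **kwargs):
    # Stored on the instance below, so it is called WITHOUT `self`:
    # for fn.process(1), args == (1,).
    for output in inner_process(*args, **kwargs):
        yield ('wrapped', output)


setattr(fn, 'process', process)
print(list(fn.process(1)))        # [('wrapped', 2)]
print(list(MyDoFn().process(1)))  # [2]  (other instances are unaffected)
```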
>>>
>>>
>>> I'll share a working snippet once it's done.
>>>
>>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj wrote:
>>>
>>>> Hi,
>>>>
>>>> Is it possible to create something like
>>>>
>>>> class ErrorSieve(PTransform):
>>>>     def __init__(self, dofn):
>>>>         self.dofn = dofn
>>>>
>>>>     def expand(self, pcoll):
>>>>         # modifiedDoFn: self.dofn with the extra logic added
>>>>         return pcoll | ParDo(modifiedDoFn)
>>>>
>>>> That way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>>> don't expose the ParDo to the user.
>>>>
>>>> Will this work for your use case?
>>>>
>>>> -Sourabh
>>>>
>>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> I'm trying to make a transformation function (let's call it
>>>>> ErrorSieve) that would take a ParDo object as input and modify its
>>>>> underlying DoFn object, basically adding extra logic on top of an
>>>>> underlying process() method.
>>>>>
>>>>> Ideally for me, the example usage would be:
>>>>>
>>>>> ```python
>>>>> p | ErrorSieve(beam.ParDo(MyDoFn()))
>>>>>
>>>>> or
>>>>>
>>>>> p | ErrorSieve(beam.FlatMap(lambda x: [x + 1]))
>>>>> ```
>>>>>
>>>>> However, this would require me to butcher ParDo's internals,
>>>>> especially since ParDo's make_fn() method gets called during its
>>>>> transformation. My other idea was to make it a fair and square
>>>>> DoFn:
>>>>>
>>>>> ```python
>>>>> p | beam.ParDo(ErrorSieve(MyDoFn()))
>>>>> ```
>>>>>
>>>>> The only problem with this is that I can't use it with transforms like
>>>>> FlatMap, which is a bit unfortunate.
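One way around the FlatMap limitation is to let the sieve accept either a DoFn-like object or a plain callable and adapt the callable, roughly what Beam itself does when FlatMap wraps its function in a CallableWrapperDoFn. A Beam-free sketch; `CallableAsDoFn`, `SieveFn` and `Doubler` are hypothetical names, and the callable is assumed to follow FlatMap semantics (it returns an iterable).

```python
class CallableAsDoFn(object):
    """Hypothetical adapter giving a plain callable a process() method."""

    def __init__(self, fn):
        self._fn = fn

    def process(self, element):
        # FlatMap semantics: the callable returns an iterable of outputs.
        return self._fn(element)


class SieveFn(object):
    """Hypothetical Beam-free sieve accepting a DoFn-like object or a callable."""

    def __init__(self, fn_or_dofn):
        if hasattr(fn_or_dofn, 'process'):
            self._inner = fn_or_dofn
        else:
            self._inner = CallableAsDoFn(fn_or_dofn)

    def process(self, element):
        try:
            for output in self._inner.process(element) or ():
                yield ('main', output)
        except Exception as e:
            yield ('errors', (element, type(e).__name__))


class Doubler(object):
    """Hypothetical DoFn-like class."""

    def process(self, element):
        yield element * 2


sieve = SieveFn(lambda x: [x + 1])
from_callable = [r for x in [1, "3"] for r in sieve.process(x)]
from_dofn = list(SieveFn(Doubler()).process(3))
print(from_callable)  # [('main', 2), ('errors', ('3', 'TypeError'))]
print(from_dofn)      # [('main', 6)]
```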
>>>>>
>>>>> Do you think it's worth investigating how to implement the first
>>>>> approach, or should I instead settle for the second approach, using
>>>>> only custom DoFns?
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>