Ooos, that was dumb. I'm starting to see value in using typed languages.

Thanks a lot, Robert!

On Fri, Jul 21, 2017 at 5:09 PM, Robert Bradshaw <[email protected]>
wrote:

> The line
>
>         return pcoll.pipeline | beam.ParDo(dofn)
>
> should read
>
>         return pcoll| beam.ParDo(dofn)
>
>
> On Fri, Jul 21, 2017 at 3:25 PM, Dmitry Demeshchuk <[email protected]>
> wrote:
>
>> Yep, that's what I ended up doing.
>>
>> Here's the full code:
>>
>> class ErrorSieve(beam.PTransform):
>>     class ErrorSieveDoFn(beam.DoFn):
>>         def __init__(self, dofn, save_exceptions=False, 
>> save_stacktraces=False):
>>             self.dofn = dofn
>>             self.save_exceptions = save_exceptions
>>             self.save_stacktraces = save_stacktraces
>>
>>         def process(self, *args, **kwargs):
>>             try:
>>                 yield inner_process(*args, **kwargs)
>>             except Exception as e:
>>                 result = [args, kwargs]
>>                 if self.save_exceptions:
>>                     result = result + [e]
>>                 if self.save_stacktraces:
>>                     result = result + [traceback.extract_stack()]
>>                 yield beam.pvalue.TaggedOutput('errors', result)
>>
>>     def __init__(self, pardo, save_exceptions=False, save_stacktraces=False):
>>         self.fn = pardo.fn
>>         self.save_exceptions = save_exceptions
>>         self.save_stacktraces = save_stacktraces
>>
>>     def expand(self, pcoll):
>>         dofn = ErrorSieve.ErrorSieveDoFn(self.fn, self.save_exceptions, 
>> self.save_stacktraces)
>>         return pcoll.pipeline | beam.ParDo(dofn)
>>
>> ​
>>
>> However, when I try to use it with a TestPipeline() object, I get the
>> following exception:
>>
>> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>> _ _ _
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:183: in
>> __exit__
>>     self.run().wait_until_finish()
>> ../venv/lib/python2.7/site-packages/apache_beam/testing/test_pipeline.py:96:
>> in run
>>     result = super(TestPipeline, self).run()
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:167: in run
>>     self.to_runner_api(), self.runner, self._options).run(False)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:347: in
>> to_runner_api
>>     root_transform_id = context.transforms.get_id(self._root_transform())
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
>> in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in
>> to_runner_api
>>     subtransforms=[context.transforms.get_id(part) for part in
>> self.parts],
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
>> in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in
>> to_runner_api
>>     subtransforms=[context.transforms.get_id(part) for part in
>> self.parts],
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
>> in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:544: in
>> to_runner_api
>>     for tag, out in self.named_outputs().items()},
>> ../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:543: in
>> <dictcomp>
>>     outputs={str(tag): context.pcollections.get_id(out)
>> ../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58:
>> in get_id
>>     self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
>> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:139: in
>> to_runner_api
>>     self.windowing))
>> ../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:121: in
>> windowing
>>     self.producer.inputs)
>> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
>> _ _ _
>>
>> self = <ParDo(PTransform) label=[ParDo(ErrorSieveDoFn)] at 0x10b750590>
>> inputs = (<PBegin[None.None] at 0x10b7504d0>,)
>>
>>     def get_windowing(self, inputs):
>>       """Returns the window function to be associated with transform's
>> output.
>>
>>         By default most transforms just return the windowing function
>> associated
>>         with the input PCollection (or the first input if several).
>>         """
>>       # TODO(robertwb): Assert all input WindowFns compatible.
>> >     return inputs[0].windowing
>> E     AttributeError: 'PBegin' object has no attribute 'windowing'
>>
>> ../venv/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py:343:
>> AttributeError
>>
>> The test code goes like this:
>>
>> class ErrorSieveTest(unittest.TestCase):
>>     def test_with_data_only(self):
>>         with TestPipeline() as p:
>>             results = (p
>>                     | beam.Create([1, 2, "3", 4])
>>                     | ErrorSieve(beam.Map(lambda x: x + 1))
>>             )
>>
>> ​
>> I don't know much about how windowing works under the hood, so will have
>> to do some digging.
>>
>> In the meantime, would appreciate any insight into this.
>>
>> Thanks!
>>
>> On Fri, Jul 21, 2017 at 3:07 PM, Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> Rather than mutating the DoFn itself, I would create a new ParDo that
>>> wraps and invokes the inner one, e.g.
>>>
>>> class ErrorSieveParDo(beam.DoFn):
>>>   def __init__(self, dofn):
>>>     self._dofn = dofn
>>>   def process(self, *args, **kwargs):
>>>       # note that raw_result is an *iterable* or None
>>>       raw_result = self._dofn.process(*args, **kwargs)
>>>       ...
>>>   # don't forget start/finish_bundle if needed
>>>
>>> output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))
>>>
>>> It's correct that you can't use this in Map/FlatMap. Alternatively, make
>>> a transform
>>>
>>> class ErrorSieve(PTransform):
>>>   def __init__(self, pardo):
>>>     self.fn = pardo.fn
>>>   def expand(self, input_pcoll):
>>>     return input_pcoll | ParDo(ErrorSieveParDo(self.fn))
>>>
>>> output_pcoll = input_pcoll | ErrorSieve(SomeDoFn())
>>>
>>> Even better, however, might be to do something like
>>>
>>> class ErrorSieve(PTransform):
>>>   def __init__(self, pardo):
>>>     self.fn = pardo.fn
>>>   def expand(self, pcoll_input):
>>>     return raw_results_pcoll | ParDo(self.fn) | ParDo(AnotherDoFn)
>>>
>>> Where AnotherDoFn filters/modifies the outputs (assuming it can be done
>>> elementwise).
>>>
>>>
>>> On Fri, Jul 21, 2017 at 2:46 PM, Dmitry Demeshchuk <[email protected]
>>> > wrote:
>>>
>>>> Hi Sourabh,
>>>>
>>>> Great call, thanks. I was thinking about a slightly different
>>>> interface, but this is exactly the direction I wanted to go. So, my
>>>> approach, I guess, would be something like that:
>>>>
>>>> class ErrorSieve(PTransform):
>>>>   def __init__(self, pardo):
>>>>     self.fn = pardo.fn
>>>>   def expand(self):
>>>>     inner_process = self.fn.process
>>>>     def process(self, *args, **kwargs):
>>>>       raw_result = inner_process(*args, **kwargs)
>>>>       result = ...
>>>>       yield result
>>>>     setattr(fn, 'process', process)
>>>>     return ParDo(self.fn)
>>>>
>>>> ​
>>>>
>>>> I'll share a working snippet once it's done.
>>>>
>>>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <[email protected]
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is it possible to create
>>>>>
>>>>> class ErrorSieve(PTransform):
>>>>>    def __init__ (dofn):
>>>>>    def expand():
>>>>>          return ParDo(modifiedDoFn)
>>>>>
>>>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>>>> don't expose the ParDo to the user.
>>>>>
>>>>> Will this work for your usecase?
>>>>>
>>>>> -Sourabh
>>>>>
>>>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi list,
>>>>>>
>>>>>> I'm trying to make a transformation function (let's call it
>>>>>> ErrorSieve) that would take a ParDo object as input and modify its
>>>>>> underlying DoFn object, basically adding extra logic on top of an
>>>>>> underlying process() method.
>>>>>>
>>>>>> Ideally for me, the example usage would be:
>>>>>>
>>>>>> ```python
>>>>>> p | ErrorSieve(beam.ParDo(MyDoFn())
>>>>>>
>>>>>> or
>>>>>>
>>>>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>>>>> ```
>>>>>>
>>>>>> However, this would require me to butcher the internals of ParDo
>>>>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>>>>> its transformation. My other thinking was to make it a fair and square 
>>>>>> DoFn:
>>>>>>
>>>>>> ```python
>>>>>> p | beam.ParDo(ErrorSieve(MyDoFn())
>>>>>> ```
>>>>>>
>>>>>> The only problem with this is that I can't use it with transforms
>>>>>> like FlatMap, which is a bit unfortunate.
>>>>>>
>>>>>> Do you think it's worth investigating how to implement the first
>>>>>> approach, or should I just instead settle with the second approach, using
>>>>>> only custom DoFns?
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Dmitry Demeshchuk.
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Dmitry Demeshchuk.
>>>>
>>>
>>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>
>


-- 
Best regards,
Dmitry Demeshchuk.

Reply via email to