Got it. Thanks

On Fri, Jul 21, 2017 at 3:04 PM Dmitry Demeshchuk <[email protected]>
wrote:

> The motivation for using ParDo is the following. What if I had code like
> this:
>
> ```
> p | beam.FlatMap(lambda x: x + 1)
> ```
>
> and wanted to have ErrorSieve on top of it? There's no explicitly defined
> DoFn in that expression, after all.
>
>
>
> On Fri, Jul 21, 2017 at 3:01 PM, Sourabh Bajaj <[email protected]>
> wrote:
>
>> Thanks. One thing I'm not sure about: why do you need to pass the ParDo
>> to the ErrorSieve instead of just the DoFn, given that you're only
>> modifying the DoFn?
>>
>> On Fri, Jul 21, 2017 at 2:47 PM Dmitry Demeshchuk <[email protected]>
>> wrote:
>>
>>> Hi Sourabh,
>>>
>>> Great call, thanks. I was thinking about a slightly different interface,
>>> but this is exactly the direction I wanted to go. So my approach, I guess,
>>> would be something like this:
>>>
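>>> class ErrorSieve(PTransform):
>>>   def __init__(self, pardo):
>>>     # Keep the DoFn that the wrapped ParDo carries.
>>>     self.fn = pardo.fn
>>>   def expand(self, pcoll):
>>>     # inner_process is already bound to self.fn, so the wrapper
>>>     # below must not take its own `self` argument.
>>>     inner_process = self.fn.process
>>>     def process(*args, **kwargs):
>>>       raw_result = inner_process(*args, **kwargs)
>>>       result = ...
>>>       yield result
>>>     # Override process on this DoFn instance only.
>>>     setattr(self.fn, 'process', process)
>>>     return pcoll | ParDo(self.fn)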
>>>
>>>
>>> I'll share a working snippet once it's done.
>>>
>>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is it possible to create
>>>>
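>>>> class ErrorSieve(PTransform):
>>>>    def __init__(self, dofn):
>>>>          self.dofn = dofn
>>>>    def expand(self, pcoll):
>>>>          # modifiedDoFn: placeholder for the wrapped version of self.dofn
>>>>          return pcoll | ParDo(modifiedDoFn)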
>>>>
>>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>>> don't expose the ParDo to the user.
>>>>
>>>> Will this work for your use case?
>>>>
>>>> -Sourabh
>>>>
>>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> I'm trying to make a transformation function (let's call it
>>>>> ErrorSieve) that would take a ParDo object as input and modify its
>>>>> underlying DoFn object, basically adding extra logic on top of an
>>>>> underlying process() method.
>>>>>
>>>>> Ideally for me, the example usage would be:
>>>>>
>>>>> ```python
>>>>> p | ErrorSieve(beam.ParDo(MyDoFn()))
>>>>>
>>>>> # or
>>>>>
>>>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>>>> ```
>>>>>
>>>>> However, this would require me to butcher ParDo's internals,
>>>>> especially since ParDo's make_fn() method gets called during
>>>>> its transformation. My other thought was to make it a fair and square
>>>>> DoFn:
>>>>>
>>>>> ```python
>>>>> p | beam.ParDo(ErrorSieve(MyDoFn()))
>>>>> ```
>>>>>
>>>>> The only problem with this is that I can't use it with transforms like
>>>>> FlatMap, which is a bit unfortunate.
>>>>>
>>>>> Do you think it's worth investigating how to implement the first
>>>>> approach, or should I instead settle for the second approach, using
>>>>> only custom DoFns?
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>
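
For readers following the thread, here is a rough plain-Python sketch of the "wrap the DoFn" idea discussed above. It does not import Beam: `DoFn`, `CallableDoFn`, `ErrorSieveFn`, `Halve` and `run` are hypothetical stand-ins, and the `('ok', ...)` / `('error', ...)` tagging is an assumption, since the thread leaves the actual sieve logic as `result = ...`. The point is only that a wrapper which delegates to an inner `process()` also covers the lambda case once the lambda is adapted into a DoFn-like object.

```python
# Plain-Python sketch; none of these classes are Beam's own.

class DoFn:
    """Hypothetical stand-in for beam.DoFn: subclasses implement process()."""
    def process(self, element):
        raise NotImplementedError


class CallableDoFn(DoFn):
    """Hypothetical adapter turning a plain function into a DoFn.
    FlatMap-style: the function must return an iterable of outputs."""
    def __init__(self, fn):
        self._fn = fn

    def process(self, element):
        return self._fn(element)


class ErrorSieveFn(DoFn):
    """Wraps another DoFn and tags every output as ('ok', value) or
    ('error', (element, message)). The tagging scheme is an assumption."""
    def __init__(self, inner):
        self._inner = inner

    def process(self, element):
        try:
            # Materialize first, so errors raised lazily by a generator
            # are caught here and no partial output is emitted.
            results = list(self._inner.process(element) or ())
        except Exception as exc:
            yield ('error', (element, str(exc)))
            return
        for value in results:
            yield ('ok', value)


def run(dofn, elements):
    """Tiny driver standing in for a runner: calls process() per element."""
    out = []
    for element in elements:
        out.extend(dofn.process(element))
    return out


class Halve(DoFn):
    """Example DoFn that fails on odd inputs."""
    def process(self, element):
        if element % 2:
            raise ValueError('odd input')
        yield element // 2


sieved = run(ErrorSieveFn(Halve()), [2, 3, 4])
from_lambda = run(ErrorSieveFn(CallableDoFn(lambda x: [x + 1])), [1, 2])
```

Wrapping by composition, rather than overwriting `process` with `setattr`, leaves the inner object untouched, which may be easier to reason about if the same DoFn instance is reused elsewhere.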
