The motivation for passing a ParDo is the following. What if I had code like
this:

```
p | beam.FlatMap(lambda x: [x + 1])
```

and wanted to have ErrorSieve on top of it? There's no explicitly defined
DoFn in that expression, after all.
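
To make the idea concrete, here is a rough sketch in plain Python, with no Beam imports so that it runs on its own. It is not working Beam code. The names `sieve`, `wrapped_dofn` and `wrapped_lambda`, and the `('ok', ...)` / `('error', ...)` tagging, are made up for illustration, and it assumes the "extra logic" is catching exceptions. The point is only that one wrapper can accept either an object with a process() method or a bare callable:

```python
# Plain-Python sketch; no Beam involved. All names here are hypothetical.

def sieve(fn_or_dofn):
    """Wrap a callable, or an object with a process() method, so that
    exceptions become tagged error records instead of propagating."""
    # Use the process() method if there is one, otherwise the callable itself.
    inner = getattr(fn_or_dofn, 'process', fn_or_dofn)

    def process(element):
        try:
            # Materialize the results so that errors raised lazily inside
            # a generator are caught here too.
            results = list(inner(element))
        except Exception as exc:
            yield ('error', element, repr(exc))
            return
        for result in results:
            yield ('ok', result)

    return process


class MyDoFn(object):
    # Stand-in for a beam.DoFn subclass; only process() matters here.
    def process(self, element):
        yield 10 // element


wrapped_dofn = sieve(MyDoFn())               # explicit DoFn-like object
wrapped_lambda = sieve(lambda x: [x + 1])    # bare callable, no DoFn in sight

print(list(wrapped_dofn(5)))
print(list(wrapped_dofn(0)))
print(list(wrapped_lambda(1)))
```

In real Beam the wrapped function would still have to be handed back to a ParDo, which is the part that touches ParDo's internals.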



On Fri, Jul 21, 2017 at 3:01 PM, Sourabh Bajaj <[email protected]>
wrote:

> Thanks. One thing I'm not sure about is why you need to pass the ParDo
> to the ErrorSieve instead of just passing the DoFn, since you're only
> modifying the DoFn.
>
> On Fri, Jul 21, 2017 at 2:47 PM Dmitry Demeshchuk <[email protected]>
> wrote:
>
>> Hi Sourabh,
>>
>> Great call, thanks. I was thinking about a slightly different interface,
>> but this is exactly the direction I wanted to go. So, my approach, I guess,
>> would be something like this:
>>
>> class ErrorSieve(PTransform):
>>   def __init__(self, pardo):
>>     super(ErrorSieve, self).__init__()
>>     self.fn = pardo.fn
>>   def expand(self, pcoll):
>>     # Already bound to self.fn, so the wrapper takes no `self` of its own.
>>     inner_process = self.fn.process
>>     def process(*args, **kwargs):
>>       raw_result = inner_process(*args, **kwargs)
>>       result = ...
>>       yield result
>>     setattr(self.fn, 'process', process)
>>     return pcoll | ParDo(self.fn)
>>
>>
>> I'll share a working snippet once it's done.
>>
>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> Is it possible to create
>>>
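>>> class ErrorSieve(PTransform):
>>>    def __init__(self, dofn):
>>>          self.dofn = dofn
>>>    def expand(self, pcoll):
>>>          # modifiedDoFn: self.dofn with the extra logic added
>>>          return pcoll | ParDo(modifiedDoFn)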
>>>
>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>> don't expose the ParDo to the user.
>>>
>>> Will this work for your use case?
>>>
>>> -Sourabh
>>>
>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <[email protected]>
>>> wrote:
>>>
>>>> Hi list,
>>>>
>>>> I'm trying to make a transformation function (let's call it ErrorSieve)
>>>> that would take a ParDo object as input and modify its underlying DoFn
>>>> object, basically adding extra logic on top of an underlying process()
>>>> method.
>>>>
>>>> Ideally for me, the example usage would be:
>>>>
>>>> ```python
>>>> p | ErrorSieve(beam.ParDo(MyDoFn()))
>>>>
>>>> # or
>>>>
>>>> p | ErrorSieve(beam.FlatMap(lambda x: [x + 1]))
>>>> ```
>>>>
>>>> However, this would require me to butcher ParDo's internals,
>>>> especially since ParDo's make_fn() method gets called during its
>>>> transformation. My other thought was to make it a plain DoFn:
>>>>
>>>> ```python
>>>> p | beam.ParDo(ErrorSieve(MyDoFn()))
>>>> ```
>>>>
>>>> The only problem with this is that I can't use it with transforms like
>>>> FlatMap, which is a bit unfortunate.
>>>>
>>>> Do you think it's worth investigating how to implement the first
>>>> approach, or should I instead settle for the second approach, using
>>>> only custom DoFns?
>>>>
>>>> Thank you.
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Dmitry Demeshchuk.
>>>>
>>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>


-- 
Best regards,
Dmitry Demeshchuk.
