Got it. Thanks

On Fri, Jul 21, 2017 at 3:04 PM Dmitry Demeshchuk <[email protected]> wrote:
> The motivation of using ParDo is the following. What if I had code like
> this:
>
> ```
> p | beam.FlatMap(lambda x: x + 1)
> ```
>
> and wanted to have ErrorSieve on top of it? There's no explicitly defined
> DoFn in that expression, after all.
>
>
>
> On Fri, Jul 21, 2017 at 3:01 PM, Sourabh Bajaj <[email protected]>
> wrote:
>
>> Thanks, just one thing I'm not sure about is why do you need to pass the
>> ParDo to the ErrorSieve instead of just passing the DoFn as you're only
>> modifying the DoFn.
>>
>> On Fri, Jul 21, 2017 at 2:47 PM Dmitry Demeshchuk <[email protected]>
>> wrote:
>>
>>> Hi Sourabh,
>>>
>>> Great call, thanks. I was thinking about a slightly different interface,
>>> but this is exactly the direction I wanted to go. So, my approach, I guess,
>>> would be something like that:
>>>
>>> class ErrorSieve(PTransform):
>>>     def __init__(self, pardo):
>>>         self.fn = pardo.fn
>>>     def expand(self):
>>>         inner_process = self.fn.process
>>>         def process(self, *args, **kwargs):
>>>             raw_result = inner_process(*args, **kwargs)
>>>             result = ...
>>>             yield result
>>>         setattr(fn, 'process', process)
>>>         return ParDo(self.fn)
>>>
>>>
>>>
>>> I'll share a working snippet once it's done.
>>>
>>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is it possible to create
>>>>
>>>> class ErrorSieve(PTransform):
>>>>     def __init__ (dofn):
>>>>     def expand():
>>>>         return ParDo(modifiedDoFn)
>>>>
>>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>>> don't expose the ParDo to the user.
>>>>
>>>> Will this work for your use case?
>>>>
>>>> -Sourabh
>>>>
>>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> I'm trying to make a transformation function (let's call it
>>>>> ErrorSieve) that would take a ParDo object as input and modify its
>>>>> underlying DoFn object, basically adding extra logic on top of an
>>>>> underlying process() method.
>>>>>
>>>>> Ideally for me, the example usage would be:
>>>>>
>>>>> ```python
>>>>> p | ErrorSieve(beam.ParDo(MyDoFn()))
>>>>>
>>>>> # or
>>>>>
>>>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>>>> ```
>>>>>
>>>>> However, this would require me to butcher the internals of ParDo
>>>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>>>> its transformation. My other thinking was to make it a fair and square
>>>>> DoFn:
>>>>>
>>>>> ```python
>>>>> p | beam.ParDo(ErrorSieve(MyDoFn()))
>>>>> ```
>>>>>
>>>>> The only problem with this is that I can't use it with transforms like
>>>>> FlatMap, which is a bit unfortunate.
>>>>>
>>>>> Do you think it's worth investigating how to implement the first
>>>>> approach, or should I just instead settle with the second approach, using
>>>>> only custom DoFns?
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>
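
For reference, the second approach from the thread (wrapping one DoFn in another, as in `p | beam.ParDo(ErrorSieve(MyDoFn()))`) might look roughly like the sketch below. This is not the working snippet promised above. It deliberately avoids importing Beam so it runs on its own: `ErrorSieveFn`, `AddOne`, `FromCallable`, and `run` are hypothetical stand-ins, and tagging results as `("ok", ...)` or `("error", ...)` is an assumed design, not something the thread specifies. In a real pipeline the wrapper would presumably subclass `beam.DoFn` and be applied with `beam.ParDo`.

```python
class ErrorSieveFn:
    """Hypothetical wrapper: delegates to an inner DoFn-like object
    and turns exceptions into tagged error records."""

    def __init__(self, inner):
        self.inner = inner

    def process(self, element, *args, **kwargs):
        try:
            result = self.inner.process(element, *args, **kwargs)
            # process() may return None, a list, or a generator.
            if result is not None:
                for item in result:
                    yield ("ok", item)
        except Exception as exc:
            # Assumed design: keep the failing element next to the error.
            yield ("error", (element, repr(exc)))


class AddOne:
    """Stand-in for a user DoFn (MyDoFn in the thread)."""

    def process(self, element):
        yield element + 1


class FromCallable:
    """Stand-in for a DoFn built from a plain callable, as with FlatMap."""

    def __init__(self, fn):
        self.fn = fn

    def process(self, element):
        return self.fn(element)


def run(fn, elements):
    """Tiny stand-in for a runner: feed each element through fn.process."""
    out = []
    for element in elements:
        out.extend(fn.process(element))
    return out


if __name__ == "__main__":
    print(run(ErrorSieveFn(AddOne()), [1, "a"]))
    print(run(ErrorSieveFn(FromCallable(lambda x: [x + 1])), [1]))
```

Because the wrapper only needs an object with a `process` method, the same class covers both a hand-written DoFn and one derived from a callable, which is the gap the FlatMap example in the thread points at.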
