The motivation of using ParDo is the following. What if I had code like this:
```
p | beam.FlatMap(lambda x: [x + 1])
```

and wanted to have ErrorSieve on top of it? There's no explicitly defined DoFn in that expression, after all.

On Fri, Jul 21, 2017 at 3:01 PM, Sourabh Bajaj wrote:
> Thanks. The one thing I'm not sure about is why you need to pass the
> ParDo to the ErrorSieve instead of just passing the DoFn, since you're
> only modifying the DoFn.
>
> On Fri, Jul 21, 2017 at 2:47 PM Dmitry Demeshchuk wrote:
>
>> Hi Sourabh,
>>
>> Great call, thanks. I was thinking about a slightly different interface,
>> but this is exactly the direction I wanted to go. So my approach, I guess,
>> would be something like this:
>>
>>     class ErrorSieve(PTransform):
>>         def __init__(self, pardo):
>>             self.fn = pardo.fn
>>
>>         def expand(self, pcoll):
>>             inner_process = self.fn.process
>>
>>             # Set on the instance, so it is not bound: no `self` parameter.
>>             def process(*args, **kwargs):
>>                 raw_result = inner_process(*args, **kwargs)
>>                 result = ...
>>                 yield result
>>
>>             setattr(self.fn, 'process', process)
>>             return pcoll | ParDo(self.fn)
>>
>> I'll share a working snippet once it's done.
>>
>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj wrote:
>>
>>> Hi,
>>>
>>> Is it possible to create
>>>
>>>     class ErrorSieve(PTransform):
>>>         def __init__(self, dofn):
>>>             ...
>>>
>>>         def expand(self, pcoll):
>>>             return pcoll | ParDo(modifiedDoFn)
>>>
>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>> don't expose the ParDo to the user.
>>>
>>> Will this work for your use case?
>>>
>>> -Sourabh
>>>
>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk wrote:
>>>
>>>> Hi list,
>>>>
>>>> I'm trying to make a transformation function (let's call it ErrorSieve)
>>>> that would take a ParDo object as input and modify its underlying DoFn
>>>> object, basically adding extra logic on top of an underlying process()
>>>> method.
>>>>
>>>> Ideally for me, the example usage would be:
>>>>
>>>> ```python
>>>> p | ErrorSieve(beam.ParDo(MyDoFn()))
>>>>
>>>> or
>>>>
>>>> p | ErrorSieve(beam.FlatMap(lambda x: [x + 1]))
>>>> ```
>>>>
>>>> However, this would require me to butcher the internals of ParDo
>>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>>> its transformation. My other thinking was to make it a fair and square
>>>> DoFn:
>>>>
>>>> ```python
>>>> p | beam.ParDo(ErrorSieve(MyDoFn()))
>>>> ```
>>>>
>>>> The only problem with this is that I can't use it with transforms like
>>>> FlatMap, which is a bit unfortunate.
>>>>
>>>> Do you think it's worth investigating how to implement the first
>>>> approach, or should I instead settle for the second approach, using
>>>> only custom DoFns?
>>>>
>>>> Thank you.
>>>>
>>>> --
>>>> Best regards,
>>>> Dmitry Demeshchuk.
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.

--
Best regards,
Dmitry Demeshchuk.
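The thread never pins down what the sieve does to each element. Here is a minimal, framework-free sketch. It assumes that "sieving" means catching any exception raised while processing an element and emitting it on a separate error channel instead of letting it propagate. It runs without Beam, and `DoFnLike`, `CallableDoFn`, `ErrorSieveFn`, `ParseInt`, `run`, and the `"ok"`/`"error"` tags are hypothetical stand-ins, not Beam APIs. `CallableDoFn` plays roughly the role of the DoFn wrapper that `beam.FlatMap` builds around a lambda. This is why wrapping at the DoFn level, as in the `pardo.fn` approach above, can cover both the `ParDo(MyDoFn())` and the `FlatMap(lambda ...)` usages.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple

# Hypothetical tags marking which channel an output belongs to.
OK = "ok"
ERROR = "error"


class DoFnLike:
    """Minimal stand-in for a Beam DoFn: anything with process(element)."""

    def process(self, element: Any) -> Iterable[Any]:
        raise NotImplementedError


class CallableDoFn(DoFnLike):
    """Adapts a plain function into a DoFn-like object, similar in spirit
    to what beam.FlatMap does with a lambda (hypothetical stand-in)."""

    def __init__(self, fn: Callable[[Any], Iterable[Any]]):
        self.fn = fn

    def process(self, element: Any) -> Iterable[Any]:
        return self.fn(element)


class ErrorSieveFn(DoFnLike):
    """Delegates to an inner DoFn-like object and tags every output.

    Successful outputs become (OK, value); an exception raised while
    processing an element becomes a single (ERROR, (element, exception)).
    """

    def __init__(self, inner: DoFnLike):
        self.inner = inner

    def process(self, element: Any) -> Iterator[Tuple[str, Any]]:
        try:
            # Materialize inside the try so that exceptions raised lazily
            # by a generator-based process() are caught as well.
            results = list(self.inner.process(element))
        except Exception as exc:
            yield (ERROR, (element, exc))
            return
        for value in results:
            yield (OK, value)


def run(fn: DoFnLike, elements: Iterable[Any]) -> Iterator[Any]:
    """Tiny local driver: feed each element to fn.process, in order."""
    for element in elements:
        yield from fn.process(element)


class ParseInt(DoFnLike):
    """Example inner DoFn: raises ValueError on non-numeric strings."""

    def process(self, element: Any) -> Iterator[int]:
        yield int(element)
```

For example, `list(run(ErrorSieveFn(ParseInt()), ["1", "x", "3"]))` produces `("ok", 1)`, then an `("error", ("x", <ValueError>))` pair, then `("ok", 3)`. Wrapping `CallableDoFn(lambda x: [x + 1])` works the same way. Materializing the inner results before yielding makes each element all-or-nothing: if `process()` raises midway, the outputs it already produced for that element are dropped rather than half-emitted. In a real Beam pipeline the error channel would more naturally be a tagged output (`beam.pvalue.TaggedOutput` together with `ParDo(...).with_outputs(...)`) than a tag embedded in each value.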
