On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
> Working on this on and off now and getting some pretty good traction. > > One thing I'm a little worried about is all the classes that are marked > "internal use only". A lot of these seem either very useful or possibly > critical to writing a runner. How strictly should I interpret these private > implementation labels? > > A few bits that I'm interested in using ordered by how surprised I was to > find that they're internal only. > > - apache_bean.pipeline.AppliedPTransform > - apache_beam.pipeline.PipelineVisitor > - apache_beam.runners.common.DoFnRunner > The public API is the protos. You should not have to interact with AppliedPTransform and PipelineVisitor directly (and while you can reach in and do so, there are no promises here and these are subject to change). As for DoFnRunner, if you're trying to reach in at this level you're probably going to have to be replicating a bunch of surrounding infrastructure as well. I would recommend using a BundleProcessor [1] to coordinate the work (which will internally wire up the chain of DoFns correctly and take them through their proper lifecycle). As mentioned above, you can directly borrow the translations in fn_api_runner to go from a full Pipeline graph (proto) to a set of fused DoFns to execute in topological order (as ProcessBundleDescriptor protos, which is what BundleProcessor accepts). [1] https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851 > Thanks again for the help, > Joey > > On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <chamik...@google.com> > wrote: > >> Another advantage of a portable runner would be that it will be using >> well defined and backwards compatible Beam portable APIs to communicate >> with SDKs. I think this is specially important for runners that do not live >> in the Beam repo since otherwise future SDK releases could break your >> runner in subtle ways. Also portability gives you more flexibility when it >> comes to choosing an SDK to define the pipeline and will allow you to >> execute transforms in any SDK via cross-language. >> >> Thanks, >> Cham >> >> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user < >> user@beam.apache.org> wrote: >> >>> >>> >>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com> >>> wrote: >>> >>>> Totally doable by one person, especially given the limited feature set >>>>> you mention above. >>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE >>>>> is >>>>> a good starting point as to what the relationship between a Runner and the >>>>> SDK is at a level of detail sufficient for implementation (told from the >>>>> perspective of an SDK, but the story is largely about the interface which >>>>> is directly applicable). >>>> >>>> >>>> Great slides, I really appreciate the illustrations. >>>> >>>> I hadn't realized there was a concept of an "SDK Worker", I had >>>> imagined that once the Runner started execution of a workflow, it was >>>> Runner all the way down. Is the Fn API the only way to implement a runner? >>>> Our execution environment is a bit constrained in such a way that we can't >>>> expose the APIs required to implement the Fn API. To be forthright, we >>>> basically only have the ability to start a worker either with a known >>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with >>>> a bundle of data to process and return the outputs for. We're constrained >>>> from really any additional communication with a worker beyond that. >>>> >>> >>> The "worker" abstraction gives the ability to wrap any user code in a >>> way that it can be called from any runner. If you're willing to constrain >>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can >>> be a logical, rather than physical, concept. >>> >>> Another way to look at it is that in practice, the "runner" often has >>> its own notion of "workers" which wrap (often in a 1:1 way) the logical >>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be >>> inlined (e.g. if it's 100% Python on both sides). See, for example, >>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350 >>> >>> >>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> >>>> wrote: >>>> >>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <joey.t...@schrodinger.com> >>>>> wrote: >>>>> >>>>>> Thanks all for the responses! >>>>>> >>>>>> If Beam Runner Authoring Guide is rather high-level for you, then, >>>>>>> at fist, I’d suggest to answer two questions for yourself: >>>>>>> - Am I going to implement a portable runner or native one? >>>>>>> >>>>>> >>>>>> Portable sounds great, but the answer depends on how much additional >>>>>> cost it'd require to implement portable over non-portable, even >>>>>> considering >>>>>> future deprecation (unless deprecation is happening tomorrow). I'm not >>>>>> familiar enough to know what the additional cost is so I don't have a >>>>>> firm >>>>>> answer. >>>>>> >>>>> >>>>> I would way it would not be that expensive to write it in a "portable >>>>> compatible" way (i.e it uses the publicly-documented protocol as the >>>>> interface rather than reaching into internal details) even if it doesn't >>>>> use GRCP and fire up separate processes/docker images for the workers >>>>> (preferring to do tall of that inline like the Python portable direct >>>>> runner does). >>>>> >>>>> >>>>>> - Which SDK I should use for this runner? >>>>>>> >>>>>> I'd be developing this runner against the python SDK and if the >>>>>> runner only worked with the python SDK that'd be okay in the short term >>>>>> >>>>> >>>>> Yes. And if you do it the above way, it should be easy to extend (or >>>>> not) if/when the need arises. >>>>> >>>>> >>>>>> Also, we don’t know if this new runner will be contributed back to >>>>>>> Beam, what is a runtime and what actually is a final goal of it. >>>>>> >>>>>> Likely won't be contributed back to Beam (not sure if it'd actually >>>>>> be useful to a wide audience anyways). >>>>>> >>>>>> The context is we've been developing an in-house large-scale pipeline >>>>>> framework that encapsulates both the programming model and the >>>>>> runner/execution of data workflows. As it's grown, I keep finding myself >>>>>> just reimplementing features and abstractions Beam has already >>>>>> implemented, >>>>>> so I wanted to explore adopting Beam. Our execution environment is very >>>>>> particular though and our workflows require it (due to the way we license >>>>>> our software), so my plan was to try to create a very basic runner that >>>>>> uses our execution environment. The runner could have very few features >>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd >>>>>> probably >>>>>> introduce a shim for some of our internally implemented transforms and >>>>>> assess from there. >>>>>> >>>>>> Not sure if this is a lofty goal or not, so happy to hear your >>>>>> thoughts as to whether this seems reasonable and achievable without a >>>>>> large >>>>>> concerted effort or even if the general idea makes any sense. (I >>>>>> recognize >>>>>> that it might not be *easy*, but I don't have the resources to >>>>>> dedicate more than myself to work on a PoC) >>>>>> >>>>> >>>>> Totally doable by one person, especially given the limited feature set >>>>> you mention above. >>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE >>>>> is a good starting point as to what the relationship between a Runner and >>>>> the SDK is at a level of detail sufficient for implementation (told from >>>>> the perspective of an SDK, but the story is largely about the interface >>>>> which is directly applicable). >>>>> >>>>> Given the limited feature set you proposed, this is similar to the >>>>> original Python portable runner which took a week or two to put together >>>>> (granted a lot has been added since then), or the typescript direct runner >>>>> ( >>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts >>>>> ) which was done (in its basic form, no support for side inputs and such) >>>>> in less than a week. Granted, as these are local runners, this illustrates >>>>> only the Beam-side complexity of things (not the work of actually >>>>> implementing a distributed shuffle, starting and assigning work to >>>>> multiple >>>>> workers, etc. but presumably that's the kind of thing your execution >>>>> environment already takes care of. >>>>> >>>>> As for some more concrete pointers, you could probably leverage a lot >>>>> of what's there by invoking create_stages >>>>> >>>>> >>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362 >>>>> >>>>> which will do optimization, fusion, etc. and then implementing your >>>>> own version of run_stages >>>>> >>>>> >>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392 >>>>> >>>>> to execute these in topological order on your compute infrastructure. >>>>> (If you're not doing streaming, this is much more straightforward than all >>>>> the bundler scheduler stuff that currently exists in that code). >>>>> >>>>> >>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko < >>>>>> aromanenko....@gmail.com> wrote: >>>>>> >>>>>>> >>>>>>> >>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user < >>>>>>> user@beam.apache.org> wrote: >>>>>>> >>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko < >>>>>>> aromanenko....@gmail.com> wrote: >>>>>>> >>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then, >>>>>>>> at fist, I’d suggest to answer two questions for yourself: >>>>>>>> - Am I going to implement a portable runner or native one? >>>>>>>> >>>>>>> >>>>>>> The answer to this should be portable, as non-portable ones will be >>>>>>> deprecated. >>>>>>> >>>>>>> >>>>>>> Well, actually this is a question that I don’t remember we discussed >>>>>>> here in details before and had a common agreement. >>>>>>> >>>>>>> Actually, I’m not sure that I understand clearly what is meant by >>>>>>> “deprecation" in this case. For example, Portable Spark Runner is >>>>>>> heavily >>>>>>> actually based on native Spark RDD runner and its translations. So, >>>>>>> which >>>>>>> part should be deprecated and what is a reason for that? >>>>>>> >>>>>>> Well, anyway I guess it’s off topic here. >>>>>>> >>>>>>> Also, we don’t know if this new runner will be contributed back to >>>>>>> Beam, what is a runtime and what actually is a final goal of it. >>>>>>> So I agree that more details on this would be useful. >>>>>>> >>>>>>> — >>>>>>> Alexey >>>>>>> >>>>>>> >>>>>>> - Which SDK I should use for this runner? >>>>>>>> >>>>>>> >>>>>>> The answer to the above question makes this one moot :). >>>>>>> >>>>>>> On a more serious note, could you tell us a bit more about the >>>>>>> runner you're looking at implementing? >>>>>>> >>>>>>> >>>>>>>> Then, depending on answers, I’d suggest to take as an example one >>>>>>>> of the most similar Beam runners and use it as a more detailed source >>>>>>>> of >>>>>>>> information along with Beam runner doc mentioned before. >>>>>>>> >>>>>>>> — >>>>>>>> Alexey >>>>>>>> >>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>> Hi Beam community! >>>>>>>> >>>>>>>> I'm interested in trying to implement a runner with my company's >>>>>>>> execution environment but I'm struggling to get started. I've read the >>>>>>>> docs >>>>>>>> page >>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner> >>>>>>>> on implementing a runner but it's quite high level. Anyone have any >>>>>>>> concrete suggestions on getting started? >>>>>>>> >>>>>>>> I've started by cloning and running the hello world example >>>>>>>> <https://github.com/apache/beam-starter-python>. I've then >>>>>>>> subclassed `PipelineRunner >>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>` >>>>>>>> to create my own custom runner but at this point I'm a bit stuck. My >>>>>>>> custom >>>>>>>> runner just looks like >>>>>>>> >>>>>>>> class CustomRunner(runner.PipelineRunner): >>>>>>>> def run_pipeline(self, pipeline, >>>>>>>> options): >>>>>>>> self.visit_transforms(pipeline, options) >>>>>>>> >>>>>>>> And when using it I get an error about not having implemented >>>>>>>> "Impulse" >>>>>>>> >>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform) >>>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner >>>>>>>> object >>>>>>>> at 0x135d9ff40>. >>>>>>>> >>>>>>>> Am I going about this the right way? Are there tests I can run my >>>>>>>> custom runner against to validate it beyond just running the hello >>>>>>>> world >>>>>>>> example? I'm finding myself just digging through the beam source to >>>>>>>> try to >>>>>>>> piece together how a runner works and I'm struggling to get a >>>>>>>> foothold. Any >>>>>>>> guidance would be greatly appreciated, especially if anyone has any >>>>>>>> experience implementing their own python runner. >>>>>>>> >>>>>>>> Thanks in advance! Also, could I get a Slack invite? >>>>>>>> Cheers, >>>>>>>> Joey >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>