Working on this on and off now and getting some pretty good traction. One thing I'm a little worried about is all the classes that are marked "internal use only". A lot of these seem either very useful or possibly critical to writing a runner. How strictly should I interpret these private implementation labels?
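One way to lean less on classes like AppliedPTransform and PipelineVisitor is the approach Robert describes further down the thread: treat the publicly documented portable protocol, rather than SDK internals, as the runner's interface. Below is a minimal, hypothetical plain-Python sketch of what that looks like at the dispatch level. Handlers are looked up by each transform's standard URN instead of by Python class, and that includes Impulse, the primitive behind the NotImplementedError at the bottom of the thread. Only the URN strings are Beam's. The `TRANSFORMS` list, its `fn` field and the handler names are illustrative assumptions. A real runner would read the transforms from the pipeline proto and run user code through a (possibly inlined) SDK worker rather than calling Python lambdas.

```python
# Hypothetical sketch: dispatch on standard transform URNs instead of on
# Python classes such as AppliedPTransform. TRANSFORMS only loosely mimics
# the portable pipeline proto; it is NOT produced by apache_beam.
from collections import defaultdict

# Standard Beam primitive URNs.
IMPULSE = "beam:transform:impulse:v1"
PAR_DO = "beam:transform:pardo:v1"
GROUP_BY_KEY = "beam:transform:group_by_key:v1"

# Transforms already in topological order. "fn" is a hypothetical stand-in
# for the serialized DoFn payload a real ParDo carries.
TRANSFORMS = [
    {"urn": IMPULSE, "inputs": [], "outputs": ["impulse"]},
    {"urn": PAR_DO, "inputs": ["impulse"], "outputs": ["words"],
     "fn": lambda _: ["a", "b", "a"]},
    {"urn": PAR_DO, "inputs": ["words"], "outputs": ["pairs"],
     "fn": lambda word: [(word, 1)]},
    {"urn": GROUP_BY_KEY, "inputs": ["pairs"], "outputs": ["grouped"]},
]


def run_impulse(transform, elements):
    # Impulse emits exactly one empty-bytes element to start the pipeline.
    return [b""]


def run_par_do(transform, elements):
    # Apply the user function to every element and flatten the results.
    return [out for elem in elements for out in transform["fn"](elem)]


def run_group_by_key(transform, elements):
    # In-memory "shuffle": collect all values per key (batch only).
    groups = defaultdict(list)
    for key, value in elements:
        groups[key].append(value)
    return list(groups.items())


HANDLERS = {
    IMPULSE: run_impulse,
    PAR_DO: run_par_do,
    GROUP_BY_KEY: run_group_by_key,
}


def run(transforms):
    pcollections = {}
    for transform in transforms:
        handler = HANDLERS.get(transform["urn"])
        if handler is None:
            raise NotImplementedError(f"No handler for {transform['urn']}")
        # Concatenate all inputs; this sketch assumes a single output.
        elements = [e for pc in transform["inputs"] for e in pcollections[pc]]
        pcollections[transform["outputs"][0]] = handler(transform, elements)
    return pcollections


result = run(TRANSFORMS)
print(result["grouped"])  # [('a', [1, 1]), ('b', [1])]
```

Keying on URNs keeps the runner tied to the documented portable vocabulary rather than to Python classes that are marked internal and may change between releases.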
A few bits that I'm interested in using, ordered by how surprised I was to find that they're internal only:

- apache_beam.pipeline.AppliedPTransform
- apache_beam.pipeline.PipelineVisitor
- apache_beam.runners.common.DoFnRunner

Thanks again for the help,
Joey

On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <chamik...@google.com> wrote:

> Another advantage of a portable runner would be that it will use well-defined and backwards-compatible Beam portable APIs to communicate with SDKs. I think this is especially important for runners that do not live in the Beam repo, since otherwise future SDK releases could break your runner in subtle ways. Portability also gives you more flexibility when it comes to choosing an SDK to define the pipeline, and it will allow you to execute transforms in any SDK via cross-language.
>
> Thanks,
> Cham
>
> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <user@beam.apache.org> wrote:
>
>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>
>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface, which is directly applicable).
>>>
>>> Great slides, I really appreciate the illustrations.
>>>
>>> I hadn't realized there was a concept of an "SDK Worker"; I had imagined that once the Runner started execution of a workflow, it was Runner all the way down. Is the Fn API the only way to implement a runner? Our execution environment is a bit constrained in such a way that we can't expose the APIs required to implement the Fn API. To be forthright, we basically only have the ability to start a worker either with a known Pub/Sub topic to expect data from and a Pub/Sub topic to write to, or with a bundle of data to process and return the outputs for. We're constrained from really any additional communication with a worker beyond that.
>>
>> The "worker" abstraction gives the ability to wrap any user code in a way that it can be called from any runner. If you're willing to constrain the code you're wrapping (e.g. "Python DoFns only"), then this "worker" can be a logical, rather than physical, concept.
>>
>> Another way to look at it is that in practice, the "runner" often has its own notion of "workers" which wrap (often in a 1:1 way) the logical "SDK Worker" (which in turn invokes the actual DoFns). This latter may be inlined (e.g. if it's 100% Python on both sides). See, for example, https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>
>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>
>>>>> Thanks all for the responses!
>>>>>
>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then first I'd suggest answering two questions for yourself:
>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>
>>>>> Portable sounds great, but the answer depends on how much additional cost it'd require to implement portable over non-portable, even considering future deprecation (unless deprecation is happening tomorrow). I'm not familiar enough to know what the additional cost is, so I don't have a firm answer.
>>>>> >>>> >>>> I would way it would not be that expensive to write it in a "portable >>>> compatible" way (i.e it uses the publicly-documented protocol as the >>>> interface rather than reaching into internal details) even if it doesn't >>>> use GRCP and fire up separate processes/docker images for the workers >>>> (preferring to do tall of that inline like the Python portable direct >>>> runner does). >>>> >>>> >>>>> - Which SDK I should use for this runner? >>>>>> >>>>> I'd be developing this runner against the python SDK and if the runner >>>>> only worked with the python SDK that'd be okay in the short term >>>>> >>>> >>>> Yes. And if you do it the above way, it should be easy to extend (or >>>> not) if/when the need arises. >>>> >>>> >>>>> Also, we don’t know if this new runner will be contributed back to >>>>>> Beam, what is a runtime and what actually is a final goal of it. >>>>> >>>>> Likely won't be contributed back to Beam (not sure if it'd actually be >>>>> useful to a wide audience anyways). >>>>> >>>>> The context is we've been developing an in-house large-scale pipeline >>>>> framework that encapsulates both the programming model and the >>>>> runner/execution of data workflows. As it's grown, I keep finding myself >>>>> just reimplementing features and abstractions Beam has already >>>>> implemented, >>>>> so I wanted to explore adopting Beam. Our execution environment is very >>>>> particular though and our workflows require it (due to the way we license >>>>> our software), so my plan was to try to create a very basic runner that >>>>> uses our execution environment. The runner could have very few features >>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd >>>>> probably >>>>> introduce a shim for some of our internally implemented transforms and >>>>> assess from there. 
>>>>>
>>>>> Not sure if this is a lofty goal or not, so I'm happy to hear your thoughts as to whether this seems reasonable and achievable without a large concerted effort, or even whether the general idea makes any sense. (I recognize that it might not be *easy*, but I don't have the resources to dedicate more than myself to work on a PoC.)
>>>>
>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface, which is directly applicable).
>>>>
>>>> Given the limited feature set you proposed, this is similar to the original Python portable runner, which took a week or two to put together (granted, a lot has been added since then), or the TypeScript direct runner (https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts), which was done (in its basic form, with no support for side inputs and such) in less than a week. Granted, as these are local runners, this illustrates only the Beam-side complexity of things (not the work of actually implementing a distributed shuffle, starting and assigning work to multiple workers, etc.), but presumably that's the kind of thing your execution environment already takes care of.
>>>>
>>>> As for some more concrete pointers, you could probably leverage a lot of what's there by invoking create_stages
>>>>
>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>
>>>> which will do optimization, fusion, etc., and then implementing your own version of run_stages
>>>>
>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>
>>>> to execute these in topological order on your compute infrastructure. (If you're not doing streaming, this is much more straightforward than all the bundle scheduler stuff that currently exists in that code.)
>>>>
>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>
>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <user@beam.apache.org> wrote:
>>>>>>
>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>>
>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then first I'd suggest answering two questions for yourself:
>>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>>
>>>>>> The answer to this should be portable, as non-portable ones will be deprecated.
>>>>>>
>>>>>> Well, actually, this is a question that I don't remember us discussing here in detail before or reaching a common agreement on.
>>>>>>
>>>>>> Actually, I'm not sure that I clearly understand what is meant by "deprecation" in this case. For example, the Portable Spark Runner is actually heavily based on the native Spark RDD runner and its translations. So which part should be deprecated, and what is the reason for that?
>>>>>>
>>>>>> Well, anyway, I guess it's off topic here.
>>>>>>
>>>>>> Also, we don't know if this new runner will be contributed back to Beam, what its runtime is, and what its final goal actually is. So I agree that more details on this would be useful.
>>>>>>
>>>>>> —
>>>>>> Alexey
>>>>>>
>>>>>>> - Which SDK should I use for this runner?
>>>>>>
>>>>>> The answer to the above question makes this one moot :).
>>>>>>
>>>>>> On a more serious note, could you tell us a bit more about the runner you're looking at implementing?
>>>>>>
>>>>>>> Then, depending on the answers, I'd suggest taking one of the most similar Beam runners as an example and using it as a more detailed source of information, along with the Beam runner doc mentioned before.
>>>>>>>
>>>>>>> —
>>>>>>> Alexey
>>>>>>>
>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>> Hi Beam community!
>>>>>>>
>>>>>>> I'm interested in trying to implement a runner with my company's execution environment, but I'm struggling to get started. I've read the docs page <https://beam.apache.org/contribute/runner-guide/#testing-your-runner> on implementing a runner, but it's quite high level. Does anyone have any concrete suggestions on getting started?
>>>>>>>
>>>>>>> I've started by cloning and running the hello world example <https://github.com/apache/beam-starter-python>. I've then subclassed `PipelineRunner <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>` to create my own custom runner, but at this point I'm a bit stuck. My custom runner just looks like this:
>>>>>>>
>>>>>>>     from apache_beam.runners import runner
>>>>>>>
>>>>>>>     class CustomRunner(runner.PipelineRunner):
>>>>>>>         def run_pipeline(self, pipeline, options):
>>>>>>>             self.visit_transforms(pipeline, options)
>>>>>>>
>>>>>>> And when using it I get an error about not having implemented "Impulse":
>>>>>>>
>>>>>>>     NotImplementedError: Execution of [<Impulse(PTransform) label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object at 0x135d9ff40>.
>>>>>>>
>>>>>>> Am I going about this the right way? Are there tests I can run my custom runner against to validate it beyond just running the hello world example? I'm finding myself just digging through the Beam source to try to piece together how a runner works, and I'm struggling to get a foothold. Any guidance would be greatly appreciated, especially if anyone has any experience implementing their own Python runner.
>>>>>>>
>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>> Cheers,
>>>>>>> Joey
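Robert's suggestion above, to keep create_stages and write your own run_stages that executes the stages in topological order, can be sketched for the batch-only case as follows. This is a hypothetical illustration, not Beam's run_stages. `Stage`, its fields and the `execute` callback are assumptions: they stand in for the fused stages create_stages returns and for submitting one stage's work to your own execution environment. The ordering uses the standard-library `graphlib` module (Python 3.9+), with each stage depending on whichever stages produce its input PCollections.

```python
# Hypothetical sketch of a batch-only replacement for run_stages. Stage is an
# illustrative stand-in for the fused stages create_stages produces, and
# "execute" stands in for handing one stage's work to your own infrastructure.
# None of these names are Beam APIs. Requires Python 3.9+ (graphlib).
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import Callable, Dict, List


@dataclass
class Stage:
    name: str
    inputs: List[str]   # ids of PCollections this stage consumes
    outputs: List[str]  # ids of PCollections this stage produces
    execute: Callable[[Dict[str, list]], Dict[str, list]]


def run_stages(stages: List[Stage]) -> Dict[str, list]:
    by_name = {stage.name: stage for stage in stages}
    producer = {pc: stage.name for stage in stages for pc in stage.outputs}
    # A stage depends on every stage that produces one of its inputs.
    graph = {stage.name: {producer[pc] for pc in stage.inputs}
             for stage in stages}
    data: Dict[str, list] = {}
    for name in TopologicalSorter(graph).static_order():
        stage = by_name[name]
        # Batch only: each input is fully materialized before the stage runs.
        data.update(stage.execute({pc: data[pc] for pc in stage.inputs}))
    return data


# Stages are listed out of order on purpose; run_stages sorts them.
stages = [
    Stage("count", ["words"], ["counts"],
          lambda d: {"counts": [(w, d["words"].count(w))
                                for w in sorted(set(d["words"]))]}),
    Stage("read", [], ["words"], lambda d: {"words": ["b", "a", "b"]}),
]
result = run_stages(stages)
print(result["counts"])  # [('a', 1), ('b', 2)]
```

Because nothing here streams, each stage can run as one self-contained job that receives its complete inputs and returns its outputs. That maps onto the "bundle of data to process and return the outputs for" worker model described earlier in the thread.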