Could you let me know when you update it? I would be interested in rereading after the rewrite.
Thanks! Joey On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw <rober...@google.com> wrote: > I'm taking an action item to update that page, as it is *way* out of date. > > On Thu, Jul 13, 2023 at 6:54 PM Joey Tran <joey.t...@schrodinger.com> > wrote: > >> I see. I guess I got a little confused since these are mentioned in the >> Authoring >> a Runner >> <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs >> page which implied to me that they'd be safe to use. I'll check out the >> bundle_processor. Thanks! >> >> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <rober...@google.com> >> wrote: >> >>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <joey.t...@schrodinger.com> >>> wrote: >>> >>>> Working on this on and off now and getting some pretty good traction. >>>> >>>> One thing I'm a little worried about is all the classes that are marked >>>> "internal use only". A lot of these seem either very useful or possibly >>>> critical to writing a runner. How strictly should I interpret these private >>>> implementation labels? >>>> >>>> A few bits that I'm interested in using ordered by how surprised I was >>>> to find that they're internal only. >>>> >>>> - apache_beam.pipeline.AppliedPTransform >>>> - apache_beam.pipeline.PipelineVisitor >>>> - apache_beam.runners.common.DoFnRunner >>>> >>> >>> The public API is the protos. You should not have to interact >>> with AppliedPTransform and PipelineVisitor directly (and while you can >>> reach in and do so, there are no promises here and these are subject to >>> change). As for DoFnRunner, if you're trying to reach in at this level >>> you're probably going to have to be replicating a bunch of surrounding >>> infrastructure as well. I would recommend using a BundleProcessor [1] to >>> coordinate the work (which will internally wire up the chain of DoFns >>> correctly and take them through their proper lifecycle).
As mentioned >>> above, you can directly borrow the translations in fn_api_runner to go from >>> a full Pipeline graph (proto) to a set of fused DoFns to execute in >>> topological order (as ProcessBundleDescriptor protos, which is >>> what BundleProcessor accepts). >>> >>> [1] >>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851 >>> >>> >>>> Thanks again for the help, >>>> Joey >>>> >>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath < >>>> chamik...@google.com> wrote: >>>> >>>>> Another advantage of a portable runner would be that it will be using >>>>> well-defined and backwards-compatible Beam portable APIs to communicate >>>>> with SDKs. I think this is especially important for runners that do not >>>>> live >>>>> in the Beam repo since otherwise future SDK releases could break your >>>>> runner in subtle ways. Also portability gives you more flexibility when it >>>>> comes to choosing an SDK to define the pipeline and will allow you to >>>>> execute transforms in any SDK via cross-language. >>>>> >>>>> Thanks, >>>>> Cham >>>>> >>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user < >>>>> user@beam.apache.org> wrote: >>>>> >>>>>> >>>>>> >>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com> >>>>>> wrote: >>>>>> >>>>>>> Totally doable by one person, especially given the limited feature >>>>>>>> set you mention above. >>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE >>>>>>>> is >>>>>>>> a good starting point as to what the relationship between a Runner and >>>>>>>> the >>>>>>>> SDK is at a level of detail sufficient for implementation (told from >>>>>>>> the >>>>>>>> perspective of an SDK, but the story is largely about the interface >>>>>>>> which >>>>>>>> is directly applicable). >>>>>>> >>>>>>> >>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>> >>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had >>>>>>> imagined that once the Runner started execution of a workflow, it was >>>>>>> Runner all the way down. Is the Fn API the only way to implement a >>>>>>> runner? >>>>>>> Our execution environment is a bit constrained in such a way that we >>>>>>> can't >>>>>>> expose the APIs required to implement the Fn API. To be forthright, we >>>>>>> basically only have the ability to start a worker either with a known >>>>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or >>>>>>> with >>>>>>> a bundle of data to process and return the outputs for. We're >>>>>>> constrained >>>>>>> from really any additional communication with a worker beyond that. >>>>>>> >>>>>> >>>>>> The "worker" abstraction gives the ability to wrap any user code in a >>>>>> way that it can be called from any runner. If you're willing to constrain >>>>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" >>>>>> can >>>>>> be a logical, rather than physical, concept. >>>>>> >>>>>> Another way to look at it is that in practice, the "runner" often has >>>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical >>>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be >>>>>> inlined (e.g. if it's 100% Python on both sides). See, for example, >>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350 >>>>>> >>>>>> >>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran < >>>>>>>> joey.t...@schrodinger.com> wrote: >>>>>>>> >>>>>>>>> Thanks all for the responses! 
>>>>>>>>> >>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, >>>>>>>>>> then, at first, I’d suggest to answer two questions for yourself: >>>>>>>>>> - Am I going to implement a portable runner or native one? >>>>>>>>>> >>>>>>>>> >>>>>>>>> Portable sounds great, but the answer depends on how much >>>>>>>>> additional cost it'd require to implement portable over non-portable, >>>>>>>>> even >>>>>>>>> considering future deprecation (unless deprecation is happening >>>>>>>>> tomorrow). >>>>>>>>> I'm not familiar enough to know what the additional cost is so I >>>>>>>>> don't have >>>>>>>>> a firm answer. >>>>>>>>> >>>>>>>> >>>>>>>> I would say it would not be that expensive to write it in a >>>>>>>> "portable compatible" way (i.e., it uses the publicly-documented >>>>>>>> protocol as >>>>>>>> the interface rather than reaching into internal details) even if it >>>>>>>> doesn't use gRPC and fire up separate processes/docker images for the >>>>>>>> workers (preferring to do all of that inline like the Python portable >>>>>>>> direct runner does). >>>>>>>> >>>>>>>> >>>>>>>>> - Which SDK I should use for this runner? >>>>>>>>>> >>>>>>>>> I'd be developing this runner against the Python SDK and if the >>>>>>>>> runner only worked with the Python SDK that'd be okay in the short >>>>>>>>> term >>>>>>>>> >>>>>>>> >>>>>>>> Yes. And if you do it the above way, it should be easy to extend >>>>>>>> (or not) if/when the need arises. >>>>>>>> >>>>>>>> >>>>>>>>> Also, we don’t know if this new runner will be contributed back to >>>>>>>>>> Beam, what is a runtime and what actually is a final goal of it. >>>>>>>>> >>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd >>>>>>>>> actually be useful to a wide audience anyways). >>>>>>>>> >>>>>>>>> The context is we've been developing an in-house large-scale >>>>>>>>> pipeline framework that encapsulates both the programming model and >>>>>>>>> the >>>>>>>>> runner/execution of data workflows.
As it's grown, I keep finding >>>>>>>>> myself >>>>>>>>> just reimplementing features and abstractions Beam has already >>>>>>>>> implemented, >>>>>>>>> so I wanted to explore adopting Beam. Our execution environment is >>>>>>>>> very >>>>>>>>> particular though and our workflows require it (due to the way we >>>>>>>>> license >>>>>>>>> our software), so my plan was to try to create a very basic runner >>>>>>>>> that >>>>>>>>> uses our execution environment. The runner could have very few >>>>>>>>> features >>>>>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd >>>>>>>>> probably >>>>>>>>> introduce a shim for some of our internally implemented transforms and >>>>>>>>> assess from there. >>>>>>>>> >>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your >>>>>>>>> thoughts as to whether this seems reasonable and achievable without a >>>>>>>>> large >>>>>>>>> concerted effort or even if the general idea makes any sense. (I >>>>>>>>> recognize >>>>>>>>> that it might not be *easy*, but I don't have the resources to >>>>>>>>> dedicate more than myself to work on a PoC) >>>>>>>>> >>>>>>>> >>>>>>>> Totally doable by one person, especially given the limited feature >>>>>>>> set you mention above. >>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE >>>>>>>> is a good starting point as to what the relationship between a Runner >>>>>>>> and >>>>>>>> the SDK is at a level of detail sufficient for implementation (told >>>>>>>> from >>>>>>>> the perspective of an SDK, but the story is largely about the interface >>>>>>>> which is directly applicable). 
>>>>>>>> >>>>>>>> Given the limited feature set you proposed, this is similar to the >>>>>>>> original Python portable runner which took a week or two to put >>>>>>>> together >>>>>>>> (granted a lot has been added since then), or the TypeScript direct >>>>>>>> runner >>>>>>>> ( >>>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts >>>>>>>> ) which was done (in its basic form, no support for side inputs and >>>>>>>> such) >>>>>>>> in less than a week. Granted, as these are local runners, this >>>>>>>> illustrates >>>>>>>> only the Beam-side complexity of things (not the work of actually >>>>>>>> implementing a distributed shuffle, starting and assigning work to >>>>>>>> multiple >>>>>>>> workers, etc.), but presumably that's the kind of thing your execution >>>>>>>> environment already takes care of. >>>>>>>> >>>>>>>> As for some more concrete pointers, you could probably leverage a >>>>>>>> lot of what's there by invoking create_stages >>>>>>>> >>>>>>>> >>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362 >>>>>>>> >>>>>>>> which will do optimization, fusion, etc. and then implementing your >>>>>>>> own version of run_stages >>>>>>>> >>>>>>>> >>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392 >>>>>>>> >>>>>>>> to execute these in topological order on your compute >>>>>>>> infrastructure. (If you're not doing streaming, this is much more >>>>>>>> straightforward than all the bundler scheduler stuff that currently >>>>>>>> exists >>>>>>>> in that code).
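To make the "execute these in topological order" step above a bit more concrete, here is a minimal sketch using only the Python standard library (graphlib, Python 3.9+). It is not Beam code: run_stages_in_order, the stage names, the deps mapping and the execute callback are all hypothetical stand-ins. In a real runner the payloads would be the fused stages produced by create_stages, and execute would hand one stage to your own execution environment.

```python
from graphlib import TopologicalSorter


def run_stages_in_order(stages, deps, execute):
    """Run every stage once, only after all of its upstream stages.

    stages:  dict mapping stage name -> payload (hypothetical stand-in
             for a fused stage; not a Beam type)
    deps:    dict mapping stage name -> set of upstream stage names
    execute: callable(name, payload, upstream_outputs) -> output
    """
    # Every stage appears in the graph, even if it has no upstream stages.
    graph = {name: set(deps.get(name, ())) for name in stages}
    outputs = {}
    # static_order() yields each node after all of its predecessors and
    # raises graphlib.CycleError if the graph contains a cycle.
    for name in TopologicalSorter(graph).static_order():
        upstream = {dep: outputs[dep] for dep in graph[name]}
        outputs[name] = execute(name, stages[name], upstream)
    return outputs


if __name__ == "__main__":
    def show(name, payload, upstream):
        # Placeholder for "submit this stage to the execution environment".
        print("running", name, "after", sorted(upstream))
        return payload

    run_stages_in_order(
        {"read": "r", "map": "m", "write": "w"},
        {"map": {"read"}, "write": {"map"}},
        show,
    )
```

For a batch-only runner this loop is most of the scheduling logic; the real work sits inside execute, which has to ship a stage and its input data to a worker and collect the outputs.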
>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko < >>>>>>>>> aromanenko....@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user < >>>>>>>>>> user@beam.apache.org> wrote: >>>>>>>>>> >>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko < >>>>>>>>>> aromanenko....@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, >>>>>>>>>>> then, at first, I’d suggest to answer two questions for yourself: >>>>>>>>>>> - Am I going to implement a portable runner or native one? >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> The answer to this should be portable, as non-portable ones will >>>>>>>>>> be deprecated. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Well, actually this is a question that I don’t remember we >>>>>>>>>> discussed here in detail before and had a common agreement. >>>>>>>>>> >>>>>>>>>> Actually, I’m not sure that I understand clearly what is meant by >>>>>>>>>> “deprecation” in this case. For example, Portable Spark Runner is >>>>>>>>>> heavily >>>>>>>>>> actually based on native Spark RDD runner and its translations. So, >>>>>>>>>> which >>>>>>>>>> part should be deprecated and what is a reason for that? >>>>>>>>>> >>>>>>>>>> Well, anyway I guess it’s off topic here. >>>>>>>>>> >>>>>>>>>> Also, we don’t know if this new runner will be contributed back >>>>>>>>>> to Beam, what is a runtime and what actually is a final goal of it. >>>>>>>>>> So I agree that more details on this would be useful. >>>>>>>>>> >>>>>>>>>> — >>>>>>>>>> Alexey >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> - Which SDK I should use for this runner? >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> The answer to the above question makes this one moot :). >>>>>>>>>> >>>>>>>>>> On a more serious note, could you tell us a bit more about the >>>>>>>>>> runner you're looking at implementing?
>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> Then, depending on answers, I’d suggest to take as an example >>>>>>>>>>> one of the most similar Beam runners and use it as a more detailed >>>>>>>>>>> source >>>>>>>>>>> of information along with Beam runner doc mentioned before. >>>>>>>>>>> >>>>>>>>>>> — >>>>>>>>>>> Alexey >>>>>>>>>>> >>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi Beam community! >>>>>>>>>>> >>>>>>>>>>> I'm interested in trying to implement a runner with my company's >>>>>>>>>>> execution environment but I'm struggling to get started. I've read >>>>>>>>>>> the docs >>>>>>>>>>> page >>>>>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner> >>>>>>>>>>> on implementing a runner but it's quite high level. Anyone have any >>>>>>>>>>> concrete suggestions on getting started? >>>>>>>>>>> >>>>>>>>>>> I've started by cloning and running the hello world example >>>>>>>>>>> <https://github.com/apache/beam-starter-python>. I've then >>>>>>>>>>> subclassed `PipelineRunner >>>>>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>` >>>>>>>>>>> to create my own custom runner but at this point I'm a bit stuck. >>>>>>>>>>> My custom >>>>>>>>>>> runner just looks like
>>>>>>>>>>>
>>>>>>>>>>>     from apache_beam.runners import runner
>>>>>>>>>>>
>>>>>>>>>>>     class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>>         def run_pipeline(self, pipeline, options):
>>>>>>>>>>>             self.visit_transforms(pipeline, options)
>>>>>>>>>>>
>>>>>>>>>>> And when using it I get an error about not having implemented "Impulse"
>>>>>>>>>>>
>>>>>>>>>>>     NotImplementedError: Execution of [<Impulse(PTransform) label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object at 0x135d9ff40>.
>>>>>>>>>>>
>>>>>>>>>>> Am I going about this the right way?
Are there tests I can run >>>>>>>>>>> my custom runner against to validate it beyond just running the >>>>>>>>>>> hello world >>>>>>>>>>> example? I'm finding myself just digging through the beam source to >>>>>>>>>>> try to >>>>>>>>>>> piece together how a runner works and I'm struggling to get a >>>>>>>>>>> foothold. Any >>>>>>>>>>> guidance would be greatly appreciated, especially if anyone has any >>>>>>>>>>> experience implementing their own python runner. >>>>>>>>>>> >>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite? >>>>>>>>>>> Cheers, >>>>>>>>>>> Joey >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>
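For readers wondering where the NotImplementedError about "Impulse" in the original message comes from: the wording of the error suggests that the legacy visit_transforms path looks up a handler method on the runner by the transform's class name and fails when none is found. The sketch below is a toy model of that kind of name-based dispatch, written with plain Python classes. SketchRunner, ToyRunner and the stand-in PTransform and Impulse classes are hypothetical and are not Beam's implementation. Note that the replies above recommend working from the pipeline protos and BundleProcessor rather than filling in such handlers one by one.

```python
class PTransform:
    """Stand-in base class for this sketch (not apache_beam.PTransform)."""


class Impulse(PTransform):
    """Stand-in for the primitive that starts a pipeline."""


class SketchRunner:
    """Toy model of name-based dispatch (not Beam's PipelineRunner)."""

    def run_transform(self, transform):
        # Walk the transform's class hierarchy, most specific class first,
        # looking for a method named run_<ClassName> on the runner.
        for cls in type(transform).__mro__:
            handler = getattr(self, "run_%s" % cls.__name__, None)
            if handler is not None:
                return handler(transform)
        raise NotImplementedError(
            "Execution of [%r] not implemented in runner %r."
            % (transform, self))


class ToyRunner(SketchRunner):
    def run_Impulse(self, transform):
        # Toy handler: emit a single empty element to kick things off.
        return [b""]
```

With no run_Impulse defined, SketchRunner().run_transform(Impulse()) raises NotImplementedError in the same style as the message quoted above, while ToyRunner handles it.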