Could you let me know when you update it? I would be interested in
rereading after the rewrite.

Thanks!
Joey

On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw <rober...@google.com> wrote:

> I'm taking an action item to update that page, as it is *way* out of date.
>
> On Thu, Jul 13, 2023 at 6:54 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> I see. I guess I got a little confused since these are mentioned in the 
>> Authoring
>> a Runner
>> <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs
>> page which implied to me that they'd be safe to use. I'll check out the
>> bundle_processor. Thanks!
>>
>> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Working on this on and off now and getting some pretty good traction.
>>>>
>>>> One thing I'm a little worried about is all the classes that are marked
>>>> "internal use only". A lot of these seem either very useful or possibly
>>>> critical to writing a runner. How strictly should I interpret these private
>>>> implementation labels?
>>>>
>>>> A few bits that I'm interested in using ordered by how surprised I was
>>>> to find that they're internal only.
>>>>
>>>>  - apache_bean.pipeline.AppliedPTransform
>>>>  - apache_beam.pipeline.PipelineVisitor
>>>>  - apache_beam.runners.common.DoFnRunner
>>>>
>>>
>>> The public API is the protos. You should not have to interact
>>> with AppliedPTransform and PipelineVisitor directly (and while you can
>>> reach in and do so, there are no promises here and these are subject to
>>> change). As for DoFnRunner, if you're trying to reach in at this level
>>> you're probably going to have to be replicating a bunch of surrounding
>>> infrastructure as well. I would recommend using a BundleProcessor [1] to
>>> coordinate the work (which will internally wire up the chain of DoFns
>>> correctly and take them through their proper lifecycle). As mentioned
>>> above, you can directly borrow the translations in fn_api_runner to go from
>>> a full Pipeline graph (proto) to a set of fused DoFns to execute in
>>> topological order (as ProcessBundleDescriptor protos, which is
>>> what BundleProcessor accepts).
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>>
>>>
>>>> Thanks again for the help,
>>>> Joey
>>>>
>>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>> Another advantage of a portable runner would be that it will be using
>>>>> well defined and backwards compatible Beam portable APIs to communicate
>>>>> with SDKs. I think this is specially important for runners that do not 
>>>>> live
>>>>> in the Beam repo since otherwise future SDK releases could break your
>>>>> runner in subtle ways. Also portability gives you more flexibility when it
>>>>> comes to choosing an SDK to define the pipeline and will allow you to
>>>>> execute transforms in any SDK via cross-language.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>>> set you mention above.
>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>>  is
>>>>>>>> a good starting point as to what the relationship between a Runner and 
>>>>>>>> the
>>>>>>>> SDK is at a level of detail sufficient for implementation (told from 
>>>>>>>> the
>>>>>>>> perspective of an SDK, but the story is largely about the interface 
>>>>>>>> which
>>>>>>>> is directly applicable).
>>>>>>>
>>>>>>>
>>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>>
>>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>>>> Runner all the way down. Is the Fn API the only way to implement a 
>>>>>>> runner?
>>>>>>> Our execution environment is a bit constrained in such a way that we 
>>>>>>> can't
>>>>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>>>>> basically only have the ability to start a worker either with a known
>>>>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or 
>>>>>>> with
>>>>>>> a bundle of data to process and return the outputs for. We're 
>>>>>>> constrained
>>>>>>> from really any additional communication with a worker beyond that.
>>>>>>>
>>>>>>
>>>>>> The "worker" abstraction gives the ability to wrap any user code in a
>>>>>> way that it can be called from any runner. If you're willing to constrain
>>>>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" 
>>>>>> can
>>>>>> be a logical, rather than physical, concept.
>>>>>>
>>>>>> Another way to look at it is that in practice, the "runner" often has
>>>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>>>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>>>
>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <
>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks all for the responses!
>>>>>>>>>
>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you,
>>>>>>>>>> then, at fist, I’d suggest to answer two questions for yourself:
>>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Portable sounds great, but the answer depends on how much
>>>>>>>>> additional cost it'd require to implement portable over non-portable, 
>>>>>>>>> even
>>>>>>>>> considering future deprecation (unless deprecation is happening 
>>>>>>>>> tomorrow).
>>>>>>>>> I'm not familiar enough to know what the additional cost is so I 
>>>>>>>>> don't have
>>>>>>>>> a firm answer.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I would way it would not be that expensive to write it in a
>>>>>>>> "portable compatible" way (i.e it uses the publicly-documented 
>>>>>>>> protocol as
>>>>>>>> the interface rather than reaching into internal details) even if it
>>>>>>>> doesn't use GRCP and fire up separate processes/docker images for the
>>>>>>>> workers (preferring to do tall of that inline like the Python portable
>>>>>>>> direct runner does).
>>>>>>>>
>>>>>>>>
>>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>>
>>>>>>>>> I'd be developing this runner against the python SDK and if the
>>>>>>>>> runner only worked with the python SDK that'd be okay in the short 
>>>>>>>>> term
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yes. And if you do it the above way, it should be easy to extend
>>>>>>>> (or not) if/when the need arises.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>>>
>>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd
>>>>>>>>> actually be useful to a wide audience anyways).
>>>>>>>>>
>>>>>>>>> The context is we've been developing an in-house large-scale
>>>>>>>>> pipeline framework that encapsulates both the programming model and 
>>>>>>>>> the
>>>>>>>>> runner/execution of data workflows. As it's grown, I keep finding 
>>>>>>>>> myself
>>>>>>>>> just reimplementing features and abstractions Beam has already 
>>>>>>>>> implemented,
>>>>>>>>> so I wanted to explore adopting Beam. Our execution environment is 
>>>>>>>>> very
>>>>>>>>> particular though and our workflows require it (due to the way we 
>>>>>>>>> license
>>>>>>>>> our software), so my plan was to try to create a very basic runner 
>>>>>>>>> that
>>>>>>>>> uses our execution environment. The runner could have very few 
>>>>>>>>> features
>>>>>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd 
>>>>>>>>> probably
>>>>>>>>> introduce a shim for some of our internally implemented transforms and
>>>>>>>>> assess from there.
>>>>>>>>>
>>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>>>>>> thoughts as to whether this seems reasonable and achievable without a 
>>>>>>>>> large
>>>>>>>>> concerted effort or even if the general idea makes any sense. (I 
>>>>>>>>> recognize
>>>>>>>>> that it might not be *easy*, but I don't have the resources to
>>>>>>>>> dedicate more than myself to work on a PoC)
>>>>>>>>>
>>>>>>>>
>>>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>>> set you mention above.
>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>> is a good starting point as to what the relationship between a Runner 
>>>>>>>> and
>>>>>>>> the SDK is at a level of detail sufficient for implementation (told 
>>>>>>>> from
>>>>>>>> the perspective of an SDK, but the story is largely about the interface
>>>>>>>> which is directly applicable).
>>>>>>>>
>>>>>>>> Given the limited feature set you proposed, this is similar to the
>>>>>>>> original Python portable runner which took a week or two to put 
>>>>>>>> together
>>>>>>>> (granted a lot has been added since then), or the typescript direct 
>>>>>>>> runner
>>>>>>>> (
>>>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>>>>> ) which was done (in its basic form, no support for side inputs and 
>>>>>>>> such)
>>>>>>>> in less than a week. Granted, as these are local runners, this 
>>>>>>>> illustrates
>>>>>>>> only the Beam-side complexity of things (not the work of actually
>>>>>>>> implementing a distributed shuffle, starting and assigning work to 
>>>>>>>> multiple
>>>>>>>> workers, etc. but presumably that's the kind of thing your execution
>>>>>>>> environment already takes care of.
>>>>>>>>
>>>>>>>> As for some more concrete pointers, you could probably leverage a
>>>>>>>> lot of what's there by invoking create_stages
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>>>
>>>>>>>> which will do optimization, fusion, etc. and then implementing your
>>>>>>>> own version of run_stages
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>>>
>>>>>>>> to execute these in topological order on your compute
>>>>>>>> infrastructure. (If you're not doing streaming, this is much more
>>>>>>>> straightforward than all the bundler scheduler stuff that currently 
>>>>>>>> exists
>>>>>>>> in that code).
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you,
>>>>>>>>>>> then, at fist, I’d suggest to answer two questions for yourself:
>>>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The answer to this should be portable, as non-portable ones will
>>>>>>>>>> be deprecated.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Well, actually this is a question that I don’t remember we
>>>>>>>>>> discussed here in details before and had a common agreement.
>>>>>>>>>>
>>>>>>>>>> Actually, I’m not sure that I understand clearly what is meant by
>>>>>>>>>> “deprecation" in this case. For example, Portable Spark Runner is 
>>>>>>>>>> heavily
>>>>>>>>>> actually based on native Spark RDD runner and its translations. So, 
>>>>>>>>>> which
>>>>>>>>>> part should be deprecated and what is a reason for that?
>>>>>>>>>>
>>>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>>>
>>>>>>>>>> Also, we don’t know if this new runner will be contributed back
>>>>>>>>>> to Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>>>
>>>>>>>>>> —
>>>>>>>>>> Alexey
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>>>
>>>>>>>>>> On a more serious note, could you tell us a bit more about the
>>>>>>>>>> runner you're looking at implementing?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Then, depending on answers, I’d suggest to take as an example
>>>>>>>>>>> one of the most similar Beam runners and use it as a more detailed 
>>>>>>>>>>> source
>>>>>>>>>>> of information along with Beam runner doc mentioned before.
>>>>>>>>>>>
>>>>>>>>>>> —
>>>>>>>>>>> Alexey
>>>>>>>>>>>
>>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Beam community!
>>>>>>>>>>>
>>>>>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>>>>>> execution environment but I'm struggling to get started. I've read 
>>>>>>>>>>> the docs
>>>>>>>>>>> page
>>>>>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>>>>>>> concrete suggestions on getting started?
>>>>>>>>>>>
>>>>>>>>>>> I've started by cloning and running the hello world example
>>>>>>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>>>>>>> subclassed `PipelineRunner
>>>>>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>>>>>>>> to create my own custom runner but at this point I'm a bit stuck. 
>>>>>>>>>>> My custom
>>>>>>>>>>> runner just looks like
>>>>>>>>>>>
>>>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>>>>                      options):
>>>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>>>
>>>>>>>>>>> And when using it I get an error about not having implemented
>>>>>>>>>>> "Impulse"
>>>>>>>>>>>
>>>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>>>>>> label=[Impulse]>] not implemented in runner 
>>>>>>>>>>> <my_app.app.CustomRunner object
>>>>>>>>>>> at 0x135d9ff40>.
>>>>>>>>>>>
>>>>>>>>>>> Am I going about this the right way? Are there tests I can run
>>>>>>>>>>> my custom runner against to validate it beyond just running the 
>>>>>>>>>>> hello world
>>>>>>>>>>> example? I'm finding myself just digging through the beam source to 
>>>>>>>>>>> try to
>>>>>>>>>>> piece together how a runner works and I'm struggling to get a 
>>>>>>>>>>> foothold. Any
>>>>>>>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>>>>>>>> experience implementing their own python runner.
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Joey
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to