Another advantage of a portable runner is that it uses well-defined,
backwards-compatible Beam portable APIs to communicate with SDKs. I think
this is especially important for runners that do not live in the Beam repo,
since otherwise future SDK releases could break your runner in subtle ways.
Portability also gives you more flexibility in choosing an SDK to define
the pipeline, and lets you execute transforms from any SDK via
cross-language.

Thanks,
Cham

On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

>
>
> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>>> Totally doable by one person, especially given the limited feature set
>>> you mention above.
>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>> is a good starting point as to what the relationship between a Runner and
>>> the SDK is at a level of detail sufficient for implementation (told from
>>> the perspective of an SDK, but the story is largely about the interface
>>> which is directly applicable).
>>
>>
>> Great slides, I really appreciate the illustrations.
>>
>> I hadn't realized there was a concept of an "SDK Worker", I had imagined
>> that once the Runner started execution of a workflow, it was Runner all the
>> way down. Is the Fn API the only way to implement a runner? Our execution
>> environment is a bit constrained in such a way that we can't expose the
>> APIs required to implement the Fn API. To be forthright, we basically only
>> have the ability to start a worker either with a known Pub/Sub topic to
>> expect data from and a Pub/Sub topic to write to; or with a bundle of data
>> to process and return the outputs for. We're constrained from really any
>> additional communication with a worker beyond that.
>>
>
> The "worker" abstraction gives the ability to wrap any user code in a way
> that it can be called from any runner. If you're willing to constrain the
> code you're wrapping (e.g. "Python DoFns only") then this "worker" can be a
> logical, rather than physical, concept.
>
> Another way to look at it is that in practice, the "runner" often has its
> own notion of "workers" which wrap (often in a 1:1 way) the logical "SDK
> Worker" (which in turn invokes the actual DoFns). This latter may be
> inlined (e.g. if it's 100% Python on both sides). See, for example,
> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>
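A minimal sketch of the "logical worker" idea described above, assuming Python-only user code and a runner that can only hand a worker a bundle of data and collect its outputs. The names here (process_bundle, ElementFn, and the three example functions) are hypothetical and are not part of the Beam API; real DoFns carry more state (windows, timestamps, setup/teardown) than this shows.

```python
from typing import Callable, Iterable, List

# Hypothetical stand-in for a DoFn: takes one element and returns or yields
# zero or more output elements. This is not the Beam DoFn interface.
ElementFn = Callable[[object], Iterable[object]]


def process_bundle(fused_fns: List[ElementFn],
                   bundle: Iterable[object]) -> List[object]:
    """Run one bundle through a chain of element-wise functions, in-process.

    The "worker" is just this function call: no gRPC, no separate process.
    """
    elements = list(bundle)
    for fn in fused_fns:
        # Flatten the outputs of each function before feeding the next one.
        elements = [out for element in elements for out in fn(element)]
    return elements


def split_words(line):
    return line.split()


def keep_long(word):
    if len(word) > 3:
        yield word


def upper(word):
    yield word.upper()


result = process_bundle(
    [split_words, keep_long, upper],
    ["a portable beam runner", "fn api"],
)
print(result)
```

Under these assumptions the execution environment only ever sees "bundle in, outputs out", which matches the constraint described earlier in the thread.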
>
>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Thanks all for the responses!
>>>>
>>>> If Beam Runner Authoring Guide is rather high-level for you, then, at
>>>>> first, I’d suggest answering two questions for yourself:
>>>>> - Am I going to implement a portable runner or native one?
>>>>>
>>>>
>>>> Portable sounds great, but the answer depends on how much additional
>>>> cost it'd require to implement portable over non-portable, even considering
>>>> future deprecation (unless deprecation is happening tomorrow). I'm not
>>>> familiar enough to know what the additional cost is so I don't have a firm
>>>> answer.
>>>>
>>>
>>> I would say it would not be that expensive to write it in a "portable
>>> compatible" way (i.e. it uses the publicly documented protocol as the
>>> interface rather than reaching into internal details) even if it doesn't
>>> use gRPC and fire up separate processes/Docker images for the workers
>>> (preferring to do all of that inline like the Python portable direct
>>> runner does).
>>>
>>>
>>>> - Which SDK should I use for this runner?
>>>>>
>>>> I'd be developing this runner against the Python SDK, and if the runner
>>>> only worked with the Python SDK that'd be okay in the short term.
>>>>
>>>
>>> Yes. And if you do it the above way, it should be easy to extend (or
>>> not) if/when the need arises.
>>>
>>>
>>>> Also, we don’t know if this new runner will be contributed back to Beam,
>>>>> what the runtime is, and what its final goal actually is.
>>>>
>>>> Likely won't be contributed back to Beam (not sure if it'd actually be
>>>> useful to a wide audience anyways).
>>>>
>>>> The context is we've been developing an in-house large-scale pipeline
>>>> framework that encapsulates both the programming model and the
>>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>>> just reimplementing features and abstractions Beam has already implemented,
>>>> so I wanted to explore adopting Beam. Our execution environment is very
>>>> particular though and our workflows require it (due to the way we license
>>>> our software), so my plan was to try to create a very basic runner that
>>>> uses our execution environment. The runner could have very few features
>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>>>> introduce a shim for some of our internally implemented transforms and
>>>> assess from there.
>>>>
>>>> Not sure if this is a lofty goal or not, so happy to hear your thoughts
>>>> as to whether this seems reasonable and achievable without a large
>>>> concerted effort or even if the general idea makes any sense. (I recognize
>>>> that it might not be *easy*, but I don't have the resources to
>>>> dedicate more than myself to work on a PoC)
>>>>
>>>
>>> Totally doable by one person, especially given the limited feature set
>>> you mention above.
>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>> is a good starting point as to what the relationship between a Runner and
>>> the SDK is at a level of detail sufficient for implementation (told from
>>> the perspective of an SDK, but the story is largely about the interface
>>> which is directly applicable).
>>>
>>> Given the limited feature set you proposed, this is similar to the
>>> original Python portable runner which took a week or two to put together
>>> (granted a lot has been added since then), or the TypeScript direct runner
>>> (
>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>> ) which was done (in its basic form, no support for side inputs and such)
>>> in less than a week. Granted, as these are local runners, this illustrates
>>> only the Beam-side complexity of things (not the work of actually
>>> implementing a distributed shuffle, starting and assigning work to multiple
>>> workers, etc.), but presumably that's the kind of thing your execution
>>> environment already takes care of.
>>>
>>> As for some more concrete pointers, you could probably leverage a lot of
>>> what's there by invoking create_stages
>>>
>>>
>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>
>>> which will do optimization, fusion, etc. and then implementing your own
>>> version of run_stages
>>>
>>>
>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>
>>> to execute these in topological order on your compute infrastructure.
>>> (If you're not doing streaming, this is much more straightforward than all
>>> the bundler scheduler stuff that currently exists in that code).
>>>
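A rough sketch of what "execute these in topological order" could look like for a batch-only runner. It deliberately does not use Beam's real stage objects or the create_stages/run_stages signatures; Stage, run_stages_in_order, and the three toy stages are hypothetical stand-ins, and each stage's fn is a placeholder for "submit this fused stage to your execution environment and wait for its output".

```python
from collections import Counter
from dataclasses import dataclass
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Callable, Dict, List, Tuple


@dataclass
class Stage:
    """Hypothetical fused stage: reads named inputs, writes one named output."""
    name: str
    inputs: List[str]
    output: str
    fn: Callable[[List[list]], list]


def run_stages_in_order(stages: List[Stage]) -> Tuple[Dict[str, list], List[str]]:
    by_name = {s.name: s for s in stages}
    producers = {s.output: s.name for s in stages}
    # Map each stage to the set of stages that produce its inputs.
    graph = {s.name: {producers[i] for i in s.inputs} for s in stages}

    results: Dict[str, list] = {}
    order: List[str] = []
    # static_order() yields every stage after all of its predecessors.
    for name in TopologicalSorter(graph).static_order():
        stage = by_name[name]
        # Placeholder for dispatching the stage to real infrastructure.
        results[stage.output] = stage.fn([results[i] for i in stage.inputs])
        order.append(name)
    return results, order


# Stages listed out of order on purpose.
stages = [
    Stage("count", ["words"], "counts",
          lambda ins: sorted(Counter(ins[0]).items())),
    Stage("read", [], "lines", lambda ins: ["a b", "b c"]),
    Stage("split", ["lines"], "words",
          lambda ins: [w for line in ins[0] for w in line.split()]),
]

results, order = run_stages_in_order(stages)
print(order)
print(results["counts"])
```

Because there is no streaming here, each stage runs to completion before its consumers start, so no bundle scheduling is needed in this sketch.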
>>>
>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>> aromanenko....@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>> aromanenko....@gmail.com> wrote:
>>>>>
>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then, at
>>>>>> first, I’d suggest answering two questions for yourself:
>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>
>>>>>
>>>>> The answer to this should be portable, as non-portable ones will be
>>>>> deprecated.
>>>>>
>>>>>
>>>>> Well, actually this is a question that I don’t remember us discussing
>>>>> here in detail before or reaching a common agreement on.
>>>>>
>>>>> Actually, I’m not sure that I understand clearly what is meant by
>>>>> “deprecation” in this case. For example, the Portable Spark Runner is
>>>>> actually heavily based on the native Spark RDD runner and its
>>>>> translations. So, which part should be deprecated, and what is the
>>>>> reason for that?
>>>>>
>>>>> Well, anyway I guess it’s off topic here.
>>>>>
>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>> Beam, what the runtime is, and what its final goal actually is.
>>>>> So I agree that more details on this would be useful.
>>>>>
>>>>> —
>>>>> Alexey
>>>>>
>>>>>
>>>>> - Which SDK should I use for this runner?
>>>>>>
>>>>>
>>>>> The answer to the above question makes this one moot :).
>>>>>
>>>>> On a more serious note, could you tell us a bit more about the runner
>>>>> you're looking at implementing?
>>>>>
>>>>>
>>>>>> Then, depending on the answers, I’d suggest taking one of the most
>>>>>> similar Beam runners as an example and using it as a more detailed
>>>>>> source of information, along with the Beam runner doc mentioned before.
>>>>>>
>>>>>> —
>>>>>> Alexey
>>>>>>
>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Beam community!
>>>>>>
>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>> execution environment but I'm struggling to get started. I've read the
>>>>>> docs page
>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>> concrete suggestions on getting started?
>>>>>>
>>>>>> I've started by cloning and running the hello world example
>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>> subclassed `PipelineRunner`
>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>
>>>>>> to create my own custom runner but at this point I'm a bit stuck. My
>>>>>> custom runner just looks like
>>>>>>
>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>     def run_pipeline(self, pipeline,
>>>>>>                      options):
>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>
>>>>>> And when using it I get an error about not having implemented
>>>>>> "Impulse"
>>>>>>
>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner
>>>>>> object at 0x135d9ff40>.
>>>>>>
>>>>>> Am I going about this the right way? Are there tests I can run my
>>>>>> custom runner against to validate it beyond just running the hello world
>>>>>> example? I'm finding myself just digging through the Beam source to try
>>>>>> to piece together how a runner works and I'm struggling to get a
>>>>>> foothold. Any guidance would be greatly appreciated, especially if
>>>>>> anyone has any experience implementing their own Python runner.
>>>>>>
>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>> Cheers,
>>>>>> Joey
>>>>>>
>>>>>>
>>>>>>
>>>>>
