Working on this on and off now and getting some pretty good traction.

One thing I'm a little worried about is all the classes that are marked
"internal use only". A lot of these seem either very useful or possibly
critical to writing a runner. How strictly should I interpret these private
implementation labels?

A few bits that I'm interested in using, ordered by how surprised I was to
find that they're internal only (a rough sketch of how I'd imagine using
them follows the list):

 - apache_beam.pipeline.AppliedPTransform
 - apache_beam.pipeline.PipelineVisitor
 - apache_beam.runners.common.DoFnRunner

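For concreteness, here's the kind of thing I had in mind for the first two
(the CollectTransforms visitor below is just my own illustration, not anything
that exists in Beam):

from apache_beam.pipeline import PipelineVisitor

class CollectTransforms(PipelineVisitor):
    """Toy visitor that records the label of every AppliedPTransform it visits."""

    def __init__(self):
        self.labels = []

    def visit_transform(self, applied_ptransform):
        # applied_ptransform is an apache_beam.pipeline.AppliedPTransform
        self.labels.append(applied_ptransform.full_label)

# visitor = CollectTransforms()
# pipeline.visit(visitor)  # Pipeline.visit() walks the graph with a visitor
# print(visitor.labels)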
Thanks again for the help,
Joey

On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> Another advantage of a portable runner would be that it will be using
> well-defined and backwards-compatible Beam portable APIs to communicate with
> SDKs. I think this is especially important for runners that do not live in
> the Beam repo, since otherwise future SDK releases could break your runner
> in subtle ways. Portability also gives you more flexibility when it comes
> to choosing an SDK to define the pipeline and will allow you to execute
> transforms in any SDK via cross-language.
>
> Thanks,
> Cham
>
> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>>
>>
>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> Totally doable by one person, especially given the limited feature set
>>>> you mention above.
>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>> is a good starting point as to what the relationship between a Runner and the
>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>> perspective of an SDK, but the story is largely about the interface which
>>>> is directly applicable).
>>>
>>>
>>> Great slides, I really appreciate the illustrations.
>>>
>>> I hadn't realized there was a concept of an "SDK Worker", I had imagined
>>> that once the Runner started execution of a workflow, it was Runner all the
>>> way down. Is the Fn API the only way to implement a runner? Our execution
>>> environment is a bit constrained in such a way that we can't expose the
>>> APIs required to implement the Fn API. To be forthright, we basically only
>>> have the ability to start a worker either with a known Pub/Sub topic to
>>> read data from and a Pub/Sub topic to write to, or with a bundle of data
>>> to process and return the outputs for. We can't really have any
>>> additional communication with a worker beyond that.
>>>
>>
>> The "worker" abstraction gives the ability to wrap any user code in a way
>> that it can be called from any runner. If you're willing to constrain the
>> code you're wrapping (e.g. "Python DoFns only") then this "worker" can be a
>> logical, rather than physical, concept.
>>
>> Another way to look at it is that in practice, the "runner" often has its
>> own notion of "workers" which wrap (often in a 1:1 way) the logical "SDK
>> Worker" (which in turn invokes the actual DoFns). This latter may be
>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
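>>
>> As a very rough sketch of what an "inlined" logical worker could mean
>> (ignoring windowing, start/finish_bundle, etc.; the class below is purely
>> illustrative, not a Beam API): the runner hands a bundle of elements to
>> in-process code instead of talking to a separate worker process over the
>> Fn API.
>>
>> class InlineSdkWorker:
>>     """Hypothetical inlined 'SDK worker' that invokes a Python DoFn directly."""
>>
>>     def __init__(self, dofn):
>>         self.dofn = dofn  # the user's DoFn instance
>>
>>     def process_bundle(self, elements):
>>         outputs = []
>>         for element in elements:
>>             # DoFn.process may return None or an iterable of outputs.
>>             outputs.extend(self.dofn.process(element) or [])
>>         return outputs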
>>
>>
>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Thanks all for the responses!
>>>>>
>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at
>>>>>> first, I’d suggest answering two questions for yourself:
>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>
>>>>>
>>>>> Portable sounds great, but the answer depends on how much additional
>>>>> cost it'd require to implement portable over non-portable, even 
>>>>> considering
>>>>> future deprecation (unless deprecation is happening tomorrow). I'm not
>>>>> familiar enough to know what the additional cost is so I don't have a firm
>>>>> answer.
>>>>>
>>>>
>>>> I would say it would not be that expensive to write it in a "portable
>>>> compatible" way (i.e. it uses the publicly-documented protocol as the
>>>> interface rather than reaching into internal details) even if it doesn't
>>>> use gRPC and fire up separate processes/docker images for the workers
>>>> (preferring to do all of that inline like the Python portable direct
>>>> runner does).
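>>>>
>>>> A minimal sketch of that shape (only to_runner_api() below is a real Beam
>>>> call; the translation/execution part is entirely up to your environment):
>>>>
>>>> from apache_beam.runners.runner import PipelineRunner
>>>>
>>>> class PortableCompatibleRunner(PipelineRunner):
>>>>     def run_pipeline(self, pipeline, options):
>>>>         # Work from the portable pipeline proto (the publicly-documented
>>>>         # interface) rather than from SDK-internal objects.
>>>>         proto = pipeline.to_runner_api()
>>>>         for transform_id, transform in proto.components.transforms.items():
>>>>             ...  # translate/execute each transform on your infrastructure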
>>>>
>>>>
>>>>> - Which SDK I should use for this runner?
>>>>>>
>>>>> I'd be developing this runner against the Python SDK, and if the runner
>>>>> only worked with the Python SDK, that'd be okay in the short term.
>>>>>
>>>>
>>>> Yes. And if you do it the above way, it should be easy to extend (or
>>>> not) if/when the need arises.
>>>>
>>>>
>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>> Beam, what its runtime is, or what its final goal actually is.
>>>>>
>>>>> Likely won't be contributed back to Beam (not sure if it'd actually be
>>>>> useful to a wide audience anyways).
>>>>>
>>>>> The context is we've been developing an in-house large-scale pipeline
>>>>> framework that encapsulates both the programming model and the
>>>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>>>> just reimplementing features and abstractions Beam has already 
>>>>> implemented,
>>>>> so I wanted to explore adopting Beam. Our execution environment is very
>>>>> particular though and our workflows require it (due to the way we license
>>>>> our software), so my plan was to try to create a very basic runner that
>>>>> uses our execution environment. The runner could have very few features
>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd 
>>>>> probably
>>>>> introduce a shim for some of our internally implemented transforms and
>>>>> assess from there.
>>>>>
>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>> thoughts as to whether this seems reasonable and achievable without a 
>>>>> large
>>>>> concerted effort or even if the general idea makes any sense. (I recognize
>>>>> that it might not be *easy*, but I don't have the resources to
>>>>> dedicate more than myself to work on a PoC)
>>>>>
>>>>
>>>> Totally doable by one person, especially given the limited feature set
>>>> you mention above.
>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>> is a good starting point as to what the relationship between a Runner and
>>>> the SDK is at a level of detail sufficient for implementation (told from
>>>> the perspective of an SDK, but the story is largely about the interface
>>>> which is directly applicable).
>>>>
>>>> Given the limited feature set you proposed, this is similar to the
>>>> original Python portable runner which took a week or two to put together
>>>> (granted a lot has been added since then), or the typescript direct runner
>>>> (
>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>> ) which was done (in its basic form, no support for side inputs and such)
>>>> in less than a week. Granted, as these are local runners, this illustrates
>>>> only the Beam-side complexity of things (not the work of actually
>>>> implementing a distributed shuffle, starting and assigning work to multiple
>>>> workers, etc.), but presumably that's the kind of thing your execution
>>>> environment already takes care of.
>>>>
>>>> As for some more concrete pointers, you could probably leverage a lot
>>>> of what's there by invoking create_stages
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>
>>>> which will do optimization, fusion, etc. and then implementing your own
>>>> version of run_stages
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>
>>>> to execute these in topological order on your compute infrastructure.
>>>> (If you're not doing streaming, this is much more straightforward than all
>>>> the bundle-scheduling stuff that currently exists in that code).
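>>>>
>>>> Very roughly (untested, and assuming create_stages returns a
>>>> (stage_context, stages) pair as in the linked fn_runner.py; the two
>>>> placeholder helpers stand in for your infrastructure):
>>>>
>>>> from apache_beam.runners.portability.fn_api_runner import fn_runner
>>>>
>>>> class MyStageRunner(fn_runner.FnApiRunner):
>>>>     def run_stages(self, stage_context, stages):
>>>>         # Keep Beam's proto translation, optimization, and fusion, but
>>>>         # replace the bundle-scheduling machinery: hand each fused stage,
>>>>         # in topological order, to your own execution environment.
>>>>         for stage in stages:
>>>>             self._run_stage_on_my_infra(stage_context, stage)  # placeholder
>>>>         return self._make_pipeline_result()  # placeholder: build a PipelineResult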
>>>>
>>>>
>>>>
>>>>>
>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>>> aromanenko....@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>
>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>> at first, I’d suggest answering two questions for yourself:
>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>
>>>>>>
>>>>>> The answer to this should be portable, as non-portable ones will be
>>>>>> deprecated.
>>>>>>
>>>>>>
>>>>>> Well, actually, this is a question that I don’t remember us discussing
>>>>>> here in detail before or reaching a common agreement on.
>>>>>>
>>>>>> Actually, I’m not sure that I understand clearly what is meant by
>>>>>> “deprecation” in this case. For example, the portable Spark runner is
>>>>>> actually heavily based on the native Spark RDD runner and its
>>>>>> translations. So, which part should be deprecated, and what is the
>>>>>> reason for that?
>>>>>>
>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>
>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>> Beam, what its runtime is, or what its final goal actually is.
>>>>>> So I agree that more details on this would be useful.
>>>>>>
>>>>>> —
>>>>>> Alexey
>>>>>>
>>>>>>
>>>>>> - Which SDK I should use for this runner?
>>>>>>>
>>>>>>
>>>>>> The answer to the above question makes this one moot :).
>>>>>>
>>>>>> On a more serious note, could you tell us a bit more about the runner
>>>>>> you're looking at implementing?
>>>>>>
>>>>>>
>>>>>>> Then, depending on the answers, I’d suggest taking one of the most
>>>>>>> similar Beam runners as an example and using it as a more detailed
>>>>>>> source of information, along with the Beam runner doc mentioned before.
>>>>>>>
>>>>>>> —
>>>>>>> Alexey
>>>>>>>
>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Beam community!
>>>>>>>
>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>> execution environment but I'm struggling to get started. I've read the 
>>>>>>> docs
>>>>>>> page
>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>>> concrete suggestions on getting started?
>>>>>>>
>>>>>>> I've started by cloning and running the hello world example
>>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>>> subclassed `PipelineRunner
>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>>>> to create my own custom runner but at this point I'm a bit stuck. My 
>>>>>>> custom
>>>>>>> runner just looks like
>>>>>>>
>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>     def run_pipeline(self, pipeline, options):
>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>
>>>>>>> And when using it I get an error about not having implemented
>>>>>>> "Impulse"
>>>>>>>
>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner 
>>>>>>> object
>>>>>>> at 0x135d9ff40>.
>>>>>>>
>>>>>>> Am I going about this the right way? Are there tests I can run my
>>>>>>> custom runner against to validate it beyond just running the hello world
>>>>>>> example? I'm finding myself just digging through the beam source to try 
>>>>>>> to
>>>>>>> piece together how a runner works and I'm struggling to get a foothold. 
>>>>>>> Any
>>>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>>>> experience implementing their own python runner.
>>>>>>>
>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>> Cheers,
>>>>>>> Joey
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
