Hi Robert,

I started implementing the gRPC server for the worker service, and a
question came up while doing so. The endpoints the runner passes to the
worker service as flags use "localhost" (this is what I noticed when
experimenting with the Python pool). If I pass them on to the Go SDK
harness running on a K8s pod, it will start a worker that uses the ports
specified in those endpoints, which is fine in itself. This should work for
every worker the service starts, since each one lives on a pod with its own
(DNS) name. From the runner's point of view, however, it just passed
localhost with some ports for each worker it asked the worker service to
create. With the current Flink runner this works because the workers are
created as goroutines (or as multiple processes in the case of Python) on
the same Task Manager machine. That is obviously not the case for the
distributed workers we discussed. Am I missing something? What are your
thoughts on how to go about this?

I am also now thinking that starting the Go SDK harness on the TaskManager
and parallelizing the TaskManager itself using the "parallelism" flag would
be more in line with the design of Flink. Does this make sense? One thing I
noticed with the Flink Operator, though, is that TaskManagers only take a job's
"JarURI"
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#application-deployments>,
so maybe that's the limitation that motivates the implementation we
discussed?

Thanks,
Sherif


On Fri, Mar 24, 2023 at 6:02 PM Robert Burke <lostl...@apache.org> wrote:

> That's fine. While it adds a kubernetes dependency to the Beam go.mod, it
> won't actually be used by clients unless we're adding it to one of the
> main packages. That's fairly safe.
>
> I'd recommend putting an appropriately named package under the "cmd"
> directory, since it would be most useful as a standalone command for
> someone to run. That can be hashed out appropriately during code review,
> once it's working for your use case.
>
> On 2023/03/24 20:05:06 Sherif Tolba wrote:
> > Hi Robert,
> >
> > Thank you for your insightful response. The setup you described makes
> > sense to me and I'd like to give the grpc server implementation a try.
> > Referring to your point below:
> >
> > "This `workerService` would then use whatever it likes to get containers
> > onto VMs when requested by the Job service/flink."
> >
> > , it will be Kubernetes-specific for my use case, is this okay?
> >
> > Thanks,
> > Sherif
> >
> > On Thu, Mar 23, 2023 at 3:52 PM Robert Burke <lostl...@apache.org> wrote:
> >
> > > Oh that's very interesting! I have a few comments, but we could end up
> > > with a new feature for the Go SDK.
> > >
> > > As you've noted, you shouldn't really be manually spinning up step 4.
> > > It's up to the runner to do that, but it does look like for your usage,
> > > some assistance is needed.
> > >
> > > The Python Boot Loader has that in order to support "sibling
> > > processes" on a single VM container. Basically, it's a hack to get
> > > around the Global Interpreter Lock slowing things down. Starting
> > > additional separate processes allows for efficient use of cores and
> > > multiprocessing. Go and Java don't need this since they have robust
> > > concurrency support, and will generally process each bundle sent to
> > > them in parallel.
> > >
> > > The bootloaders don't currently share a lot of code, since they were
> > > developed with the language harness they start up in mind.
> > >
> > > So the worker_pool flag you mention is here:
> > >
> > > https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/container/boot.go#L55
> > >
> > > The "External" stuff you see is also how LOOPBACK mode operates. It
> > > just treats all desired workers as internal processes. But it's largely
> > > just a service spec that anyone could implement and ensure it's pointed
> > > to.
> > >
> > > E.g. for Loopback in the Go SDK, the implementation is entirely here:
> > >
> > > https://github.com/apache/beam/blob/ba3dcd1cb983bbe92531ab7deae95438e93a1d4a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go#L33
> > >
> > > Python's equivalent is here:
> > > https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py
> > >
> > > The service definition is pretty straightforward here, just StartWorker
> > > and StopWorker requests.
> > >
> > > https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1104
> > >
> > > Basically, for distributed workers, you'd just need to create a gRPC
> > > server for that API to spin up the worker with the right flags so it can
> > > connect and process the job. Note that this is "environment specific", so
> > > this would only start "Go SDK" workers (just as you've seen the Python
> > > one only starting up Python SDK workers).
> > >
> > > So if you're good with cluster managers, Kubernetes can be used for
> > > this, for example, instead of whatever Flink is managing.
> > >
> > > The way I'm currently picturing it is a separate service binary (mostly
> > > to avoid unnecessary built-in deps...). If it's in the Go SDK Module, it
> > > should default to the Go SDK container, but there's no reason not to
> > > provide an override if desired.
> > >
> > > Default image for a given release is at:
> > > https://github.com/apache/beam/blob/011296c14659f80c8ecbeefda79ecc3f1113bd95/sdks/go/pkg/beam/core/core.go#L33
> > > (the dev versions aren't pushed by default, but this will work after
> > > release).
> > >
> > > Then it's up to the runner to talk to that service to start and stop
> > > workers as needed.
> > >
> > > $ workerService <external_worker_service_addr> --container=<container>
> > > ...other config...
> > >
> > > $ myPipelineBinary --runner=portable --environment_type=external
> > > --environment_config=<external_worker_service_addr> --endpoint=<beam job service>
> > >
> > > This `workerService` would then use whatever it likes to get containers
> > > onto VMs when requested by the Job service/flink.
> > >
> > > I'd be entirely delighted for such a thing to be contributed, and to help
> > > review it. I'm @lostluck on GitHub, if you'd like to go down this path.
> > >
> > > Robert Burke
> > > Beam Go Busybody
> > >
> > > On 2023/03/23 15:09:44 Sherif Tolba wrote:
> > > > Thank you, Robert, for your detailed response and the resources you
> > > > shared.
> > > >
> > > > One thing that I didn't mention is that my goal is to move the setup to
> > > > EKS after completing local experimentation. As you pointed out, LOOPBACK
> > > > is mainly for local setups and testing. I also started with the DOCKER
> > > > mode; however, Flink's Job Manager threw an error:
> > > >
> > > > java.io.IOException: Cannot run program "docker": error=2, No such file or directory
> > > >
> > > > despite using the unaltered official image:
> > > >
> > > > docker run --network=host --mount
> > > > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> > > >
> > > > I tried to build a custom image that installs docker and makes sure the
> > > > flink user has the right permissions to call it, without success.
> > > >
> > > > Additionally, since I'd like to move it to the cluster eventually, it
> > > > made more sense to me to try the EXTERNAL mode and have the workers
> > > > spun up as a separate Kubernetes deployment, then link to the associated
> > > > K8s service using the envConfig := k8s-go-sdk-harness-svc-name:<port_number>
> > > > pipeline option. I saw something similar done in this article
> > > > <https://ndeepak.com/posts/2022-07-07-local-beam/>; however, Deepak is
> > > > using a Python pipeline, and it is more straightforward to start a Python
> > > > SDK Harness using the -worker_pool flag. I was able to create a Python
> > > > SDK Harness similar to what he did but, as expected, when submitting the
> > > > Go pipeline, it failed because the environment URN refers to Python and
> > > > not Go.
> > > >
> > > > Below are more details about what I am doing:
> > > >
> > > > 1) Run Flink's Job Manager in one terminal using:
> > > >
> > > > docker run --network=host --mount
> > > > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> > > >
> > > > 2) Run Flink's Task Manager in another tab using:
> > > >
> > > > docker run --network=host --mount
> > > > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > > > flink:1.14.0 taskmanager
> > > >
> > > > 3) Run Beam's Job Server in a third tab:
> > > >
> > > > docker run --net=host --mount
> > > > type=bind,source=/tmp/staged,target=/tmp/staged
> > > > apache/beam_flink1.14_job_server:latest --flink-master=localhost:8081
> > > >
> > > > 4) Try to run Go SDK Harness in a fourth tab (failure):
> > > >
> > > > docker run --network=host --mount
> > > > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > > > apache/beam_go_sdk:2.46.0 --id=1-1 --logging_endpoint=localhost:44977
> > > > --artifact_endpoint=localhost:43219 --provision_endpoint=localhost:34437
> > > > --control_endpoint=localhost:42935
> > > >
> > > > 5) Compile the Go pipeline and run it in a fifth tab:
> > > >
> > > > go build minimal_wordcount.go
> > > > ./minimal_wordcount
> > > >
> > > > *Code*
> > > >
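> > > > package main
> > > >
> > > > import (
> > > >     "context"
> > > >     "fmt"
> > > >     "log"
> > > >     "regexp"
> > > >
> > > >     "github.com/apache/beam/sdks/v2/go/pkg/beam"
> > > >     _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs" // gs:// paths
> > > >     _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
> > > >     "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
> > > >     "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
> > > >     "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
> > > >     "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
> > > > )
> > > >
> > > > // wordRE is not shown in the original snippet; this pattern is assumed
> > > > // from Beam's minimal_wordcount example.
> > > > var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
> > > >
> > > > func main() {
> > > >     // Set the job options in code instead of passing command-line flags.
> > > >     setJobOptions()
> > > >
> > > >     beam.Init()
> > > >     p := beam.NewPipeline()
> > > >     s := p.Root()
> > > >
> > > >     lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
> > > >     // Split each line into words.
> > > >     words := beam.ParDo(s, func(line string, emit func(string)) {
> > > >         for _, word := range wordRE.FindAllString(line, -1) {
> > > >             emit(word)
> > > >         }
> > > >     }, lines)
> > > >
> > > >     counted := stats.Count(s, words)
> > > >     formatted := beam.ParDo(s, func(w string, c int) string {
> > > >         return fmt.Sprintf("%s: %v", w, c)
> > > >     }, counted)
> > > >
> > > >     textio.Write(s, "wordcounts.txt", formatted)
> > > >     if _, err := flink.Execute(context.Background(), p); err != nil {
> > > >         log.Fatalf("Failed to execute job: %v", err)
> > > >     }
> > > > }
> > > >
> > > > func setJobOptions() {
> > > >     endPoint := "localhost:8099"   // Beam Job Server
> > > >     envType := "EXTERNAL"
> > > >     envConfig := "localhost:34437" // external worker service address
> > > >     jobName := "test_word_count"
> > > >     isAsync := true
> > > >     parallelism := 1
> > > >
> > > >     jobopts.JobName = &jobName
> > > >     jobopts.Endpoint = &endPoint
> > > >     jobopts.Async = &isAsync
> > > >     jobopts.Parallelism = &parallelism
> > > >     jobopts.EnvironmentType = &envType
> > > >     jobopts.EnvironmentConfig = &envConfig
> > > > }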
> > > >
> > > > I am still reading up on the container contract you linked, but I'm not
> > > > sure if starting the harness manually is a good idea in the first place,
> > > > based on what you mentioned at the beginning of your response.
> > > >
> > > > Thank you,
> > > > Sherif
> > > >
> > > >
> > > >
> > > > On Wed, Mar 22, 2023 at 7:22 PM Robert Burke <lostl...@apache.org> wrote:
> > > >
> > > > > I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's
> > > > > up to the runner to orchestrate/set up properly configured workers with
> > > > > the containers. With Beam, there should never be any need to manually
> > > > > set up workers for Flink, etc. to run on.
> > > > >
> > > > > Those flags/etc. are part of the "beam container contract", and are
> > > > > internal implementation details that (ideally) an end pipeline author
> > > > > doesn't need to worry about. The original design doc is here:
> > > > > https://s.apache.org/beam-fn-api-container-contract, but it's rather
> > > > > out of date WRT the fine details (e.g. modern SDKs use the single
> > > > > ProvisioningService to get the other service URLs, rather than them all
> > > > > being provided by flags).
> > > > >
> > > > > The official instructions are here:
> > > > >
> > > > > https://beam.apache.org/documentation/runners/flink/
> > > > >
> > > > > In particular, there are two modes to be aware of for local runs:
> > > > >
> > > > > 1st. LOOPBACK mode, which will have Flink "loop back" to the
> submitting
> > > > > job process to execute the job.
> > > > >
> > > > > Start the Flink Beam Job service: (eg. for flink1.10)
> > > > >
> > > > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > > > >
> > > > > Submit your job to the Beam Job Server (e.g. at localhost:8099) with
> > > > > the LOOPBACK environment type:
> > > > > --runner=PortableRunner
> > > > > --endpoint=localhost:8099
> > > > > --environment_type=LOOPBACK
> > > > >
> > > > > (Note: the doc there is very Python SDK focused, so the --job_endpoint
> > > > > flag is just --endpoint in the Go SDK.)
> > > > >
> > > > > Other than executing in the main process, this is still using portable
> > > > > Beam.
> > > > >
> > > > > 2nd. Container mode: This is closer to what you're trying to do.
> > > > >
> > > > > Per the linked doc, this requires you to start the Flink cluster with
> > > > > its REST port (e.g. localhost:8081), then, with Docker, start the
> > > > > connected Beam Job service (e.g. for flink1.10):
> > > > >
> > > > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > > > > --flink-master=localhost:8081
> > > > >
> > > > > Note that the "flink-master" flag is how Beam ultimately sends jobs to
> > > > > Flink, and then sets up the workers.
> > > > >
> > > > > Then submit your job to *that* endpoint (which should remain at
> > > > > localhost:8099). This should largely be the same, but without setting
> > > > > the "environment_type" flag.
> > > > >
> > > > > -------
> > > > >
> > > > > Finally, I'd be remiss not to point you to the in-development
> > > > > "Prism" runner, which will eventually replace the current Go Direct
> > > > > runner.
> > > > >
> > > > > See
> > > > > https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism
> > > > > for current usage instructions and restrictions.
> > > > >
> > > > > It's currently suitable for smoke testing small pipelines, but the goal
> > > > > is to have a portable reference runner, WRT all facets of Beam.
> > > > > Depending on what Beam features you're using, it may not be suitable.
> > > > >
> > > > > I hope this helps!
> > > > > Robert Burke
> > > > > Beam Go Busybody
> > > > >
> > > > > (note, I don't monitor this list, but Beam Go SDK questions tend to
> > > > > find their way to me)
> > > > >
> > > > > On 2023/03/22 11:51:53 Sherif Tolba wrote:
> > > > > > Hi Apache Beam team,
> > > > > >
> > > > > > I hope this email finds you all well. I have been experimenting with
> > > > > > Apache Beam and Flink, mainly using golang. I hit a roadblock when
> > > > > > trying to run the minimal word count example on Beam and Flink locally
> > > > > > using Go SDK workers. I am trying to use the
> > > > > > "apache/beam_go_sdk:2.46.0" Docker image as follows:
> > > > > >
> > > > > > docker run --network=host apache/beam_go_sdk:2.46.0
> > > > > > --id=1-1 --provision_endpoint=localhost:50000   <-- (I set this port
> > > > > > based on some research online, but I don't really know what the
> > > > > > service should be)
> > > > > >
> > > > > > However, I am unable to understand what the following options
> > > > > > represent:
> > > > > >
> > > > > > Usage of /opt/apache/beam/boot:
> > > > > >   -artifact_endpoint string
> > > > > >         Local artifact endpoint for FnHarness (required).
> > > > > >   -control_endpoint string
> > > > > >         Local control endpoint for FnHarness (required).
> > > > > >   -id string
> > > > > >         Local identifier (required).
> > > > > >   -logging_endpoint string
> > > > > >         Local logging endpoint for FnHarness (required).
> > > > > >   -provision_endpoint string
> > > > > >         Local provision endpoint for FnHarness (required).
> > > > > >   -semi_persist_dir string
> > > > > >         Local semi-persistent directory (optional). (default
> "/tmp")
> > > > > >
> > > > > > I checked
> > > > > > https://github.com/apache/beam/blob/master/sdks/go/container/boot.go
> > > > > > but am still unable to tell what these endpoints are. I couldn't find
> > > > > > any online documentation describing, for example, what the
> > > > > > provision_endpoint should be set to.
> > > > > >
> > > > > > I would greatly appreciate any pointers or explanation.
> > > > > >
> > > > > > My setup is as follows: I have a Flink JobManager, two TaskManagers,
> > > > > > and a Beam JobServer running locally. I can execute the pipeline that's
> > > > > > written in Go and see the job submitted on Flink's UI; however, it
> > > > > > quickly fails because there are no workers to execute the Go transforms.
> > > > > >
> > > > > > Thanks,
> > > > > > Sherif Tolba
> > > > > >
> > > > >
> > > >
> > >
> >
>
