Oh that's very interesting! I have a few comments, but we could end up with a new feature for the Go SDK.
As you've noted, you shouldn't really be manually spinning up step 4. It's up to the runner to do that, but it does look like some assistance is needed for your usage.

The Python Boot Loader has that in order to support "sibling processes" on a single VM container. Basically, it's a hack to get around the Global Interpreter Lock slowing things down: starting additional separate processes allows for efficient use of cores and multiprocessing. Go and Java don't need this since they have robust concurrency support, and will generally process each bundle sent to them in parallel.

The bootloaders don't currently share a lot of code, since they were developed with the language harness they start up in mind. So the worker_pool flag you mention is here:

https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/container/boot.go#L55

The "External" stuff you see is also how LOOPBACK mode operates. It just treats all desired workers as internal processes. But it's largely just a service spec that anyone could implement, as long as the job is pointed to it.

E.g. for Loopback in the Go SDK, the implementation is entirely here:

https://github.com/apache/beam/blob/ba3dcd1cb983bbe92531ab7deae95438e93a1d4a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go#L33

Python's equivalent is here:

https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py

The service definition is pretty straightforward, just StartWorker and StopWorker requests:

https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1104

Basically, for distributed workers, you'd just need to create a gRPC server for that API to spin up the worker with the right flags so it can connect and process the job.

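To make the start/stop bookkeeping concrete, here is a rough, stdlib-only sketch of the logic such a service might wrap. It is not the Beam API: `startRequest`, `launcher`, `workerPool` and `bootArgs` are hypothetical names, and the request fields are only an assumption about what a StartWorker request carries. The flags are the ones printed by the container boot usage output quoted later in this thread. A real service would sit behind the generated gRPC server for the proto linked above, and the `launcher` hook is where Kubernetes (or anything else) would actually place the container.

```go
package main

import (
	"fmt"
	"sync"
)

// startRequest is a hypothetical stand-in for the StartWorker request
// message; the field names are assumptions, not the generated proto type.
type startRequest struct {
	WorkerID          string
	ControlEndpoint   string
	LoggingEndpoint   string
	ArtifactEndpoint  string
	ProvisionEndpoint string
}

// bootArgs builds the flags the container boot binary lists in its usage
// output (--id, --logging_endpoint, --artifact_endpoint, ...).
func bootArgs(r startRequest) []string {
	return []string{
		"--id=" + r.WorkerID,
		"--logging_endpoint=" + r.LoggingEndpoint,
		"--artifact_endpoint=" + r.ArtifactEndpoint,
		"--provision_endpoint=" + r.ProvisionEndpoint,
		"--control_endpoint=" + r.ControlEndpoint,
	}
}

// launcher is a hypothetical hook: it starts one worker with the given boot
// args (e.g. by asking a cluster manager for a pod) and returns a stop func.
type launcher func(args []string) (stop func(), err error)

// workerPool tracks running workers by ID so they can be stopped later.
type workerPool struct {
	mu      sync.Mutex
	launch  launcher
	workers map[string]func()
}

func newWorkerPool(l launcher) *workerPool {
	return &workerPool{launch: l, workers: make(map[string]func())}
}

// StartWorker launches a worker unless one with the same ID already exists.
func (p *workerPool) StartWorker(r startRequest) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.workers[r.WorkerID]; ok {
		return fmt.Errorf("worker %q already running", r.WorkerID)
	}
	stop, err := p.launch(bootArgs(r))
	if err != nil {
		return fmt.Errorf("starting worker %q: %w", r.WorkerID, err)
	}
	p.workers[r.WorkerID] = stop
	return nil
}

// StopWorker stops and forgets the worker; unknown IDs are ignored.
func (p *workerPool) StopWorker(id string) {
	p.mu.Lock()
	stop, ok := p.workers[id]
	delete(p.workers, id)
	p.mu.Unlock()
	if ok {
		stop()
	}
}

// Running reports how many workers are currently tracked.
func (p *workerPool) Running() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.workers)
}

func main() {
	// A fake launcher that only prints what it would do.
	pool := newWorkerPool(func(args []string) (func(), error) {
		fmt.Println("would launch boot with:", args)
		return func() { fmt.Println("would stop worker") }, nil
	})
	req := startRequest{
		WorkerID:          "1-1",
		ControlEndpoint:   "localhost:42935",
		LoggingEndpoint:   "localhost:44977",
		ArtifactEndpoint:  "localhost:43219",
		ProvisionEndpoint: "localhost:34437",
	}
	if err := pool.StartWorker(req); err != nil {
		fmt.Println("error:", err)
	}
	pool.StopWorker("1-1")
}
```

Keeping the launch step behind a function value means the same bookkeeping works whether the workers are local processes, Docker containers, or Kubernetes pods; only the launcher changes.
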
Note that this is "environment specific", so this would only start "Go SDK" workers (just as you've seen the Python one only starting up Python SDK workers).

So if you're good with cluster managers, Kubernetes can be used for this, for example, instead of whatever Flink is managing.

The way I'm currently picturing it is a separate service binary (mostly to avoid unnecessary built-in deps...). If it's in the Go SDK module, it should default to the Go SDK container, but there's no reason not to provide an override if desired. The default image for a given release is at:

https://github.com/apache/beam/blob/011296c14659f80c8ecbeefda79ecc3f1113bd95/sdks/go/pkg/beam/core/core.go#L33

(the dev versions aren't pushed by default, but this will work after release).

Then it's up to the runner to talk to that service to start and stop workers as needed.

$ workerService <external_worker_service_addr> --container=<container> ...other config...
$ myPipelineBinary --runner=portable --environment_type=external --environment_config=<external_worker_service_addr> --endpoint=<beam job service>

This `workerService` would then use whatever it likes to get containers onto VMs when requested by the Job service/Flink.

I'd be entirely delighted for such a thing to be contributed, and to help review it. @lostluck on GitHub, if you desire to go this path.

Robert Burke
Beam Go Busybody

On 2023/03/23 15:09:44 Sherif Tolba wrote:
> Thank you, Robert, for your detailed response and the resources you shared.
>
> One thing that I didn't mention is that my goal is to move the setup to EKS
> after completing local experimentation. As you pointed out, LOOPBACK is
> mainly for local setups and testing.
> I also started with the DOCKER mode,
> however, Flink's Job Manager threw an error:
>
> java.io.IOException: Cannot run program "docker": error=2, No such file or
> directory
>
> despite using the unaltered official image:
>
> docker run --network=host --mount
> type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
>
> I tried to build a custom image to install docker and make sure the flink
> user has the write permissions to call it, without success.
>
> Additionally, since I'd like to move it eventually to the cluster, it made
> more sense to me to try to use the EXTERNAL mode and have the workers spun
> up as a separate Kubernetes deployment, then link to the associated K8s
> service using the envConfig := k8s-go-sdk-harness-svc-name:<port_number>
> pipeline option. I saw something similar done in this article
> <https://ndeepak.com/posts/2022-07-07-local-beam/>, however, Deepak is
> using a Python pipeline and it is more straightforward to start a Python
> SDK Harness using the -workerpool flag. I was able to create a Python SDK
> Harness similar to what he did but, as expected, when submitting the Go
> pipeline, it failed because the environment URN refers to Python and not Go.
>
> Below are more details about what I am doing:
>
> 1) Run Flink's Job Manager in one terminal using:
>
> docker run --network=host --mount
> type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
>
> 2) Run Flink's Task Manager in another tab using:
>
> docker run --network=host --mount
> type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> flink:1.14.0 taskmanager
>
> 3) Run Beam's Job Server in a third tab:
>
> docker run --net=host --mount
> type=bind,source=/tmp/staged,target=/tmp/staged
> apache/beam_flink1.14_job_server:latest --flink-master=localhost:8081
>
> 4) Try to run Go SDK Harness in a fourth tab (failure):
>
> docker run --network=host --mount
> type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> apache/beam_go_sdk:2.46.0 --id=1-1 --logging_endpoint=localhost:44977
> --artifact_endpoint=localhost:43219 --provision_endpoint=localhost:34437
> --control_endpoint=localhost:42935
>
> 5) Compile the Go pipeline and run it in a fifth tab:
>
> go build minimal_wordcount.go
> ./minimal_wordcount
>
> *Code*
>
> func main() {
>
>     setJobOptions()
>
>     beam.Init()
>     p := beam.NewPipeline()
>     s := p.Root()
>
>     lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
>     words := beam.ParDo(s, func(line string, emit func(string)) {
>         for _, word := range wordRE.FindAllString(line, -1) {
>             emit(word)
>         }
>     }, lines)
>
>     counted := stats.Count(s, words)
>     formatted := beam.ParDo(s, func(w string, c int) string {
>         return fmt.Sprintf("%s: %v", w, c)
>     }, counted)
>
>     textio.Write(s, "wordcounts.txt", formatted)
>     flink.Execute(context.Background(), p)
> }
>
> func setJobOptions() {
>     endPoint := "localhost:8099"
>     envType := "EXTERNAL"
>     envConfig := "localhost:34437"
>     jobName := "test_word_count"
>     isAsync := true
>     parallelism := 1
>
>     jobopts.JobName = &jobName
>     jobopts.Endpoint = &endPoint
>     jobopts.Async = &isAsync
>     jobopts.Parallelism = &parallelism
>     jobopts.EnvironmentType = &envType
>     jobopts.EnvironmentConfig = &envConfig
> }
>
> I am still reading up on the container contract you linked but not sure if
> starting the harness manually is a good idea in the first place based on
> what you mentioned at the beginning of your response.
>
> Thank you,
> Sherif
>
> On Wed, Mar 22, 2023 at 7:22 PM Robert Burke <lostl...@apache.org> wrote:
>
> > I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's up
> > to the runner to orchestrate/set up properly configured workers with the
> > containers. With Beam, there should never be any need to manually set up
> > workers for Flink, etc to run on.
> >
> > Those flags/etc are part of the "beam container contract", and are
> > internal implementation details that (ideally) an end pipeline author
> > doesn't need to worry about. The original design doc is here:
> > https://s.apache.org/beam-fn-api-container-contract, but it's rather out
> > of date WRT the fine details (eg. Modern SDKs use the single
> > ProvisioningService to get the other service URLs, rather than them all
> > being provided by flags.)
> >
> > The official instructions are here:
> >
> > https://beam.apache.org/documentation/runners/flink/
> >
> > In particular, there are two modes to be aware of for local runs:
> >
> > 1st. LOOPBACK mode, which will have Flink "loop back" to the submitting
> > job process to execute the job.
> >
> > Start the Flink Beam Job service: (eg. for flink1.10)
> >
> > docker run --net=host apache/beam_flink1.10_job_server:latest
> >
> > Submit your job to the Beam Job Server (eg. at localhost:8099), with
> > the LOOPBACK environment type.
> > --runner=PortableRunner
> > --endpoint=localhost:8099
> > --environment_type=LOOPBACK
> >
> > (Note, the doc there is very Python SDK focused, so the --job_endpoint
> > flag is just --endpoint in the Go SDK).
> >
> > Other than executing in the main process, this is still using portable
> > beam.
> >
> > 2nd Container mode: This is closer to what you're trying to do.
> >
> > Per the linked doc, this requires you to start the Flink cluster with its
> > rest port (eg. localhost:8081), then with Docker, starting the connected
> > Beam Job service: (eg. for flink1.10)
> >
> > docker run --net=host apache/beam_flink1.10_job_server:latest
> > --flink-master=localhost:8081
> >
> > Note the "flink-master" flag is how Beam ultimately sends jobs to Flink,
> > and then sets up the workers.
> >
> > Then submit your job to *that* endpoint (which should remain at
> > localhost:8081). This should largely be the same, but without
> > setting the "environment_type" flag.
> >
> > -------
> >
> > Finally, I'd be remiss not to try to point you to the in development
> > "Prism" runner, which will eventually replace the current Go Direct runner.
> >
> > See
> > https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism
> > for current usage instructions and restrictions.
> >
> > It's currently suitable for smoke testing small pipelines, but the goal is
> > to have a portable reference runner, WRT all facets of beam. Depending on
> > what Beam features you're using, it may not be suitable.
> >
> > I hope this helps!
> > Robert Burke
> > Beam Go Busybody
> >
> > (note, I don't monitor this list, but Beam Go SDK questions tend to find
> > their way to me)
> >
> > On 2023/03/22 11:51:53 Sherif Tolba wrote:
> > > Hi Apache Beam team,
> > >
> > > I hope this email finds you all well. I have been experimenting with
> > > Apache Beam and Flink, mainly using golang. I hit a roadblock when
> > > trying to run the minimal word count example on Beam and Flink locally
> > > using Go SDK workers.
> > > I am trying to use the "apache/beam_go_sdk:2.46.0" Docker image as
> > > follows:
> > >
> > > docker run --network=host apache/beam_go_sdk:2.46.0
> > > --id=1-1 --provision_endpoint=localhost:50000 <-- (I set this port
> > > based on some research online, but I don't really know what the
> > > service should be)
> > >
> > > However, I am unable to understand what the following options represent:
> > >
> > > Usage of /opt/apache/beam/boot:
> > >   -artifact_endpoint string
> > >         Local artifact endpoint for FnHarness (required).
> > >   -control_endpoint string
> > >         Local control endpoint for FnHarness (required).
> > >   -id string
> > >         Local identifier (required).
> > >   -logging_endpoint string
> > >         Local logging endpoint for FnHarness (required).
> > >   -provision_endpoint string
> > >         Local provision endpoint for FnHarness (required).
> > >   -semi_persist_dir string
> > >         Local semi-persistent directory (optional). (default "/tmp")
> > >
> > > I checked:
> > > https://github.com/apache/beam/blob/master/sdks/go/container/boot.go but
> > > still unable to tell what these endpoints are. I couldn't find any online
> > > documentation describing, for example, what the provision_endpoint should
> > > be set to.
> > >
> > > I would greatly appreciate any pointers or explanation.
> > >
> > > My setup is as follows: I have a Flink JobManager, two TaskManagers, and
> > > a Beam JobServer running locally. I can execute the pipeline that's
> > > written in Go and see the job submitted on Flink's UI, however, it
> > > quickly fails because there are no workers to execute the Go transforms.
> > >
> > > Thanks,
> > > Sherif Tolba