Hi Robert,

Thank you for your insightful response. The setup you described makes sense
to me, and I'd like to give the gRPC server implementation a try. Referring
to your point below:

"This `workerService` would then use whatever it likes to get containers
onto VMs when requested by the Job service/flink."

it will be Kubernetes-specific for my use case. Is this okay?
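To make sure I'm picturing the right shape before I start, here is a rough,
untested sketch of the Kubernetes-backed worker service I have in mind. The
BeamFnExternalWorkerPool service comes from the proto you linked; everything
Kubernetes-specific (the image and namespace flags, the pod naming, the
in-cluster config) is purely my own assumption:

package main

import (
	"context"
	"flag"
	"log"
	"net"

	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
	"google.golang.org/grpc"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

var (
	addr      = flag.String("addr", ":8000", "address to serve StartWorker/StopWorker on")
	container = flag.String("container", "apache/beam_go_sdk:2.46.0", "SDK harness image to launch")
	namespace = flag.String("namespace", "default", "namespace to create worker pods in")
)

// workerPool launches one SDK harness pod per StartWorker request.
type workerPool struct {
	fnpb.UnimplementedBeamFnExternalWorkerPoolServer
	k8s *kubernetes.Clientset
}

func (w *workerPool) StartWorker(ctx context.Context, req *fnpb.StartWorkerRequest) (*fnpb.StartWorkerResponse, error) {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "beam-worker-" + req.GetWorkerId()},
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyNever,
			Containers: []corev1.Container{{
				Name:  "sdk-harness",
				Image: *container,
				// The same flags the boot loader already accepts; the
				// runner supplies all of these endpoints in the request.
				Args: []string{
					"--id=" + req.GetWorkerId(),
					"--logging_endpoint=" + req.GetLoggingEndpoint().GetUrl(),
					"--artifact_endpoint=" + req.GetArtifactEndpoint().GetUrl(),
					"--provision_endpoint=" + req.GetProvisionEndpoint().GetUrl(),
					"--control_endpoint=" + req.GetControlEndpoint().GetUrl(),
				},
			}},
		},
	}
	if _, err := w.k8s.CoreV1().Pods(*namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
		return &fnpb.StartWorkerResponse{Error: err.Error()}, nil
	}
	return &fnpb.StartWorkerResponse{}, nil
}

func (w *workerPool) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest) (*fnpb.StopWorkerResponse, error) {
	if err := w.k8s.CoreV1().Pods(*namespace).Delete(ctx, "beam-worker-"+req.GetWorkerId(), metav1.DeleteOptions{}); err != nil {
		return &fnpb.StopWorkerResponse{Error: err.Error()}, nil
	}
	return &fnpb.StopWorkerResponse{}, nil
}

func main() {
	flag.Parse()
	cfg, err := rest.InClusterConfig() // assumes the pool service itself runs in the cluster
	if err != nil {
		log.Fatalf("kubernetes config: %v", err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("kubernetes client: %v", err)
	}
	lis, err := net.Listen("tcp", *addr)
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	g := grpc.NewServer()
	fnpb.RegisterBeamFnExternalWorkerPoolServer(g, &workerPool{k8s: client})
	log.Fatal(g.Serve(lis))
}

If that looks roughly right, I'll start from the extworker implementation
you linked and adapt it.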
"This `workerService` would then use whatever it likes to get containers onto VMs when requested by the Job service/flink." , it will be Kubernetes-specific for my use case, is this okay? Thanks, Sherif On Thu, Mar 23, 2023 at 3:52 PM Robert Burke <lostl...@apache.org> wrote: > Oh that's very interesting! I have a few comments, but we could end up > with a new feature for the Go SDK. > > As you've noted, you shouldn't really be manually spinning up step 4. It's > up to the runner to do that, but it does look like for your usage, some > assistance is needed. > > The Python Boot Loader has that in order to support "sibling processes", > on a single VM container. Basically, it's a hack to get around the Global > Interperter Lock slowing things down, and multiprocessing. Starting > additional separate processes allows for efficient use of cores and > multiprocessing. Go and Java don't need this since they have robust > concurrency support, and will generally process each bundle sent to them in > parallel. > > The bootloaders don't currently share a lot of code, since they were > developed with the language harness they start up in mind. > > So the worker_pool flag you mention is here: > > https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/container/boot.go#L55 > > The "External" stuff you see, is also how LOOPBACK mode operates. It just > treats all desired workers as internal processes. But, it's largely just a > service spec that anyone could implement and ensure it's pointed to. > > Eg. For Loopback in the Go SDK, the implementation is entirely here: > > https://github.com/apache/beam/blob/ba3dcd1cb983bbe92531ab7deae95438e93a1d4a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go#L33 > > Python's equivalent is here: > https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py > > The service definition is pretty straight forward here, just StartWorker > and StopWorker requests. > > > https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1104 > > Basically, for distributed workers, you'd just need create a GRPC server > for that API to spin up the worker with the right flags so it can connect > and process the job. Note that this is "environment specific" so this would > only start "Go SDK" workers (just as you've seen the Python only starting > up Python SDK workers".) > > So if you're good with cluster managers, Kubernetes can be used for this > for example instead of whatever Flink is managing. > > The way I'm currently picturing it is a separate service binary (mostly to > avoid unnecessary built in deps...). If it's in the Go SDK Module, it > should default to the Go SDK container, but there's no reason not to > provide an override if desired. > > Default image for a given release is at : > https://github.com/apache/beam/blob/011296c14659f80c8ecbeefda79ecc3f1113bd95/sdks/go/pkg/beam/core/core.go#L33 > (the dev versions aren't pushed by default, but this will work after > release). > > Then it's up to the runner to talk to that service to start and stop > workers as needed. > > $ workerService <external_worker_service_addr> --container=<container> > ...other config... 
> Basically, for distributed workers, you'd just need to create a gRPC
> server for that API to spin up the worker with the right flags so it can
> connect and process the job. Note that this is "environment specific", so
> this would only start "Go SDK" workers (just as you've seen the Python one
> only starting up Python SDK workers).
>
> So if you're good with cluster managers, Kubernetes, for example, can be
> used for this instead of whatever Flink is managing.
>
> The way I'm currently picturing it is a separate service binary (mostly to
> avoid unnecessary built-in deps...). If it's in the Go SDK module, it
> should default to the Go SDK container, but there's no reason not to
> provide an override if desired.
>
> The default image for a given release is at:
> https://github.com/apache/beam/blob/011296c14659f80c8ecbeefda79ecc3f1113bd95/sdks/go/pkg/beam/core/core.go#L33
> (the dev versions aren't pushed by default, but this will work after
> release).
>
> Then it's up to the runner to talk to that service to start and stop
> workers as needed.
>
> $ workerService <external_worker_service_addr> --container=<container>
> ...other config...
>
> $ myPipelineBinary --runner=portable --environment_type=external
> --environment_config=<external_worker_service_addr> --endpoint=<beam job
> service>
>
> This `workerService` would then use whatever it likes to get containers
> onto VMs when requested by the Job service/flink.
>
> I'd be entirely delighted for such a thing to be contributed, and to help
> review it. @lostluck on github, if you desire to go this path.
>
> Robert Burke
> Beam Go Busybody
>
> On 2023/03/23 15:09:44 Sherif Tolba wrote:
> > Thank you, Robert, for your detailed response and the resources you
> > shared.
> >
> > One thing that I didn't mention is that my goal is to move the setup to
> > EKS after completing local experimentation. As you pointed out, LOOPBACK
> > is mainly for local setups and testing. I also started with the DOCKER
> > mode; however, Flink's Job Manager threw an error:
> >
> > java.io.IOException: Cannot run program "docker": error=2, No such file
> > or directory
> >
> > despite using the unaltered official image:
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> >
> > I tried to build a custom image that installs docker and gives the flink
> > user permission to call it, without success.
> >
> > Additionally, since I'd like to eventually move this to the cluster, it
> > made more sense to me to try the EXTERNAL mode and have the workers spun
> > up as a separate Kubernetes deployment, then link to the associated K8s
> > service via the envConfig := "k8s-go-sdk-harness-svc-name:<port_number>"
> > pipeline option. I saw something similar done in this article
> > <https://ndeepak.com/posts/2022-07-07-local-beam/>; however, Deepak is
> > using a Python pipeline, and it is more straightforward to start a
> > Python SDK harness using the --worker_pool flag. I was able to create a
> > Python SDK harness similar to what he did but, as expected, when
> > submitting the Go pipeline, it failed because the environment URN refers
> > to Python and not Go.
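Rereading my own paragraph above with fresh eyes: the Python image can act
as a pool because its boot loader accepts something like

docker run --net=host apache/beam_python3.10_sdk:2.46.0 --worker_pool

(the exact image tag doesn't matter), while the Go boot loader's usage text
quoted later in this thread shows no such flag. That missing piece is
exactly what the workerService sketched at the top of this mail is meant to
provide.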
> >
> > Below are more details about what I am doing:
> >
> > 1) Run Flink's Job Manager in one terminal using:
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> >
> > 2) Run Flink's Task Manager in another tab using:
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > flink:1.14.0 taskmanager
> >
> > 3) Run Beam's Job Server in a third tab:
> >
> > docker run --net=host --mount
> > type=bind,source=/tmp/staged,target=/tmp/staged
> > apache/beam_flink1.14_job_server:latest --flink-master=localhost:8081
> >
> > 4) Try to run the Go SDK harness in a fourth tab (failure):
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > apache/beam_go_sdk:2.46.0 --id=1-1 --logging_endpoint=localhost:44977
> > --artifact_endpoint=localhost:43219 --provision_endpoint=localhost:34437
> > --control_endpoint=localhost:42935
> >
> > 5) Compile the Go pipeline and run it in a fifth tab:
> >
> > go build minimal_wordcount.go
> > ./minimal_wordcount
> >
> > *Code*
> >
> > package main
> >
> > import (
> > 	"context"
> > 	"fmt"
> > 	"log"
> > 	"regexp"
> >
> > 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
> > 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
> > 	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
> > 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
> > 	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
> > )
> >
> > var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
> >
> > func main() {
> > 	setJobOptions()
> >
> > 	beam.Init()
> > 	p := beam.NewPipeline()
> > 	s := p.Root()
> >
> > 	lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
> > 	words := beam.ParDo(s, func(line string, emit func(string)) {
> > 		for _, word := range wordRE.FindAllString(line, -1) {
> > 			emit(word)
> > 		}
> > 	}, lines)
> >
> > 	counted := stats.Count(s, words)
> > 	formatted := beam.ParDo(s, func(w string, c int) string {
> > 		return fmt.Sprintf("%s: %v", w, c)
> > 	}, counted)
> >
> > 	textio.Write(s, "wordcounts.txt", formatted)
> > 	if _, err := flink.Execute(context.Background(), p); err != nil {
> > 		log.Fatalf("pipeline failed: %v", err)
> > 	}
> > }
> >
> > func setJobOptions() {
> > 	endPoint := "localhost:8099"
> > 	envType := "EXTERNAL"
> > 	// Note: this points at the provision endpoint from step 4; for
> > 	// EXTERNAL it should be the address of a worker pool service.
> > 	envConfig := "localhost:34437"
> > 	jobName := "test_word_count"
> > 	isAsync := true
> > 	parallelism := 1
> >
> > 	jobopts.JobName = &jobName
> > 	jobopts.Endpoint = &endPoint
> > 	jobopts.Async = &isAsync
> > 	jobopts.Parallelism = &parallelism
> > 	jobopts.EnvironmentType = &envType
> > 	jobopts.EnvironmentConfig = &envConfig
> > }
> >
> > I am still reading up on the container contract you linked, but I'm not
> > sure if starting the harness manually is a good idea in the first place,
> > based on what you mentioned at the beginning of your response.
> >
> > Thank you,
> > Sherif
> >
> > On Wed, Mar 22, 2023 at 7:22 PM Robert Burke <lostl...@apache.org> wrote:
> > > I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's
> > > up to the runner to orchestrate/set up properly configured workers with
> > > the containers. With Beam, there should never be any need to manually
> > > set up workers for Flink, etc. to run on.
> > >
> > > Those flags/etc are part of the "beam container contract", and are
> > > internal implementation details that (ideally) an end pipeline author
> > > doesn't need to worry about. The original design doc is here:
> > > https://s.apache.org/beam-fn-api-container-contract, but it's rather
> > > out of date WRT the fine details (eg. modern SDKs use the single
> > > ProvisioningService to get the other service URLs, rather than them all
> > > being provided by flags.)
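That single-ProvisioningService detail clears up the boot flags I asked
about in my original message: given only the provision endpoint, a worker
can discover the remaining service URLs. Here is how I understand that
lookup, going by the fn_execution protos (the address is made up, and the
getter names are my reading of the generated Go code):

package main

import (
	"context"
	"fmt"
	"log"

	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx := context.Background()
	conn, err := grpc.Dial("localhost:34437", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	resp, err := fnpb.NewProvisionServiceClient(conn).GetProvisionInfo(ctx, &fnpb.GetProvisionInfoRequest{})
	if err != nil {
		log.Fatal(err)
	}
	// The other service URLs ride along in the provision info.
	info := resp.GetInfo()
	fmt.Println("control:", info.GetControlEndpoint().GetUrl())
	fmt.Println("logging:", info.GetLoggingEndpoint().GetUrl())
	fmt.Println("artifact:", info.GetArtifactEndpoint().GetUrl())
}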
> > > The official instructions are here:
> > >
> > > https://beam.apache.org/documentation/runners/flink/
> > >
> > > In particular, there are two modes to be aware of for local runs:
> > >
> > > 1st. LOOPBACK mode, which will have Flink "loop back" to the
> > > submitting job process to execute the job.
> > >
> > > Start the Flink Beam Job service (eg. for flink1.10):
> > >
> > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > >
> > > Then submit your job to the Beam Job Server (eg. at localhost:8099)
> > > with the LOOPBACK environment type:
> > >
> > > --runner=PortableRunner
> > > --endpoint=localhost:8099
> > > --environment_type=LOOPBACK
> > >
> > > (Note, the doc there is very Python SDK focused, so the --job_endpoint
> > > flag is just --endpoint in the Go SDK.)
> > >
> > > Other than executing in the main process, this is still using portable
> > > beam.
> > >
> > > 2nd. Container mode: This is closer to what you're trying to do.
> > >
> > > Per the linked doc, this requires you to start the Flink cluster with
> > > its REST port (eg. localhost:8081), then, with Docker, start the
> > > connected Beam Job service (eg. for flink1.10):
> > >
> > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > > --flink-master=localhost:8081
> > >
> > > Note that the "flink-master" flag is how Beam ultimately sends jobs to
> > > Flink and then sets up the workers.
> > >
> > > Then submit your job to *that* endpoint (which should remain at
> > > localhost:8099); this should largely be the same, but without setting
> > > the "environment_type" flag.
> > >
> > > -------
> > >
> > > Finally, I'd be remiss not to point you to the in-development "Prism"
> > > runner, which will eventually replace the current Go Direct runner.
> > >
> > > See
> > > https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism
> > > for current usage instructions and restrictions.
> > >
> > > It's currently suitable for smoke testing small pipelines, but the
> > > goal is to have a portable reference runner, WRT all facets of beam.
> > > Depending on what Beam features you're using, it may not be suitable.
> > >
> > > I hope this helps!
> > > Robert Burke
> > > Beam Go Busybody
> > >
> > > (note, I don't monitor this list, but Beam Go SDK questions tend to
> > > find their way to me)
> > >
> > > On 2023/03/22 11:51:53 Sherif Tolba wrote:
> > > > Hi Apache Beam team,
> > > >
> > > > I hope this email finds you all well. I have been experimenting with
> > > > Apache Beam and Flink, mainly using golang. I hit a roadblock when
> > > > trying to run the minimal word count example on Beam and Flink
> > > > locally using Go SDK workers. I am trying to use the
> > > > "apache/beam_go_sdk:2.46.0" Docker image as follows:
> > > >
> > > > docker run --network=host apache/beam_go_sdk:2.46.0
> > > > --id=1-1 --provision_endpoint=localhost:50000 <-- (I set this port
> > > > based on some research online, but I don't really know what the
> > > > service should be)
> > > >
> > > > However, I am unable to understand what the following options
> > > > represent:
> > > >
> > > > Usage of /opt/apache/beam/boot:
> > > >   -artifact_endpoint string
> > > >         Local artifact endpoint for FnHarness (required).
> > > >   -control_endpoint string
> > > >         Local control endpoint for FnHarness (required).
> > > >   -id string
> > > >         Local identifier (required).
> > > >   -logging_endpoint string
> > > >         Local logging endpoint for FnHarness (required).
> > > >   -provision_endpoint string
> > > >         Local provision endpoint for FnHarness (required).
> > > >   -semi_persist_dir string
> > > >         Local semi-persistent directory (optional). (default "/tmp")
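Coming back to this usage text after the rest of the thread: these
endpoints are gRPC services that the runner hosts, and the boot program is
just a launcher that stages artifacts, reads the provision info, and then
hands the control and logging addresses to the actual harness. If I read
extworker.go correctly, the in-process equivalent boils down to something
like the following (addresses made up, and I'm not certain this signature
is stable across releases):

package main

import (
	"context"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness"
)

func main() {
	// The harness itself only dials the logging and control services; the
	// artifact and provision endpoints are consumed by the boot sequence
	// before this point.
	if err := harness.Main(context.Background(), "localhost:44977", "localhost:42935"); err != nil {
		log.Fatalf("harness exited: %v", err)
	}
}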
(default "/tmp") > > > > > > > > I checked: > > > > https://github.com/apache/beam/blob/master/sdks/go/container/boot.go > but > > > > still unable to tell what these endpoints are. I couldn't find any > online > > > > documentation describing, for example, what the provision_endpoint > should > > > > be set to. > > > > > > > > I would greatly appreciate any pointers or explanation. > > > > > > > > My setup is as follows: I have a Flink JobManager, two TaskManagers, > and > > > a > > > > Beam JobServer running locally. I can execute the pipeline that's > written > > > > in Go and see the job submitted on Flink's UI, however, it quickly > fails > > > > because there are no workers to execute the Go transforms. > > > > > > > > Thanks, > > > > Sherif Tolba > > > > > > > > > >