Hi Robert,

Thank you for your insightful response. The setup you described makes sense
to me, and I'd like to give the gRPC server implementation a try. Regarding
your point below:

"This `workerService` would then use whatever it likes to get containers
onto VMs when requested by the Job service/flink."

My implementation would be Kubernetes-specific for my use case. Is that okay?

Thanks,
Sherif

On Thu, Mar 23, 2023 at 3:52 PM Robert Burke <lostl...@apache.org> wrote:

> Oh that's very interesting! I have a few comments, but we could end up
> with a new feature for the Go SDK.
>
> As you've noted, you shouldn't really be manually spinning up step 4. It's
> up to the runner to do that, but it does look like for your usage, some
> assistance is needed.
>
> The Python Boot Loader has that in order to support "sibling processes",
> on a single VM container. Basically, it's a hack to get around the Global
> Interpreter Lock slowing things down, and multiprocessing. Starting
> additional separate processes allows for efficient use of cores and
> multiprocessing. Go and Java don't need this since they have robust
> concurrency support, and will generally process each bundle sent to them in
> parallel.
>
> The bootloaders don't currently share a lot of code, since they were
> developed with the language harness they start up in mind.
>
> So the worker_pool flag you mention is here:
>
> https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/container/boot.go#L55
>
> The "External" stuff you see is also how LOOPBACK mode operates. It just
> treats all desired workers as internal processes. But it's largely just a
> service spec that anyone could implement and point the pipeline to.
>
> Eg. For Loopback in the Go SDK, the implementation is entirely here:
>
> https://github.com/apache/beam/blob/ba3dcd1cb983bbe92531ab7deae95438e93a1d4a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go#L33
>
> Python's equivalent is here:
> https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py
>
> The service definition is pretty straightforward here, just StartWorker
> and StopWorker requests.
>
>
> https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1104
>
> Basically, for distributed workers, you'd just need to create a gRPC server
> for that API to spin up the worker with the right flags so it can connect
> and process the job. Note that this is "environment specific", so this would
> only start "Go SDK" workers (just as you've seen the Python one only start
> up Python SDK workers).
>
> So if you're good with cluster managers, Kubernetes, for example, can be
> used for this instead of whatever Flink is managing.
>
> The way I'm currently picturing it is a separate service binary (mostly to
> avoid unnecessary built in deps...). If it's in the Go SDK Module, it
> should default to the Go SDK container, but there's no reason not to
> provide an override if desired.
>
> The default image for a given release is at:
> https://github.com/apache/beam/blob/011296c14659f80c8ecbeefda79ecc3f1113bd95/sdks/go/pkg/beam/core/core.go#L33
> (the dev versions aren't pushed by default, but this will work after
> release).
>
> Then it's up to the runner to talk to that service to start and stop
> workers as needed.
>
> $ workerService <external_worker_service_addr> --container=<container> ...other config...
>
> $ myPipelineBinary --runner=portable --environment_type=external --environment_config=<external_worker_service_addr> --endpoint=<beam job service>
>
> This `workerService` would then use whatever it likes to get containers
> onto VMs when requested by the Job service/flink.
>
> I'd be entirely delighted for such a thing to be contributed, and to help
> review it. I'm @lostluck on GitHub, if you decide to go down this path.
>
> Robert Burke
> Beam Go Busybody
>
> On 2023/03/23 15:09:44 Sherif Tolba wrote:
> > Thank you, Robert, for your detailed response and the resources you
> > shared.
> >
> > One thing that I didn't mention is that my goal is to move the setup to
> > EKS after completing local experimentation. As you pointed out, LOOPBACK is
> > mainly for local setups and testing. I also started with the DOCKER mode;
> > however, Flink's Job Manager threw an error:
> >
> > java.io.IOException: Cannot run program "docker": error=2, No such file or directory
> >
> > despite using the unaltered official image:
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> >
> > I tried to build a custom image to install docker and make sure the flink
> > user has the write permissions to call it, without success.
> >
> > Additionally, since I'd like to move it to the cluster eventually, it made
> > more sense to me to try the EXTERNAL mode and have the workers spun
> > up as a separate Kubernetes deployment, then link to the associated K8s
> > service using the envConfig := k8s-go-sdk-harness-svc-name:<port_number>
> > pipeline option. I saw something similar done in this article
> > <https://ndeepak.com/posts/2022-07-07-local-beam/>; however, Deepak is
> > using a Python pipeline, and it is more straightforward to start a Python
> > SDK Harness using the --worker_pool flag. I was able to create a Python SDK
> > Harness similar to what he did but, as expected, when submitting the Go
> > pipeline, it failed because the environment URN refers to Python and not
> > Go.
> >
> > Below are more details about what I am doing:
> >
> > 1) Run Flink's Job Manager in one terminal using:
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> >
> > 2) Run Flink's Task Manager in another tab using:
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > flink:1.14.0 taskmanager
> >
> > 3) Run Beam's Job Server in a third tab:
> >
> > docker run --net=host --mount
> > type=bind,source=/tmp/staged,target=/tmp/staged
> > apache/beam_flink1.14_job_server:latest --flink-master=localhost:8081
> >
> > 4) Try to run the Go SDK Harness in a fourth tab (failure):
> >
> > docker run --network=host --mount
> > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > apache/beam_go_sdk:2.46.0 --id=1-1 --logging_endpoint=localhost:44977
> > --artifact_endpoint=localhost:43219 --provision_endpoint=localhost:34437
> > --control_endpoint=localhost:42935
> >
> > 5) Compile the Go pipeline and run it in a fifth tab:
> >
> > go build minimal_wordcount.go
> > ./minimal_wordcount
> >
> > *Code*
> >
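> > package main
> >
> > import (
> >     "context"
> >     "fmt"
> >     "log"
> >     "regexp"
> >
> >     "github.com/apache/beam/sdks/v2/go/pkg/beam"
> >     // Register the gs:// and local filesystems used by textio.
> >     _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
> >     _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
> >     "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
> >     "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
> >     "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
> >     "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
> > )
> >
> > // wordRE was not shown in the original snippet; this pattern is assumed
> > // from Beam's word count examples.
> > var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
> >
> > func main() {
> >     setJobOptions()
> >
> >     beam.Init()
> >     p := beam.NewPipeline()
> >     s := p.Root()
> >
> >     // Read the input, split each line into words, and count them.
> >     lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
> >     words := beam.ParDo(s, func(line string, emit func(string)) {
> >         for _, word := range wordRE.FindAllString(line, -1) {
> >             emit(word)
> >         }
> >     }, lines)
> >
> >     counted := stats.Count(s, words)
> >     formatted := beam.ParDo(s, func(w string, c int) string {
> >         return fmt.Sprintf("%s: %v", w, c)
> >     }, counted)
> >
> >     textio.Write(s, "wordcounts.txt", formatted)
> >
> >     // Submit through the Beam job server and report submission errors.
> >     if _, err := flink.Execute(context.Background(), p); err != nil {
> >         log.Fatalf("Failed to execute job: %v", err)
> >     }
> > }
> >
> > // setJobOptions sets the job options in code instead of via flags.
> > func setJobOptions() {
> >     endPoint := "localhost:8099"
> >     envType := "EXTERNAL"
> >     envConfig := "localhost:34437"
> >     jobName := "test_word_count"
> >     isAsync := true
> >     parallelism := 1
> >
> >     jobopts.JobName = &jobName
> >     jobopts.Endpoint = &endPoint
> >     jobopts.Async = &isAsync
> >     jobopts.Parallelism = &parallelism
> >     jobopts.EnvironmentType = &envType
> >     jobopts.EnvironmentConfig = &envConfig
> > }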
> >
> > I am still reading up on the container contract you linked, but I'm not
> > sure whether starting the harness manually is a good idea in the first
> > place, based on what you mentioned at the beginning of your response.
> >
> > Thank you,
> > Sherif
> >
> >
> >
> > On Wed, Mar 22, 2023 at 7:22 PM Robert Burke <lostl...@apache.org> wrote:
> >
> > > I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's up
> > > to the runner to orchestrate/set up properly configured workers with the
> > > containers. With Beam, there should never be any need to manually set up
> > > workers for Flink, etc. to run on.
> > >
> > > Those flags/etc are part of the "beam container contract", and are
> > > internal implementation details that (ideally) an end pipeline author
> > > doesn't need to worry about. The original design doc is here:
> > > https://s.apache.org/beam-fn-api-container-contract, but it's rather out
> > > of date WRT the fine details (e.g. modern SDKs use the single
> > > ProvisioningService to get the other service URLs, rather than them all
> > > being provided by flags).
> > >
> > > The official instructions are here:
> > >
> > > https://beam.apache.org/documentation/runners/flink/
> > >
> > > In particular, there are two modes to be aware of for local runs:
> > >
> > > 1st. LOOPBACK mode, which will have Flink "loop back" to the submitting
> > > job process to execute the job.
> > >
> > > Start the Flink Beam Job service: (eg. for flink1.10)
> > >
> > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > >
> > > Submit your job to the Beam Job Server (e.g. at localhost:8099) with
> > > the LOOPBACK environment type:
> > > --runner=PortableRunner
> > > --endpoint=localhost:8099
> > > --environment_type=LOOPBACK
> > >
> > > (Note, the doc there is very python SDK focused, so the --job_endpoint
> > > flag is just --endpoint in the Go SDK).
> > >
> > > Other than executing in the main process, this is still using portable
> > > beam.
> > >
> > > 2nd. Container mode: This is closer to what you're trying to do.
> > >
> > > Per the linked doc, this requires you to start the Flink cluster with
> > > its REST port (e.g. localhost:8081), then, with Docker, start the
> > > connected Beam Job service (e.g. for flink1.10):
> > >
> > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > > --flink-master=localhost:8081
> > >
> > > Note that the "flink-master" flag is how Beam ultimately sends jobs to
> > > Flink and then sets up the workers.
> > >
> > > Then submit your job to the Beam Job service endpoint (which should
> > > remain at localhost:8099). This should largely be the same as before,
> > > but without setting the "environment_type" flag.
> > >
> > > -------
> > >
> > > Finally, I'd be remiss not to try to point you to the in-development
> > > "Prism" runner, which will eventually replace the current Go Direct
> > > runner.
> > >
> > > See
> > > https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism
> > > for current usage instructions and restrictions.
> > >
> > > It's currently suitable for smoke testing small pipelines, but the goal
> > > is to have a portable reference runner, WRT all facets of Beam. Depending
> > > on what Beam features you're using, it may not be suitable.
> > >
> > > I hope this helps!
> > > Robert Burke
> > > Beam Go Busybody
> > >
> > > (Note: I don't monitor this list, but Beam Go SDK questions tend to find
> > > their way to me.)
> > >
> > > On 2023/03/22 11:51:53 Sherif Tolba wrote:
> > > > Hi Apache Beam team,
> > > >
> > > > I hope this email finds you all well. I have been experimenting with
> > > > Apache Beam and Flink, mainly using golang. I hit a roadblock when trying
> > > > to run the minimal word count example on Beam and Flink locally using Go
> > > > SDK workers. I am trying to use the "apache/beam_go_sdk:2.46.0" Docker
> > > > image as follows:
> > > >
> > > > docker run --network=host apache/beam_go_sdk:2.46.0
> > > > --id=1-1 --provision_endpoint=localhost:50000   <-- (I set this port
> > > > based on some research online, but I don't really know what the service
> > > > should be)
> > > >
> > > > However, I am unable to understand what the following options
> > > > represent:
> > > >
> > > > Usage of /opt/apache/beam/boot:
> > > >   -artifact_endpoint string
> > > >         Local artifact endpoint for FnHarness (required).
> > > >   -control_endpoint string
> > > >         Local control endpoint for FnHarness (required).
> > > >   -id string
> > > >         Local identifier (required).
> > > >   -logging_endpoint string
> > > >         Local logging endpoint for FnHarness (required).
> > > >   -provision_endpoint string
> > > >         Local provision endpoint for FnHarness (required).
> > > >   -semi_persist_dir string
> > > >         Local semi-persistent directory (optional). (default "/tmp")
> > > >
> > > > I checked:
> > > > https://github.com/apache/beam/blob/master/sdks/go/container/boot.go but
> > > > am still unable to tell what these endpoints are. I couldn't find any
> > > > online documentation describing, for example, what the provision_endpoint
> > > > should be set to.
> > > >
> > > > I would greatly appreciate any pointers or explanation.
> > > >
> > > > My setup is as follows: I have a Flink JobManager, two TaskManagers,
> > > > and a Beam JobServer running locally. I can execute the pipeline that's
> > > > written in Go and see the job submitted on Flink's UI; however, it quickly
> > > > fails because there are no workers to execute the Go transforms.
> > > >
> > > > Thanks,
> > > > Sherif Tolba
> > > >
> > >
> >
>
