Yes! That did it. I changed it to localhost and everything works fine now.
I was wrong in thinking that my client machine would need to connect to the
Beam SDK workers directly, which is why I had added the load balancer.
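
For the record, the only change needed was this line in the job parameters
(everything else stayed exactly as in my mail below):

        "--environment_config=localhost:50000",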

Thank you Jan!

On Sun, 15 Aug 2021 at 16:45, Jan Lukavský <je...@seznam.cz> wrote:

> Hi Gorjan,
>
> the address of localhost is hard-coded in the Python worker pool (see
> [1]). There should be no need to set up a load balancer for the worker_pool;
> if you have it as another container in each TM pod, it should suffice to
> replace {beam_sdk_url} with 'localhost'. Each TM will then have its own
> worker_pool, which should be just fine.
>
> Best,
>
>  Jan
>
> [1]
> https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81
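>
> For reference, the line in question creates the pool's gRPC server and
> advertises it as localhost, roughly like this (a paraphrase of [1], not
> the exact code):
>
>     worker_address = 'localhost:%s' % worker_server.add_insecure_port(
>         '[::]:%s' % port)
>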
> On 8/14/21 4:37 PM, Gorjan Todorovski wrote:
>
> Hi!
>
> I need help with a native Kubernetes Flink cluster that needs to run
> batch jobs (submitted by TensorFlow Extended), but I am not sure I am
> configuring it correctly: jobs run fine with a single task manager, but I
> have issues as soon as there is more than one TM.
>
> I use the following parameters for the job:
>
>         "--runner=FlinkRunner",
>         "--parallelism=4",
>         f"--flink_master={flink_url}:8081",
>         "--environment_type=EXTERNAL",
>         f"--environment_config={beam_sdk_url}:50000",
>         "--flink_submit_uber_jar",
>         "--worker_harness_container_image=none",
>
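> For context, TFX forwards these flags to Beam, where they end up in a
> PipelineOptions object, roughly like this (a sketch, not my exact setup):
>
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     flags = [
>         "--runner=FlinkRunner",
>         "--parallelism=4",
>         # ... the remaining options above
>     ]
>     options = PipelineOptions(flags)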
>
> I have configured the Beam workers to run as sidecars to the TM
> containers. I do this by setting the task manager pod template option
>
> kubernetes.pod-template-file.taskmanager
>
> which points to a template file with these contents:
>
> kind: Pod
> metadata:
>   name: taskmanager-pod-template
> spec:
>   #hostNetwork: true
>   containers:
>     - name: flink-main-container
>       #image: apache/flink:scala_2.12
>       env:
>         - name: AWS_REGION
>           value: "eu-central-1"
>         - name: S3_VERIFY_SSL
>           value: "0"
>         - name: PYTHONPATH
>           value: "/data/flink/src"
>       args: ["taskmanager"]
>       ports:
>         - containerPort: 6122 #22
>           name: rpc
>         - containerPort: 6125
>           name: query-state
>       livenessProbe:
>         tcpSocket:
>           port: 6122 #22
>         initialDelaySeconds: 30
>         periodSeconds: 60
>     - name: beam-worker-pool
>       env:
>         - name: PYTHONPATH
>           value: "/data/flink/src"
>         - name: AWS_REGION
>           value: "eu-central-1"
>         - name: S3_VERIFY_SSL
>           value: "0"
>       image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
>       imagePullPolicy: Always
>       args: ["--worker_pool"]
>       ports:
>         - containerPort: 50000
>           name: pool
>       livenessProbe:
>         tcpSocket:
>           port: 50000
>         initialDelaySeconds: 30
>         periodSeconds: 60
>
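> The template is wired in through the Flink configuration (flink-conf.yaml
> or -D on the command line); a minimal sketch, with the path being a
> placeholder:
>
> kubernetes.pod-template-file.taskmanager: /path/to/taskmanager-pod-template.yaml
>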
> I have also created a Kubernetes load balancer for the task managers, so
> clients can connect on port 50000, and I use that address when configuring:
> f"--environment_config={beam_sdk_url}:50000",
>
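> The load balancer itself is just a Kubernetes Service of type
> LoadBalancer in front of the TM pods, something like this sketch (the
> selector labels are placeholders):
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-worker-pool
> spec:
>   type: LoadBalancer
>   selector:
>     app: my-first-flink-cluster
>     component: taskmanager
>   ports:
>     - port: 50000
>       targetPort: 50000
>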
> The problem is that the Beam SDK harness on one task manager apparently
> wants to connect to an endpoint running on the other task manager, but
> looks for it on localhost:
>
> Log from beam-worker-pool on TM 2:
>
> 2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial 
> server at localhost:33705
>     caused by:
> context deadline exceeded
>
> The provision endpoint actually listening on port 33705 is on TM 1, while
> TM 2 looks for it on localhost, so it cannot connect to it.
>
> Here is how I tested this:
>
> ...............
>
> TM 1:
> ========
> $ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
> 2021/08/12 09:10:34 Starting worker pool 1: python -m 
> apache_beam.runners.worker.worker_pool_main --service_port=50000 
> --container_executable=/opt/apache/beam/boot
> Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
> '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', 
> '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
> 2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial 
> server at localhost:40983
>     caused by:
> context deadline exceeded
>
> TM 2:
> =========
> $ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
> 2021/08/12 09:10:33 Starting worker pool 1: python -m 
> apache_beam.runners.worker.worker_pool_main --service_port=50000 
> --container_executable=/opt/apache/beam/boot
> Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
> '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', 
> '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
> 2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial 
> server at localhost:32907
>     caused by:
> context deadline exceeded
>
> Testing:
> .........................
>
> TM 1:
> ============
> $ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool 
> -- bash
> root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
> curl: (7) Failed to connect to localhost port 40983: Connection refused
>
> root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
> Warning: Binary output can mess up your terminal. Use "--output -" to ...
>
>
> TM 2:
> =============
> root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
> curl: (7) Failed to connect to localhost port 32907: Connection refused
>
> root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
> Warning: Binary output can mess up your terminal. Use "--output -" to tell
> Warning: curl to output it to your terminal anyway, or consider "--output
>
> Not sure how to fix this.
>
> Thanks, Gorjan
>
>
