Hi!

I need help implementing a native Kubernetes Flink cluster that needs to
run batch jobs (run by TensorFlow Extended), but I am not sure I am
configuring it right as I have issues running jobs on more than one task
manager, while jobs run fine if there is only one TM.

I use the following parameters for the job:

        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        f"--environment_config={beam_sdk_url}:50000",
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",


I have configured the Beam workers to run as side-cars to the TM
containers. I do this by configuring. task manager template for the pods
like this:

kubernetes.pod-template-file.taskmanager

it is pointing out to a template file with contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
     #hostNetwork: true
     containers:
      - name: flink-main-container
        #image: apache/flink:scala_2.12
        env:
          - name: AWS_REGION
            value: "eu-central-1"
          - name: S3_VERIFY_SSL
            value: "0"
          - name: PYTHONPATH
            value: "/data/flink/src"
        args: ["taskmanager"]
        ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122 #22
          initialDelaySeconds: 30
          periodSeconds: 60
      - name: beam-worker-pool
        env:
          - name: PYTHONPATH
            value: "/data/flink/src"
          - name: AWS_REGION
            value: "eu-central-1"
          - name: S3_VERIFY_SSL
            value: "0"
        image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
        imagePullPolicy: Always
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60

I have also created a kubernetes load balancer for the task managers, so
clients can connect on port 50000. So I use that address when configuring:

f"--environment_config={beam_sdk_url}:50000",

the problem is as it looks like the Beam SDK harness on one task manager
wants to connect to the endpoint running on the other task manager, but
looks for it on localhost:

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed
to dial server at localhost:33705
    caused by:
context deadline exceeded

The provision endpoint on TM 1 is the one actually listening on the port
33705, while this is looking for it on localhost, so cannot connect to it.

Showing how I test this:

...............

TM 1:
========
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=50000
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:33383',
'--artifact_endpoint=localhost:43477',
'--provision_endpoint=localhost:40983',
'--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed
to dial server at localhost:40983
    caused by:
context deadline exceeded

TM 2:
=========
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=50000
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:40497',
'--artifact_endpoint=localhost:36245',
'--provision_endpoint=localhost:32907',
'--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed
to dial server at localhost:32907
    caused by:
context deadline exceeded

Testing:
.........................

TM 1:
============
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c
beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused

root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...


TM 2:
=============
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused

root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output

Not sure how to fix this.

Thanks, Gorjan

Reply via email to