Hi Kevin,

For your information, below is an example of running a PyFlink Table API WordCount job.
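The manifests below submit the word_count.py example that ships with Flink under /opt/flink/examples/python/table/batch/. For context, a much-simplified PyFlink Table API word count (an illustrative sketch, not the exact bundled script) looks roughly like this:

from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# Batch TableEnvironment with the blink planner (Flink 1.12 style).
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

# Tiny in-memory source instead of the file source used by the bundled example.
words = "to be or not to be".split()
source = t_env.from_elements([(w,) for w in words], ['word'])

# Count occurrences per word and print the result table.
source.group_by("word") \
      .select("word, count(1) as cnt") \
      .execute() \
      .print()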
1. Building a Docker image with Python and PyFlink installed:

Dockerfile:

FROM flink:1.12.0

# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install Python Flink
RUN pip3 install apache-flink==1.12.0
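With this Dockerfile in the current directory, you could build and tag the image to match the manifests below (pyflink:v1 is simply the name the manifests reference; how you make the image available to the cluster, e.g. pushing it to a registry the nodes can pull from, depends on your setup):

$ docker build -t pyflink:v1 .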
2. Resource definitions:

flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

job-manager.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: pyflink:v1
        env:
        args: ["standalone-job", "-py", "/opt/flink/examples/python/table/batch/word_count.py"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

task-manager.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: pyflink:v1
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

3. Creating resources:

$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f job-manager.yaml
$ kubectl create -f task-manager.yaml
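Once the resources are created, you can watch the pods as described in my previous mail: the jobmanager pod should reach the Completed state once word_count finishes, while the taskmanager pods stay Running. Something along these lines should work (the pod name is a placeholder; use whatever `kubectl get pods` shows in your cluster), and the same manifests can be used to tear everything down afterwards:

$ kubectl get pods
$ kubectl logs -f <jobmanager-pod-name>
# Tear down the cluster when you are done
$ kubectl delete -f task-manager.yaml
$ kubectl delete -f job-manager.yaml
$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml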
Best,
Shuiqiang

Shuiqiang Chen <acqua....@gmail.com> wrote on Sat, Mar 6, 2021 at 5:10 PM:

> Hi Kevin,
>
> You are able to run PyFlink applications on a Kubernetes cluster; both native
> K8s mode and resource definition mode are supported since release-1.12.0.
> Currently, Python and PyFlink are not enabled in the official Flink Docker
> image, so you might need to build a custom image with Python and PyFlink
> installed; please refer to Enable Python in docker
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>.
>
> Generally, by setting the value of the args field in
> `jobmanager-application.yaml` to args: ["standalone-job", "--python",
> "my_python_app.py", <optional arguments>, <job arguments>], the job
> manager will try to submit a PyFlink job with the specified Python file
> once it is started. You can check the pod status for the jobmanager and
> taskmanager via `kubectl get pods [-n namespace]`. The jobmanager pod will
> turn to the Completed state once the job is finished, or the Error state if
> something goes wrong, while the taskmanager pods will always be in the
> Running state.
>
> Finally, it requires you to tear down the cluster by deleting all created
> resources (jobmanager/taskmanager jobs, flink-conf configmap,
> jobmanager-service, etc.).
>
> Best,
> Shuiqiang
>
>
> Kevin Lam <kevin....@shopify.com> wrote on Sat, Mar 6, 2021 at 5:29 AM:
>
>> Hello everyone,
>>
>> I'm looking to run a PyFlink application in a distributed fashion,
>> using Kubernetes, and am currently facing issues. I've successfully gotten
>> a Scala Flink application to run using the manifests provided at [0].
>>
>> I attempted to run the application by updating the jobmanager command
>> args from
>>
>> args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional
>> arguments>, <job arguments>]
>>
>> to
>>
>> args: ["standalone-job", "--python", "my_python_app.py", <optional
>> arguments>, <job arguments>]
>>
>> But this didn't work. It resulted in the following error:
>>
>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
>> org.apache.commons.cli.Options. A different class with the same name was
>> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
>> module of loader 'app'
>>
>> I was able to get things to 'run' by setting args to:
>>
>> args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]
>>
>> But I'm not sure if things were running in a distributed fashion or not.
>>
>> 1/ Is there a good way to check if the task pods were being correctly
>> utilized?
>>
>> 2/ Are there any similar examples to [0] for how to run PyFlink jobs on
>> Kubernetes?
>>
>> Open to any suggestions you may have. Note: we'd prefer not to run using
>> the native K8s route outlined at [1] because we need to maintain the
>> ability to customize certain aspects of the deployment (e.g. mounting SSDs
>> on some of the pods).
>>
>> Thanks in advance!
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode