Awesome, thanks Shuiqiang! I was able to get an example running by referencing your configs.
On Sat, Mar 6, 2021 at 7:12 AM Shuiqiang Chen <acqua....@gmail.com> wrote:

Hi Kevin,

For your information, below is an example of running a PyFlink Table API
WordCount job.

1. Building a Docker image with Python and PyFlink installed:

Dockerfile:

FROM flink:1.12.0

# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3.7 python3-pip python3.7-dev && \
    rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink
RUN pip3 install apache-flink==1.12.0

2. Resource definitions:

flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to
    # manually change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

job-manager.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: pyflink:v1
        env:
        args: ["standalone-job", "-py", "/opt/flink/examples/python/table/batch/word_count.py"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from the official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

task-manager.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: pyflink:v1
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from the official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

3. Creating the resources:

$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f job-manager.yaml
$ kubectl create -f task-manager.yaml

Best,
Shuiqiang
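For reference, the word_count.py that the job-manager.yaml args point at is one
of the Table API examples shipped with the Flink distribution. A minimal sketch
of a job along those lines could look like the following; this is illustrative
only (it assumes the Flink 1.12 Table API expression DSL, and the bundled
example differs in its details):

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

# Batch-mode table environment using the Blink planner (the 1.12 default).
settings = EnvironmentSettings.new_instance() \
    .in_batch_mode() \
    .use_blink_planner() \
    .build()
t_env = TableEnvironment.create(settings)

# Inline source data; a real job would read from a connector instead.
words = "to be or not to be".split(" ")
table = t_env.from_elements([(w,) for w in words], ["word"])

# Classic word count: group by word and count the occurrences.
result = table.group_by(col("word")) \
    .select(col("word"), lit(1).count.alias("cnt"))

# Submitting this is what the jobmanager container does on startup when
# launched with args ["standalone-job", "-py", ...].
result.execute().print()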
"/opt/flink/examples/python/table/batch/word_count.py"] > ports: > - containerPort: 6123 > name: rpc > - containerPort: 6124 > name: blob-server > - containerPort: 8081 > name: webui > livenessProbe: > tcpSocket: > port: 6123 > initialDelaySeconds: 30 > periodSeconds: 60 > volumeMounts: > - name: flink-config-volume > mountPath: /opt/flink/conf > securityContext: > runAsUser: 9999 # refers to user _flink_ from official flink > image, change if necessary > volumes: > - name: flink-config-volume > configMap: > name: flink-config > items: > - key: flink-conf.yaml > path: flink-conf.yaml > - key: log4j-console.properties > path: log4j-console.properties > > Task-manager.yaml > > apiVersion: apps/v1 > kind: Deployment > metadata: > name: flink-taskmanager > spec: > replicas: 2 > selector: > matchLabels: > app: flink > component: taskmanager > template: > metadata: > labels: > app: flink > component: taskmanager > spec: > containers: > - name: taskmanager > image: pyflink:v1 > env: > args: ["taskmanager"] > ports: > - containerPort: 6122 > name: rpc > - containerPort: 6125 > name: query-state > livenessProbe: > tcpSocket: > port: 6122 > initialDelaySeconds: 30 > periodSeconds: 60 > volumeMounts: > - name: flink-config-volume > mountPath: /opt/flink/conf/ > securityContext: > runAsUser: 9999 # refers to user _flink_ from official flink > image, change if necessary > volumes: > - name: flink-config-volume > configMap: > name: flink-config > items: > - key: flink-conf.yaml > path: flink-conf.yaml > - key: log4j-console.properties > path: log4j-console.properties > > 3. Creating resources: > > $ kubectl create -f flink-configuration-configmap.yaml$ kubectl create -f > jobmanager-service.yaml# Create the deployments for the cluster$ kubectl > create -f job-manager.yaml$ kubectl create -f task-manager.yaml > > Best, > Shuiqiang > > Shuiqiang Chen <acqua....@gmail.com> 于2021年3月6日周六 下午5:10写道: > >> Hi Kevin, >> >> You are able to run PyFlink applications on kuberetes cluster, both >> native k8s mode and resource definition mode are supported since >> release-1.12.0. Currently, Python and PyFlink are not enabled in official >> flink docker image, that you might need to build a custom image with Python >> and PyFlink install, please refer to Enbale Python in docker >> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python> >> . >> >> Generally, by setting the value of args field in >> `jobmanager-application.yaml` to be args: ["standalone-job", "--python", >> "my_python_app.py", <optional arguments>, <job arguments>] the job >> manager will try to submit a PyFlink job with the specified python file >> once it is started. You can check the pod status for jobmanger and >> taskmanger via `kubectl get pods [-n namespace]`. The job manger pod will >> turn to the completed state once the job is finished or error state if >> there is something wrong, while the task manger pod will always be in the >> running state. >> >> Finally, it requires you to tear down the cluster by deleting all created >> resources (jobmanger/taskmanger jobs, flink-conf configmap, >> jobmanger-service, etc). >> >> Best, >> Shuiqiang >> >> >> >> Kevin Lam <kevin....@shopify.com> 于2021年3月6日周六 上午5:29写道: >> >>> Hello everyone, >>> >>> I'm looking to run a Pyflink application run in a distributed fashion, >>> using kubernetes, and am currently facing issues. 
On Sat, Mar 6, 2021 at 5:29 AM Kevin Lam <kevin....@shopify.com> wrote:

Hello everyone,

I'm looking to run a PyFlink application in a distributed fashion on
Kubernetes, and am currently facing issues. I've successfully gotten a Scala
Flink application to run using the manifests provided at [0].

I attempted to run the application by updating the jobmanager command args
from

args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>]

to

args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>]

but this didn't work. It resulted in the following error:

Caused by: java.lang.LinkageError: loader constraint violation: loader
org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
org.apache.commons.cli.Options. A different class with the same name was
previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
module of loader 'app')

I was able to get things to 'run' by setting args to:

args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]

but I'm not sure whether things were running in a distributed fashion or not.

1/ Is there a good way to check whether the task pods were being correctly
utilized?

2/ Are there any examples similar to [0] for how to run PyFlink jobs on
Kubernetes?

Open to any suggestions you may have. Note: we'd prefer not to use the native
K8s route outlined at [1], because we need to maintain the ability to
customize certain aspects of the deployment (e.g. mounting SSDs on some of the
pods).

Thanks in advance!

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
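As a side note on question 1/, two quick ways to see whether the taskmanagers
are actually picking up work, assuming the service and deployment names from
the manifests discussed above:

# Tail a taskmanager pod's logs and watch for task deployment and
# state-transition messages:
$ kubectl logs -f deployment/flink-taskmanager

# Or port-forward the jobmanager web UI (port 8081 in the service above)
# and inspect the job's subtasks at http://localhost:8081:
$ kubectl port-forward service/flink-jobmanager 8081:8081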