Awesome, thanks Shuiqiang! I was able to get an example running by
referencing your configs.

On Sat, Mar 6, 2021 at 7:12 AM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi Kevin,
>
> For your information, below is an example of running a PyFlink Table API
> WordCount job.
>
> 1. Building a Docker image with Python and PyFlink installed:
>
> Dockerfile:
>
> FROM flink:1.12.0
>
>
> # install python3 and pip3
> RUN apt-get update -y && \
>     apt-get install -y python3.7 python3-pip python3.7-dev && \
>     rm -rf /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
> # install Python Flink
>
> RUN pip3 install apache-flink==1.12.0
>
> 2. Resource definitions:
>
> Flink-configuration-configmap.yaml:
>
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
>     app: flink
> data:
>   flink-conf.yaml: |+
>     jobmanager.rpc.address: flink-jobmanager
>     taskmanager.numberOfTaskSlots: 2
>     blob.server.port: 6124
>     jobmanager.rpc.port: 6123
>     taskmanager.rpc.port: 6122
>     queryable-state.proxy.ports: 6125
>     jobmanager.memory.process.size: 1600m
>     taskmanager.memory.process.size: 1728m
>     parallelism.default: 2
>   log4j-console.properties: |+
>     # This affects logging for both user code and Flink
>     rootLogger.level = INFO
>     rootLogger.appenderRef.console.ref = ConsoleAppender
>     rootLogger.appenderRef.rolling.ref = RollingFileAppender
>
>     # Uncomment this if you want to _only_ change Flink's logging
>     #logger.flink.name = org.apache.flink
>     #logger.flink.level = INFO
>
>     # The following lines keep the log level of common libraries/connectors on
>     # log level INFO. The root logger does not override this. You have to manually
>     # change the log levels here.
>     logger.akka.name = akka
>     logger.akka.level = INFO
>     logger.kafka.name= org.apache.kafka
>     logger.kafka.level = INFO
>     logger.hadoop.name = org.apache.hadoop
>     logger.hadoop.level = INFO
>     logger.zookeeper.name = org.apache.zookeeper
>     logger.zookeeper.level = INFO
>
>     # Log all infos to the console
>     appender.console.name = ConsoleAppender
>     appender.console.type = CONSOLE
>     appender.console.layout.type = PatternLayout
>     appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
>     # Log all infos in the given rolling file
>     appender.rolling.name = RollingFileAppender
>     appender.rolling.type = RollingFile
>     appender.rolling.append = false
>     appender.rolling.fileName = ${sys:log.file}
>     appender.rolling.filePattern = ${sys:log.file}.%i
>     appender.rolling.layout.type = PatternLayout
>     appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>     appender.rolling.policies.type = Policies
>     appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
>     appender.rolling.policies.size.size=100MB
>     appender.rolling.strategy.type = DefaultRolloverStrategy
>     appender.rolling.strategy.max = 10
>
>     # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>     logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>     logger.netty.level = OFF
>
> Job-manager-service.yaml:
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
> spec:
>   type: ClusterIP
>   ports:
>   - name: rpc
>     port: 6123
>   - name: blob-server
>     port: 6124
>   - name: webui
>     port: 8081
>   selector:
>     app: flink
>     component: jobmanager
>
> Job-manager.yaml
>
> apiVersion: batch/v1
> kind: Job
> metadata:
>   name: flink-jobmanager
> spec:
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       restartPolicy: OnFailure
>       containers:
>         - name: jobmanager
>           image: pyflink:v1
>           env:
>           args: ["standalone-job", "-py", "/opt/flink/examples/python/table/batch/word_count.py"]
>           ports:
>             - containerPort: 6123
>               name: rpc
>             - containerPort: 6124
>               name: blob-server
>             - containerPort: 8081
>               name: webui
>           livenessProbe:
>             tcpSocket:
>               port: 6123
>             initialDelaySeconds: 30
>             periodSeconds: 60
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf
>           securityContext:
>             runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>
> Task-manager.yaml
>
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 2
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>       - name: taskmanager
>         image: pyflink:v1
>         env:
>         args: ["taskmanager"]
>         ports:
>         - containerPort: 6122
>           name: rpc
>         - containerPort: 6125
>           name: query-state
>         livenessProbe:
>           tcpSocket:
>             port: 6122
>           initialDelaySeconds: 30
>           periodSeconds: 60
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf/
>         securityContext:
>           runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
>
> 3. Creating resources:
>
> $ kubectl create -f flink-configuration-configmap.yaml
> $ kubectl create -f jobmanager-service.yaml
> # Create the deployments for the cluster
> $ kubectl create -f job-manager.yaml
> $ kubectl create -f task-manager.yaml
>
> Best,
> Shuiqiang
>
> Shuiqiang Chen <acqua....@gmail.com> wrote on Sat, Mar 6, 2021 at 5:10 PM:
>
>> Hi Kevin,
>>
>> You can run PyFlink applications on a Kubernetes cluster; both native
>> Kubernetes mode and resource definition mode have been supported since
>> release-1.12.0. Currently, Python and PyFlink are not included in the
>> official Flink Docker image, so you will need to build a custom image with
>> Python and PyFlink installed. Please refer to Enabling Python in Docker:
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>
>>
>> Generally, if you set the args field in
>> `jobmanager-application.yaml` to args: ["standalone-job", "--python",
>> "my_python_app.py", <optional arguments>, <job arguments>], the job
>> manager will try to submit a PyFlink job with the specified Python file
>> once it has started. You can check the pod status of the jobmanager and
>> taskmanager via `kubectl get pods [-n namespace]`. The jobmanager pod will
>> move to the completed state once the job is finished, or to the error state
>> if something goes wrong, while the taskmanager pods will always stay in the
>> running state.
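
A minimal sketch of the two steps just described, assuming Python 3: it assembles the jobmanager args list and maps the STATUS value shown by `kubectl get pods` to the meaning given in the paragraph above. The helper names, the `--input` job argument, and the wording of the status descriptions are illustrative assumptions, not part of Flink or kubectl.

```python
import json


def build_jobmanager_args(python_file, optional_args=(), job_args=()):
    """Assemble the jobmanager container args for a PyFlink application."""
    return ["standalone-job", "--python", python_file, *optional_args, *job_args]


def render_args_line(args):
    """Render the list as a one-line YAML entry.

    A JSON array of strings is also a valid YAML flow sequence,
    so json.dumps is sufficient for this purpose.
    """
    return "args: " + json.dumps(args)


# Hypothetical mapping that follows the description in the email:
# completed = job finished, error = something went wrong.
JOBMANAGER_STATUS_MEANING = {
    "Running": "job is still running",
    "Completed": "job finished",
    "Error": "job failed",
}


def describe_jobmanager_status(status):
    """Translate a pod STATUS string into a short explanation."""
    return JOBMANAGER_STATUS_MEANING.get(status, "unknown status, inspect the pod logs")


if __name__ == "__main__":
    # "--input" and its value are made-up job arguments for illustration.
    args = build_jobmanager_args("my_python_app.py", job_args=["--input", "/data/in.txt"])
    print(render_args_line(args))
    print(describe_jobmanager_status("Completed"))
```

The printed `args:` line is meant to be pasted into the jobmanager container spec; the status lookup only interprets what `kubectl get pods` already reports and does not contact the cluster.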
>>
>> Finally, you need to tear down the cluster yourself by deleting all created
>> resources (jobmanager/taskmanager jobs, flink-conf configmap,
>> jobmanager-service, etc.).
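
The teardown step could be sketched as follows, assuming Python 3.8+ and that the resources carry the names used in the manifests quoted in this thread (`flink-jobmanager`, `flink-taskmanager`, `flink-config`). The script only prints `kubectl delete` commands for review and does not run them; the function name and the resource list are assumptions to adapt to your own manifests.

```python
import shlex

# Kinds and names as they appear in the manifests quoted in this thread.
# Adjust this list if your resources are named differently.
RESOURCES = [
    ("job", "flink-jobmanager"),
    ("deployment", "flink-taskmanager"),
    ("service", "flink-jobmanager"),
    ("configmap", "flink-config"),
]


def teardown_commands(namespace=None):
    """Return one `kubectl delete` command (as an argv list) per resource."""
    commands = []
    for kind, name in RESOURCES:
        cmd = ["kubectl", "delete", kind, name]
        if namespace:
            cmd += ["-n", namespace]
        commands.append(cmd)
    return commands


if __name__ == "__main__":
    # Print the commands instead of executing them, so they can be checked first.
    for cmd in teardown_commands():
        print(shlex.join(cmd))
```

Printing the commands first keeps the sketch safe to run anywhere; each argv list could be passed to `subprocess.run` once the names have been checked against the cluster.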
>>
>> Best,
>> Shuiqiang
>>
>>
>>
>> Kevin Lam <kevin....@shopify.com> wrote on Sat, Mar 6, 2021 at 5:29 AM:
>>
>>> Hello everyone,
>>>
>>> I'm looking to run a PyFlink application in a distributed fashion
>>> using Kubernetes, and am currently facing issues. I've successfully gotten
>>> a Scala Flink application to run using the manifests provided at [0].
>>>
>>> I attempted to run the application by updating the jobmanager command
>>> args from
>>>
>>> args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>]
>>>
>>> to
>>>
>>> args: ["standalone-job", "--python", "my_python_app.py", <optional arguments>, <job arguments>]
>>>
>>> But this didn't work. It resulted in the following error:
>>>
>>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>>> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
>>> org.apache.commons.cli.Options. A different class with the same name was
>>> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
>>> module of loader 'app'
>>>
>>> I was able to get things to 'run' by setting args to:
>>>
>>> args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]
>>>
>>>
>>> But I'm not sure if things were running in a distributed fashion or not.
>>>
>>> 1/ Is there a good way to check if the task pods were being correctly
>>> utilized?
>>>
>>> 2/ Are there any examples similar to [0] showing how to run PyFlink jobs on
>>> Kubernetes?
>>>
>>> Open to any suggestions you may have. Note: we'd prefer not to use
>>> the native Kubernetes route outlined at [1] because we need to maintain the
>>> ability to customize certain aspects of the deployment (e.g. mounting SSDs
>>> to some of the pods).
>>>
>>> Thanks in advance!
>>>
>>> [0]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>>>
>>>
