Hi Kevin,

For your information, bellow is an example for running a PyFlink table API
WordCount job.

1. Building a Docker image with Python and PyFlink Installed:

Dockerfile:

FROM flink:1.12.0


# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf
/var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install Python Flink

RUN pip3 install apache-flink==1.12.0

2. Resource definitions:

Flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors
on
    # log level INFO. The root logger does not override this. You have to
manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
%-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p
%-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel
handler
    logger.netty.name =
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

Job-manager-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

Job-manager.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: pyflink:v1
          env:
          args: ["standalone-job", "-py",
"/opt/flink/examples/python/table/batch/word_count.py"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink
image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

Task-manager.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: pyflink:v1
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink
image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

3. Creating resources:

$ kubectl create -f flink-configuration-configmap.yaml$ kubectl create
-f jobmanager-service.yaml# Create the deployments for the cluster$
kubectl create -f job-manager.yaml$ kubectl create -f
task-manager.yaml

Best,
Shuiqiang

Shuiqiang Chen <acqua....@gmail.com> 于2021年3月6日周六 下午5:10写道:

> Hi Kevin,
>
> You are able to run PyFlink applications on kuberetes cluster, both native
> k8s mode and resource definition mode are supported since release-1.12.0.
> Currently, Python and PyFlink are not enabled in official flink docker
> image, that you might need to build a custom image with Python and PyFlink
> install, please refer to Enbale Python in docker
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>
> .
>
> Generally, by setting the value of args field in
> `jobmanager-application.yaml` to be args: ["standalone-job", "--python",
> "my_python_app.py", <optional arguments>, <job arguments>] the job
> manager will try to submit a PyFlink job with the specified python file
> once it is started. You can check the pod status for jobmanger and
> taskmanger via `kubectl get pods [-n namespace]`. The job manger pod will
> turn to the completed state once the job is finished or error state if
> there is something wrong, while the task manger pod will always be in the
> running state.
>
> Finally, it requires you to tear down the cluster by deleting all created
> resources (jobmanger/taskmanger jobs, flink-conf configmap,
> jobmanger-service, etc).
>
> Best,
> Shuiqiang
>
>
>
> Kevin Lam <kevin....@shopify.com> 于2021年3月6日周六 上午5:29写道:
>
>> Hello everyone,
>>
>> I'm looking to run a Pyflink application run in a distributed fashion,
>> using kubernetes, and am currently facing issues. I've successfully gotten
>> a Scala Flink Application to run using the manifests provided at [0]
>>
>> I attempted to run the application by updating the jobmanager command
>> args from
>>
>>  args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional 
>> arguments>, <job arguments>]
>>
>> to
>>
>> args: ["standalone-job", "--python", "my_python_app.py", <optional 
>> arguments>, <job arguments>]
>>
>> But this didn't work. It resulted in the following error:
>>
>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
>> org.apache.commons.cli.Options. A different class with the same name was
>> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
>> module of loader 'app'
>>
>> I was able to get things to 'run' by setting args to:
>>
>> args: ["python", "my_python_app.py", <optional arguments>, <job arguments>]
>>
>>
>> But I'm not sure if things were running in a distributed fashion or not.
>>
>> 1/ Is there a good way to check if the task pods were being correctly
>> utilized?
>>
>> 2/ Are there any similar examples to [0] for how to run Pyflink jobs on
>> kubernetes?
>>
>> Open to any suggestions you may have. Note: we'd prefer not to run using
>> the native K8S route outlined at [1] because we need to maintain the
>> ability to customize certain aspects of the deployment (eg. mounting SSDs
>> to some of the pods)
>>
>> Thanks in advance!
>>
>> [0]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>>
>>

Reply via email to