Thanks for bringing this up. Options 2 and 3 are both clearly useful for
Flink users on Kubernetes. Option 3 is easier for users who are not familiar
with Kubernetes concepts, since they can start Flink on Kubernetes quickly,
so I think it should have a higher priority.

I have spent some time integrating Flink with our Kubernetes-based
platform, and I have some concerns about option 3 from a platform user's
perspective.

First, I think users can be divided into common users and downstream
platform users.

For common users, kubernetes-session.sh (or yarn-session.sh) is convenient:
they just run the shell script, get the jobmanager address, and then run
./bin/flink to submit a job.

But for platform users, shell scripts are hard to integrate with. I need to
use Java ProcessBuilder to run a shell script and redirect stdout/stderr,
parse the stdout log to get the jobId, handle the exit code, and add
idempotence logic to avoid submitting duplicate jobs.
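
To illustrate the integration burden described above, here is a minimal
sketch of what a platform has to do when it wraps a submit script. The class
and method names are hypothetical, and the log line matched by the regex
("JobID" followed by a 32-character hex id) is an assumption about the CLI
output that may differ between Flink versions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink.
class ShellSubmitSketch {
    // Assumed log format: "... JobID <32 hex chars>". Verify against your Flink version.
    private static final Pattern JOB_ID = Pattern.compile("JobID\\s+([0-9a-f]{32})");

    // Extract the first job id found in the captured output, if any.
    static Optional<String> parseJobId(String output) {
        Matcher m = JOB_ID.matcher(output);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Run the submit command, capture stdout and stderr together,
    // check the exit code, and parse the job id from the log.
    static String submit(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout
        Process p = pb.start();

        StringBuilder out = new StringBuilder();
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                out.append(line).append('\n');
            }
        }

        int exit = p.waitFor();
        if (exit != 0) {
            throw new IOException("submit failed with exit code " + exit + ":\n" + out);
        }
        return parseJobId(out.toString())
                .orElseThrow(() -> new IOException("no job id found in output:\n" + out));
    }
}
```

Even this sketch does not cover idempotence: if the platform crashes after
the script starts but before the id is parsed, it cannot tell whether the
job was submitted.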

The way our platform integrates with Flink on k8s is:
1. Generate a job ID and prepare the jobmanager/taskmanager/service/configmap
resource files.
In the jobmanager and taskmanager resource files, we define an
initContainer that downloads the user jar from http/hdfs/s3..., so the user
jar is already on the jm and tm pods before they start.
StandaloneJobClusterEntryPoint accepts "--job-id" to pass the pre-generated
jobId, and "--job-classname" to pass the user jar's entry class, along with
other args[1].

2. Submit the resource files to k8s directly, and that is all. No other
steps are needed, e.g. uploading/submitting the jar to Flink, and k8s
guarantees idempotence naturally: identical resources will be ignored.

3. Use the pre-configured job ID to query status; the platform already knows
the job ID.
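
The first step above can be sketched roughly as follows. Only the
"--job-id" and "--job-classname" flags come from the entrypoint referenced
in [1]; the class, the method names, and the "flink-job-<id>" resource
naming scheme are hypothetical, and the 32-character hex job ID format is an
assumption to check against your Flink version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical helper, not part of Flink.
class JobClusterArgsSketch {
    // Generate a job id up front: a random UUID without dashes is 32 hex chars.
    static String newJobId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    // Container args for the jobmanager pod running StandaloneJobClusterEntryPoint.
    static List<String> jobManagerArgs(String jobId, String entryClass, List<String> userArgs) {
        List<String> args = new ArrayList<>();
        args.add("--job-classname");
        args.add(entryClass);
        args.add("--job-id");
        args.add(jobId);
        args.addAll(userArgs); // remaining args are passed through to the user program
        return args;
    }

    // Assumed naming scheme: deriving the resource name from the job id means
    // a retried submission targets the same k8s resource instead of a new one.
    static String resourceName(String jobId) {
        return "flink-job-" + jobId;
    }
}
```

Because the job ID is chosen before anything is submitted, the platform can
store it first and then safely retry the submission.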

The above steps are convenient for platform users. So my concerns about
option 3 are:
1. Besides using kubernetes-session.sh to submit a job, can we retain the
ability for users to submit k8s resource files directly, rather than being
forced to submit jobs from shell scripts? As you know, everything in
Kubernetes is a resource, so submitting a resource to Kubernetes is more
natural.

2. Retain the ability to pass job-classname to start a Flink Job Cluster, so
that platform users do not need a separate step to submit the jar, whether
from ./bin/flink or from the REST API.
And for a Flink Session Cluster, platform users can submit Kubernetes
resource files to start the session cluster, and then submit the jar job
through the REST API to avoid calling the shell scripts.

3. Retain the ability to pass job-id. It is inconvenient to find out which
job ID you have just submitted, whether by parsing the submit log or by
querying the jobmanager REST API. And it is impossible to find the jobId in
the session cluster scenario, where there may be many jobs with the same
name and the same submit time.
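
When the job ID is known in advance, a status check reduces to one request
against the jobmanager REST API. The sketch below assumes the job details
endpoint is /jobs/<jobid>; the class name, method names, and timeouts are
hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flink.
class JobStatusSketch {
    // Build the status URI for a known job id; no lookup by name or time is needed.
    static URI statusUri(String restBase, String jobId) {
        if (!jobId.matches("[0-9a-f]{32}")) {
            throw new IllegalArgumentException("unexpected job id format: " + jobId);
        }
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return URI.create(base + "/jobs/" + jobId);
    }

    // Fetch the raw JSON response; parsing is left to the caller.
    static String fetchStatusJson(URI uri) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try {
            int code = conn.getResponseCode();
            if (code != 200) {
                throw new IOException("unexpected HTTP status " + code + " for " + uri);
            }
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }
}
```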

I think it's better to retain these features, which are already provided by
the StandaloneJobClusterEntryPoint, in option 3. This will make Flink easier
to integrate with other Kubernetes-based platforms.

Thanks
Kaibo

[1].
https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java#L45

Jeff Zhang <zjf...@gmail.com> wrote on Sat, Aug 10, 2019 at 1:52 PM:

> Thanks Yang. Native K8s integration is very necessary and important for
> the adoption of Flink IMO.
> I notice that the design doc was written in 2018. Are there any changes or
> updates?
>
> >>> Download the flink release binary and create the ~/.kube/config file
> corresponding to the k8s cluster. It is all what you need.
>
> How can I specify which k8s cluster to run on in case I have multiple k8s
> clusters? Can I do it by specifying the cluster in the flink cli?
>
> Yang Wang <danrtsey...@gmail.com> wrote on Fri, Aug 9, 2019 at 9:12 PM:
>
> > Hi all,
> >
> > Cloud native architectures have now been introduced into production at
> > many companies. They use Kubernetes to run deep learning, web servers,
> > etc. If we could deploy per-job/session Flink clusters on Kubernetes so
> > that they run alongside other workloads, cluster resource utilization
> > would be better. It would also be easier for the many Kubernetes users
> > to try out Flink.
> >
> > Currently there are three options for running Flink jobs on k8s.
> >
> > [1]. Create the jm/tm/service yaml and apply it, and you will get a Flink
> > standalone cluster on k8s. Use flink run to submit jobs to the existing
> > Flink cluster. Some companies may have their own deployment system to
> > manage the Flink cluster.
> >
> > [2]. Use flink-k8s-operator to manage multiple Flink clusters, including
> > session and per-job. It can manage the complete deployment lifecycle of
> > the application. I think this option is really easy to use for k8s
> > users. They are familiar with k8s operators, kubectl, and other k8s
> > tools, and they can debug and run the Flink cluster just like other k8s
> > applications.
> >
> > [3]. Native integration with k8s: use flink run or
> > kubernetes-session.sh to start a Flink cluster. It is very similar to
> > submitting a Flink cluster to Yarn. KubernetesClusterDescriptor talks to
> > the k8s api server to start a Flink master deployment of 1.
> > KubernetesResourceManager dynamically allocates resources from k8s to
> > start task managers on demand. This option is very easy for Flink users
> > to get started with. In the simplest case, we just need to change
> > '-m yarn-cluster' to '-m kubernetes-cluster'.
> >
> > We have built an internal implementation of option [3] and use it in
> > production. Once it is fully tested, we hope to contribute it to the
> > community. Now we would like to get some feedback on the three options.
> > Any comments are welcome.
> >
> >
> > > What do we need to prepare when starting a Flink cluster on k8s using
> > > native integration?
> >
> > Download the flink release binary and create the ~/.kube/config file
> > corresponding to the k8s cluster. That is all you need.
> >
> >
> > > Flink Session cluster
> >
> > * start a session cluster
> >
> > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
> >
> > * You will get an address for submitting jobs; specify it through the
> > '-ksa' option
> >
> > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-session-example
> > -ksa {x.x.x.x:12345} examples/streaming/WindowJoin.jar
> >
> >
> > > Flink Job Cluster
> >
> > * running with official flink image
> >
> > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> > -ki flink:latest examples/streaming/WindowJoin.jar
> >
> > * running with user image
> >
> > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> > -ki flink-user:latest examples/streaming/WindowJoin.jar
> >
> >
> >
> > [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
> >
> > [2]. https://github.com/lyft/flinkk8soperator
> >
> > [3]. https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit#
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>
