Hi Yang,

thanks for reviving the discussion about Flink's Kubernetes integration. In
a nutshell, I think that Flink should support option 1) and 3). Concretely,
option 1) would be covered by the reactive mode [1] which is not
necessarily bound to Kubernetes and works in all environments equally well.
Option 3) is the native Kubernetes integration which is described in the
design document. Actually, the discussion had been concluded already some
time ago and there are already multiple PRs open for adding this feature
[2]. So maybe you could check these PRs out and help the community
reviewing and merging this code. Based on this we could then think about
additions/improvements which are necessary.

For option 2), I think a Kubernetes operator would be a good project for
Flink's ecosystem website [3] and does not need to be necessarily part of
Flink's repository.

[1] https://issues.apache.org/jira/browse/FLINK-10407
[2] https://issues.apache.org/jira/browse/FLINK-9953
[3]
https://lists.apache.org/thread.html/9b873f9dc1dd56d79e0f71418b123def896ed02f57e84461bc0bacb0@%3Cdev.flink.apache.org%3E

Cheers,
Till

On Mon, Aug 12, 2019 at 5:46 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Hi kaibo,
>
>
> I am really appreciated that you could share your use case.
>
> As you say, our users in production also could be divided into two groups.
> The common users have more knowledge about flink, they could use the
> command line to submit job and debug job from logs of job manager and
> taskmanager in the kubenetes. And for platform users, they use the yaml
> config files or platform web to submit flink jobs.
>
> Regarding your comments:
>
> 1. Of course, the option 1(standalone on k8s) should always work as
> expected. Users could submit the jm/tm/svc resource files to start a flink
> cluster. The option 3(k8s native integration) will support both resource
> files and command line submission. The resource file below is to create a
> flink perjob cluster.
>
> apiVersion: extensions/v1beta1
>
> kind: Deployment
>
> metadata:
>
>   name: flink-word-count
>
> spec:
>
>   image: flink-wordcount:latest
>
>   flinkConfig:
>
>     state.checkpoints.dir:
> file:///checkpoints/flink/externalized-checkpoints
>
>   jobManagerConfig:
>
>     resources:
>
>       requests:
>
>         memory: “1024Mi"
>
>         cpu: “1”
>
>   taskManagerConfig:
>
>     taskSlots: 2
>
>     resources:
>
>       requests:
>
>         memory: “1024Mi"
>
>         cpu: “1”
>
>   jobId: “aaaabbbbccccddddaaaabbbbccccdddd”
>
>   parallelism: 3
>
>   jobClassName: "org.apache.flink.streaming.examples.wordcount.WordCount"
>
> 2. The ability to pass job-classname will be retained. The class should be
> found in the classpath of taskmanager image. The flink per-job cluster
> describe by yaml resource in section 1 could also be submitted by flink
> command.
>
> flink run -m kubernetes-cluster -p 3 -knm flink-word-count -ki
> flink-wordcount:latest -kjm 1024 -ktm 1024 -kD kubernetes.jobmanager.cpu=1
> -kD kubernetes.taskmanager.cpu=1 -kjid aaaabbbbccccddddaaaabbbbccccdddd
> -kjc org.apache.flink.streaming.examples.wordcount.WordCount -kD
> state.checkpoints.dir= file:///checkpoints/flink/externalized-checkpoints
>
> 3. The job-id could also be specified by -kjid just like the command above.
>
> In a nutshell, the option 3 should have all the abilities in option 1.
> Common users and platform users are all satisfied.
>
>
>
> Best,
>
> Yang
>
>
> Kaibo Zhou <zkb...@gmail.com> 于2019年8月11日周日 下午1:23写道:
>
> > Thanks for bringing this up. Obviously, option 2 and 3 are both useful
> for
> > fink users on kubernetes. But option 3 is easy for users that not have
> many
> > concepts of kubernetes, they can start flink on kubernetes quickly, I
> think
> > it should have a higher priority.
> >
> > I have worked some time to integrate flink with our platform based on
> > kubernetes, and have some concerns on option 3 from the platform user's
> > perspective.
> >
> > First, I think users can be divided into common users and downstream
> > platform users.
> >
> > For common users, kubernetes-session.sh (or yarn-session.sh) is
> convenient
> > for them, just run shell scripts and get the jobmanager address. Then run
> > ./bin/flink to submit a job.
> >
> > But for the platform users, the shell scripts are not friendly to be
> > integrated. I need to use Java ProcessBuilder to run a shell script and
> > redirect the stdout/stderr. I need to parse the stdout log to get the
> > jobId, and need to process the exit code, and need to do some idempotence
> > logic to avoid duplicate jobs to be submitted.
> >
> > The way our platform integrates with flink on k8s is:
> > 1. Generate a job Id, and prepare
> jobmanager/taskmanager/service/configmap
> > resource files.
> > In the jobmanager and taskmanager resource file, we defined an
> > initContainer to download user jar from http/hdfs/s3..., so the user jar
> is
> > already on the jm and tm pod before they start. And
> > StandaloneJobClusterEntryPoint can accept "--job-id" to pass
> pre-generated
> > jobId and accept "--job-classname" to pass user jar entry class and other
> > args[1].
> >
> > 2. Submit resource files to k8s directly, and that is all. Not need other
> > steps, e.g. upload/submit jar to flink, and k8s guarantee the idempotence
> > natural, the same resources will be ignored.
> >
> > 3. Just use the pre-configured job id to query status, the platform knows
> > the job id.
> >
> > The above steps are convenient for platform users. So my concern for
> option
> > 3 is:
> > 1. Besides to use kubernetes-session.sh to submit a job, can we retain
> the
> > ability to let users submit k8s resources files directly, not forced to
> > submit jobs from shell scripts. As you know, everything in kubernetes is
> a
> > resource, submit a resource to kubernetes is more natural.
> >
> > 2. Retain the ability to pass job-classname to start Flink Job Cluster,
> so
> > the platform users do not need a step to submit jar whether from
> > ./bin/flink or from restful API.
> > And for Flink Session Cluster, the platform uses can submit kubernetes
> > resource files to start a session cluster, and then submit jar job from
> > restful API to avoid call the shell scripts.
> >
> > 3. Retain the ability to pass job-id, It is not convenient and friendly
> to
> > find which job id you have just submitted whether parse the submit log or
> > query jobmanager restful API. And it is impossible to find the jobId in
> the
> > session cluster scene, there will be many jobs with the same name and
> same
> > submit time.
> >
> > I think it's better to retain these features already provided by the
> > StandaloneJobClusterEntryPoint in option 3. This will make flink easier
> to
> > be integrated with other platforms based on kubernetes.
> >
> > Thanks
> > Kaibo
> >
> > [1].
> >
> >
> https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java#L45
> >
> > Jeff Zhang <zjf...@gmail.com> 于2019年8月10日周六 下午1:52写道:
> >
> > > Thanks Yang. K8s natively integration is very necessary and important
> for
> > > the adoption of flink IMO.
> > > I notice that the design doc is written in 2018, is there any changes
> or
> > > update ?
> > >
> > > >>> Download the flink release binary and create the ~/.kube/config
> file
> > > corresponding to the k8s cluster. It is all what you need.
> > >
> > > How can I specify which k8s cluster to run in case I have multiple k8s
> > > clusters ? Can I do it via specifying flink cluster in flink cli ?
> > >
> > > Yang Wang <danrtsey...@gmail.com> 于2019年8月9日周五 下午9:12写道:
> > >
> > > > Hi all,
> > > >
> > > > Currently cloud native architectures has been introduced to many
> > > companies
> > > > in production. They use kubernetes to run deep learning, web server,
> > etc.
> > > > If we could deploy the per-job/session flink cluster on kubernetes to
> > > make
> > > > it mix-run with other workloads, the cluster resource utilization
> will
> > be
> > > > better. Also many kubernetes users are more easier to have a taste on
> > the
> > > > flink.
> > > >
> > > > By now we have three options to run flink jobs on k8s.
> > > >
> > > > [1]. Create jm/tm/service yaml and apply, then you will get a flink
> > > > standalone cluster on k8s. Use flink run to submit job to the existed
> > > flink
> > > > cluster. Some companies may have their own deploy system to manage
> the
> > > > flink cluster.
> > > >
> > > > [2]. Use flink-k8s-operator to manage multiple flink clusters,
> > including
> > > > session and perjob. It could manage the complete deployment lifecycle
> > of
> > > > the application. I think this option is really easy to use for the
> k8s
> > > > users. They are familiar with k8s-opertor, kubectl and other tools of
> > > k8s.
> > > > They could debug and run the flink cluster just like other k8s
> > > > applications.
> > > >
> > > > [3]. Natively integration with k8s, use the flink run or
> > > > kubernetes-session.sh to start a flink cluster. It is very similar to
> > > > submitting an flink cluster to Yarn. KubernetesClusterDescriptor
> talks
> > to
> > > > k8s api server to start a flink master deployment of 1.
> > > > KubernetesResourceManager dynamically allocates resource from k8s to
> > > start
> > > > task manager as demand. This option is very easy for flink users to
> get
> > > > started. In the simplest case, we just need to update the '-m
> > > yarn-cluster'
> > > > to -m '-m kubernetes-cluster'.
> > > >
> > > > We have make an internal implementation of option [3] and use it in
> > > > production. After fully tested, we hope to contribute it to the
> > > community.
> > > > Now we want to get some feedbacks about the three options. Any
> comments
> > > are
> > > > welcome.
> > > >
> > > >
> > > > > What do we need to prepare when start a flink cluster on k8s using
> > > native
> > > > integration?
> > > >
> > > > Download the flink release binary and create the ~/.kube/config file
> > > > corresponding to the k8s cluster. It is all what you need.
> > > >
> > > >
> > > > > Flink Session cluster
> > > >
> > > > * start a session cluster
> > > >
> > > > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm
> > > flink-session-example
> > > > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
> > > >
> > > > *  You will get an address to submit job, specify it through ’-ksa’
> > > option
> > > >
> > > > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm
> > flink-session-example
> > > > -ksa {x.x.x.x:12345} examples/streaming/WindowJoin.jar
> > > >
> > > >
> > > > > Flink Job Cluster
> > > >
> > > > * running with official flink image
> > > >
> > > > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm
> > flink-perjob-example-1
> > > > -ki flink:latest examples/streaming/WindowJoin.jar
> > > >
> > > > * running with user image
> > > >
> > > > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm
> > flink-perjob-example-1
> > > > -ki flink-user:latest examples/streaming/WindowJoin.jar
> > > >
> > > >
> > > >
> > > > [1].
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
> > > >
> > > > [2].https://github.com/lyft/flinkk8soperator
> > > >
> > > > [3].
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit#
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>

Reply via email to