Hi Till,
Thanks for your reply. I agree with you that both option 1 and option 3 need to be supported. Option 1 is the reactive mode of resource management, where Flink is not aware of the underlying cluster. If a user has limited resources to run Flink jobs, this option will be very useful. On the other hand, option 3 is the active mode of resource management. Compared with option 1, its biggest advantage is that we could allocate resources from the K8s cluster on demand. Batch jobs in particular will benefit a lot from this. I do not mean to abandon the proposal and implementation in FLINK-9953. Actually, I have contacted the assignee (Chunhui Shi) to help review and test the PRs. After all the basic implementations have been merged, the production features will be considered.

Best,
Yang

Till Rohrmann <trohrm...@apache.org> 于2019年8月13日周二 下午5:36写道:

> Hi Yang,
>
> thanks for reviving the discussion about Flink's Kubernetes integration. In
> a nutshell, I think that Flink should support option 1) and 3). Concretely,
> option 1) would be covered by the reactive mode [1] which is not
> necessarily bound to Kubernetes and works in all environments equally well.
> Option 3) is the native Kubernetes integration which is described in the
> design document. Actually, the discussion had been concluded already some
> time ago and there are already multiple PRs open for adding this feature
> [2]. So maybe you could check these PRs out and help the community
> review and merge this code. Based on this we could then think about
> additions/improvements which are necessary.
>
> For option 2), I think a Kubernetes operator would be a good project for
> Flink's ecosystem website [3] and does not need to necessarily be part of
> Flink's repository.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10407
> [2] https://issues.apache.org/jira/browse/FLINK-9953
> [3]
> https://lists.apache.org/thread.html/9b873f9dc1dd56d79e0f71418b123def896ed02f57e84461bc0bacb0@%3Cdev.flink.apache.org%3E
>
> Cheers,
> Till
>
> On Mon, Aug 12, 2019 at 5:46 AM Yang Wang <danrtsey...@gmail.com> wrote:
>
> > Hi Kaibo,
> >
> > I really appreciate that you could share your use case.
> >
> > As you say, our users in production can also be divided into two groups.
> > The common users have more knowledge about Flink; they use the command
> > line to submit jobs and debug them from the logs of the jobmanager and
> > taskmanagers in Kubernetes. The platform users use yaml config files or
> > a platform web UI to submit Flink jobs.
> >
> > Regarding your comments:
> >
> > 1. Of course, option 1 (standalone on K8s) should always work as
> > expected. Users could submit the jm/tm/svc resource files to start a
> > Flink cluster. Option 3 (K8s native integration) will support both
> > resource files and command line submission. The resource file below
> > creates a Flink per-job cluster.
> >
> > apiVersion: extensions/v1beta1
> > kind: Deployment
> > metadata:
> >   name: flink-word-count
> > spec:
> >   image: flink-wordcount:latest
> >   flinkConfig:
> >     state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
> >   jobManagerConfig:
> >     resources:
> >       requests:
> >         memory: "1024Mi"
> >         cpu: "1"
> >   taskManagerConfig:
> >     taskSlots: 2
> >     resources:
> >       requests:
> >         memory: "1024Mi"
> >         cpu: "1"
> >   jobId: "aaaabbbbccccddddaaaabbbbccccdddd"
> >   parallelism: 3
> >   jobClassName: "org.apache.flink.streaming.examples.wordcount.WordCount"
> >
> > 2. The ability to pass job-classname will be retained. The class should
> > be found in the classpath of the taskmanager image.
> > The Flink per-job cluster
> > described by the yaml resource in section 1 could also be submitted by
> > the flink command:
> >
> > flink run -m kubernetes-cluster -p 3 -knm flink-word-count -ki
> > flink-wordcount:latest -kjm 1024 -ktm 1024 -kD kubernetes.jobmanager.cpu=1
> > -kD kubernetes.taskmanager.cpu=1 -kjid aaaabbbbccccddddaaaabbbbccccdddd
> > -kjc org.apache.flink.streaming.examples.wordcount.WordCount -kD
> > state.checkpoints.dir=file:///checkpoints/flink/externalized-checkpoints
> >
> > 3. The job id could also be specified by -kjid, just like in the command
> > above.
> >
> > In a nutshell, option 3 should have all the abilities of option 1.
> > Common users and platform users are all satisfied.
> >
> > Best,
> >
> > Yang
> >
> > Kaibo Zhou <zkb...@gmail.com> 于2019年8月11日周日 下午1:23写道:
> >
> > > Thanks for bringing this up. Obviously, options 2 and 3 are both
> > > useful for Flink users on Kubernetes. But option 3 is easy for users
> > > that do not have many concepts of Kubernetes; they can start Flink on
> > > Kubernetes quickly, so I think it should have a higher priority.
> > >
> > > I have worked for some time to integrate Flink with our platform based
> > > on Kubernetes, and have some concerns on option 3 from the platform
> > > user's perspective.
> > >
> > > First, I think users can be divided into common users and downstream
> > > platform users.
> > >
> > > For common users, kubernetes-session.sh (or yarn-session.sh) is
> > > convenient: just run the shell script and get the jobmanager address,
> > > then run ./bin/flink to submit a job.
> > >
> > > But for platform users, the shell scripts are not friendly to
> > > integrate. I need to use Java ProcessBuilder to run a shell script and
> > > redirect the stdout/stderr. I need to parse the stdout log to get the
> > > jobId, process the exit code, and do some idempotence logic to avoid
> > > duplicate jobs being submitted.
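[Editorial aside: the stdout-scraping flow described above can be sketched roughly as follows. This is a minimal illustration of the pain point, not actual platform code: the "Job has been submitted with JobID ..." line matches Flink's CLI output, but the class and method names are made up for the example, and main() stands in for the real CLI with echo so the sketch is self-contained.]

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkCliSubmitter {
    // Flink's CLI prints "Job has been submitted with JobID <32 hex chars>";
    // scraping it like this is exactly the brittleness described above.
    private static final Pattern JOB_ID =
            Pattern.compile("Job has been submitted with JobID ([0-9a-f]{32})");

    static String submitAndParseJobId(String... command) throws Exception {
        Process p = new ProcessBuilder(command).redirectErrorStream(true).start();
        String jobId = null;
        try (BufferedReader r =
                     new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                Matcher m = JOB_ID.matcher(line);
                if (m.find()) {
                    jobId = m.group(1); // breaks silently if the log wording changes
                }
            }
        }
        if (p.waitFor() != 0 || jobId == null) {
            throw new IllegalStateException("submission failed or no job id in output");
        }
        return jobId;
    }

    public static void main(String[] args) throws Exception {
        // Stand in for the real CLI with echo so the sketch runs anywhere.
        System.out.println(submitAndParseJobId("echo",
                "Job has been submitted with JobID aaaabbbbccccddddaaaabbbbccccdddd"));
    }
}
```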
> > >
> > > The way our platform integrates with Flink on K8s is:
> > >
> > > 1. Generate a job id, and prepare jobmanager/taskmanager/service/configmap
> > > resource files.
> > > In the jobmanager and taskmanager resource files, we define an
> > > initContainer to download the user jar from http/hdfs/s3..., so the
> > > user jar is already on the jm and tm pods before they start. And
> > > StandaloneJobClusterEntryPoint can accept "--job-id" to pass the
> > > pre-generated jobId and "--job-classname" to pass the user jar entry
> > > class and other args [1].
> > >
> > > 2. Submit the resource files to K8s directly, and that is all. No other
> > > steps are needed, e.g. uploading/submitting the jar to Flink, and K8s
> > > guarantees idempotence naturally: the same resources will be ignored.
> > >
> > > 3. Just use the pre-configured job id to query status; the platform
> > > knows the job id.
> > >
> > > The above steps are convenient for platform users. So my concerns for
> > > option 3 are:
> > >
> > > 1. Besides using kubernetes-session.sh to submit a job, can we retain
> > > the ability to let users submit K8s resource files directly, not
> > > forcing them to submit jobs from shell scripts? As you know, everything
> > > in Kubernetes is a resource, and submitting a resource to Kubernetes is
> > > more natural.
> > >
> > > 2. Retain the ability to pass job-classname to start a Flink job
> > > cluster, so platform users do not need a step to submit the jar,
> > > whether from ./bin/flink or from the restful API.
> > > And for a Flink session cluster, platform users can submit Kubernetes
> > > resource files to start the session cluster, and then submit jar jobs
> > > via the restful API to avoid calling the shell scripts.
> > >
> > > 3. Retain the ability to pass job-id. It is not convenient or friendly
> > > to find which job id you have just submitted, whether by parsing the
> > > submit log or by querying the jobmanager restful API.
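[Editorial aside: the initContainer pattern described in step 1 above might look roughly like the fragment below. The init image, download URL, and volume layout are hypothetical, only the jobmanager container is shown, and the container args assume an image whose entrypoint dispatches to StandaloneJobClusterEntryPoint.]

```yaml
# Hypothetical fragment of a jobmanager pod spec: an initContainer fetches
# the user jar into a shared emptyDir volume before the Flink container starts.
spec:
  initContainers:
    - name: fetch-user-jar
      image: busybox:latest            # assumed; any image with wget works
      command: ["wget", "-O", "/opt/flink/usrlib/job.jar",
                "http://artifact-server/example/word-count.jar"]  # hypothetical URL
      volumeMounts:
        - name: user-jar
          mountPath: /opt/flink/usrlib
  containers:
    - name: jobmanager
      image: flink-wordcount:latest    # assumed entrypoint: StandaloneJobClusterEntryPoint
      args: ["--job-id", "aaaabbbbccccddddaaaabbbbccccdddd",
             "--job-classname", "org.apache.flink.streaming.examples.wordcount.WordCount"]
      volumeMounts:
        - name: user-jar
          mountPath: /opt/flink/usrlib
  volumes:
    - name: user-jar
      emptyDir: {}                     # shared scratch space between the two containers
```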
> > > And it is impossible to find the jobId in the session cluster scene;
> > > there will be many jobs with the same name and the same submit time.
> > >
> > > I think it's better to retain these features already provided by
> > > StandaloneJobClusterEntryPoint in option 3. This will make Flink
> > > easier to integrate with other platforms based on Kubernetes.
> > >
> > > Thanks,
> > > Kaibo
> > >
> > > [1].
> > > https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java#L45
> > >
> > > Jeff Zhang <zjf...@gmail.com> 于2019年8月10日周六 下午1:52写道:
> > >
> > > > Thanks Yang. K8s native integration is very necessary and important
> > > > for the adoption of Flink, IMO.
> > > > I notice that the design doc was written in 2018; are there any
> > > > changes or updates?
> > > >
> > > > >>> Download the flink release binary and create the ~/.kube/config
> > > > file corresponding to the k8s cluster. It is all what you need.
> > > >
> > > > How can I specify which k8s cluster to run on in case I have
> > > > multiple k8s clusters? Can I do it by specifying the Flink cluster
> > > > in the Flink CLI?
> > > >
> > > > Yang Wang <danrtsey...@gmail.com> 于2019年8月9日周五 下午9:12写道:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Currently cloud native architectures have been introduced to many
> > > > > companies in production. They use Kubernetes to run deep learning,
> > > > > web servers, etc. If we could deploy per-job/session Flink
> > > > > clusters on Kubernetes to make them mix-run with other workloads,
> > > > > cluster resource utilization would be better. It would also be
> > > > > much easier for many Kubernetes users to have a taste of Flink.
> > > > >
> > > > > By now we have three options to run Flink jobs on K8s.
> > > > >
> > > > > [1].
> > > > > Create jm/tm/service yaml files and apply them; then you will get
> > > > > a Flink standalone cluster on K8s. Use flink run to submit jobs to
> > > > > the existing Flink cluster. Some companies may have their own
> > > > > deploy systems to manage the Flink cluster.
> > > > >
> > > > > [2]. Use flink-k8s-operator to manage multiple Flink clusters,
> > > > > including session and per-job. It could manage the complete
> > > > > deployment lifecycle of the application. I think this option is
> > > > > really easy to use for K8s users. They are familiar with
> > > > > k8s-operator, kubectl and other K8s tools. They could debug and
> > > > > run the Flink cluster just like other K8s applications.
> > > > >
> > > > > [3]. Native integration with K8s: use flink run or
> > > > > kubernetes-session.sh to start a Flink cluster. It is very similar
> > > > > to submitting a Flink cluster to Yarn. KubernetesClusterDescriptor
> > > > > talks to the K8s api server to start a Flink master deployment of
> > > > > 1. KubernetesResourceManager dynamically allocates resources from
> > > > > K8s to start task managers on demand. This option is very easy for
> > > > > Flink users to get started with. In the simplest case, we just
> > > > > need to change '-m yarn-cluster' to '-m kubernetes-cluster'.
> > > > >
> > > > > We have made an internal implementation of option [3] and use it
> > > > > in production. After it is fully tested, we hope to contribute it
> > > > > to the community. Now we want to get some feedback about the three
> > > > > options. Any comments are welcome.
> > > > >
> > > > > What do we need to prepare when starting a Flink cluster on K8s
> > > > > using native integration?
> > > > >
> > > > > Download the Flink release binary and create the ~/.kube/config
> > > > > file corresponding to the K8s cluster. That is all you need.
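[Editorial aside: on Jeff's question about multiple clusters, a standard kubeconfig can hold several clusters, and the targeted one is whichever context is active (`current-context`, switchable with `kubectl config use-context`). A minimal sketch, with made-up cluster names, server addresses, and user entries:]

```yaml
# Hypothetical ~/.kube/config listing two clusters; the active one is
# selected by current-context.
apiVersion: v1
kind: Config
clusters:
  - name: k8s-prod
    cluster:
      server: https://10.0.0.1:6443
  - name: k8s-test
    cluster:
      server: https://10.0.0.2:6443
users:
  - name: admin
    user: {}
contexts:
  - name: prod
    context: {cluster: k8s-prod, user: admin}
  - name: test
    context: {cluster: k8s-test, user: admin}
current-context: prod   # flink would talk to k8s-prod
```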
> > > > >
> > > > > Flink Session Cluster
> > > > >
> > > > > * Start a session cluster:
> > > > >
> > > > > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> > > > > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
> > > > >
> > > > > * You will get an address to submit jobs to; specify it through
> > > > > the '-ksa' option:
> > > > >
> > > > > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-session-example
> > > > > -ksa {x.x.x.x:12345} examples/streaming/WindowJoin.jar
> > > > >
> > > > > Flink Job Cluster
> > > > >
> > > > > * Running with the official Flink image:
> > > > >
> > > > > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> > > > > -ki flink:latest examples/streaming/WindowJoin.jar
> > > > >
> > > > > * Running with a user image:
> > > > >
> > > > > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> > > > > -ki flink-user:latest examples/streaming/WindowJoin.jar
> > > > >
> > > > > [1].
> > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
> > > > > [2]. https://github.com/lyft/flinkk8soperator
> > > > > [3].
> > > > > https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit#
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang