Hi Thomas,

Yes, I was referring to a separate repository under Apache Flink.
Cheers,
Konstantin

On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise <t...@apache.org> wrote:

> Hi everyone,
>
> Thanks for the feedback and discussion. A few additional thoughts:
>
> [Konstantin]
> > With respect to common lifecycle management operations: these features are not available (within Apache Flink) for any of the other resource providers (YARN, Standalone) either. From this perspective, I wouldn't consider this a shortcoming of the Kubernetes integration.
>
> I think time and the evolution of the ecosystem are factors to consider as well. The state and usage of Flink were much different when the YARN integration was novel. Expectations are different today, and the lifecycle functionality provided by an operator may well be considered essential to supporting the concept of a Flink application on k8s. After a few years of learning from operator experience outside of Flink, it might be a good time to fill the gap.
>
> [Konstantin]
> > I still believe that we should keep this focus on low level composable building blocks (like Jobs and Snapshots) in Apache Flink to make it easy for everyone to build fitting higher level abstractions like a FlinkApplication Custom Resource on top of it.
>
> I completely agree that it is important that the basic functions of Flink are solid, and continued focus is necessary. Thanks for sharing the pointers, these are great improvements. At the same time, the ecosystem, contributor base and user spectrum are growing. There have been significant additions in many areas of Flink, including connectors and higher level abstractions like statefun, SQL and Python. It's also evident from the additional repositories/subprojects that we have in Flink today.
>
> [Konstantin]
> > Having said this, if others in the community have the capacity to push and *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache Flink, I don't see any blockers. If or when this happens, I'd see some clear benefits of using a separate repository (easier independent versioning and releases, different build system & tooling (go, I assume)).
>
> Naturally, different contributors to the project have a different focus. Let's find out if there is strong enough interest to take this on and strong enough commitment to maintain it. As I see it, there is a tremendous amount of internal investment going into operationalizing Flink within many companies. Improvements to the operational side of Flink, like the operator, would complement Flink nicely. I assume that you are referring to a separate repository within Apache Flink, which would give it the chance to achieve better sustainability than the existing external operator efforts. There is also the fact that some organizations that are heavily invested in operationalizing Flink allow contributions to Apache Flink itself, but less so to arbitrary GitHub projects. Regarding the tooling, it could well turn out that Java is a good alternative, given the ecosystem focus and the opportunity for reuse in certain aspects (metrics, logging etc.).
>
> [Yang]
> > I think Xintong has given a strong point why we introduced the native K8s integration, which is active resource management. I have a concrete example for this in production. When a K8s node is down, the standalone K8s deployment will take a longer recovery time, based on the K8s eviction time (IIRC, the default is 5 minutes).
> > For the native K8s integration, Flink's RM becomes aware of the lost TM heartbeat and can allocate a new one in a timely manner.
>
> Thanks for sharing this, we should evaluate it as part of a proposal. If we can optimize recovery or scaling with active resource management, then perhaps it is worth supporting it through the operator. The previously mentioned operators all rely on the standalone model.
>
> Cheers,
> Thomas
>
> On Wed, Jan 12, 2022 at 3:21 AM Konstantin Knauf <kna...@apache.org> wrote:
> >
> > cc dev@
> >
> > Hi Thomas, Hi everyone,
> >
> > Thank you for starting this discussion and sorry for chiming in late.
> >
> > I agree with Thomas' and David's assessment of Flink's "Native Kubernetes Integration"; in particular, it actually does not integrate well with the Kubernetes ecosystem despite being called "native" (tooling, security concerns).
> >
> > With respect to common lifecycle management operations: these features are not available (within Apache Flink) for any of the other resource providers (YARN, Standalone) either. From this perspective, I wouldn't consider this a shortcoming of the Kubernetes integration. Instead, we have been focusing our efforts in Apache Flink on the operations of a single Job, and have left orchestration and lifecycle management that spans multiple Jobs to ecosystem projects. I still believe that we should keep this focus on low level composable building blocks (like Jobs and Snapshots) in Apache Flink to make it easy for everyone to build fitting higher level abstractions like a FlinkApplication Custom Resource on top of it. For example, we are currently contributing multiple improvements [1,2,3,4] to the REST API and Application Mode that, in our experience, will make it easier to manage Apache Flink with a Kubernetes operator. Given this background, I suspect a Kubernetes Operator in Apache Flink would not be a priority for us at Ververica - at least right now.
> >
> > Having said this, if others in the community have the capacity to push and *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache Flink, I don't see any blockers. If or when this happens, I'd see some clear benefits of using a separate repository (easier independent versioning and releases, different build system & tooling (go, I assume)).
> >
> > Looking forward to your thoughts,
> >
> > Konstantin
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-24275
> > [2] https://issues.apache.org/jira/browse/FLINK-24208
> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> > [4] https://issues.apache.org/jira/browse/FLINK-24113
> >
> > On Mon, Jan 10, 2022 at 2:11 PM Gyula Fóra <gyf...@apache.org> wrote:
> > >
> > > Hi All!
> > >
> > > This is a very interesting discussion.
> > >
> > > I think many users find it confusing to decide which deployment mode to choose when considering a new production application on Kubernetes. With all the options of native, standalone and different operators, this can get tricky :)
> > >
> > > I really like the idea that Thomas brought up to have at least a minimal operator implementation in Flink itself to cover the most common production job lifecycle management scenarios. I think the Flink community has very strong experience in this area and could create a successful implementation that would benefit most production users on Kubernetes.
> > >
> > > Cheers,
> > > Gyula
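As an illustrative sketch only: a minimal FlinkApplication custom resource of the kind discussed above might look roughly like the following. The apiVersion, kind, and every field name here are invented for this example and do not describe an existing API.

    # Hypothetical CR -- all names are illustrative, not an existing API
    apiVersion: flink.apache.org/v1alpha1
    kind: FlinkApplication
    metadata:
      name: my-streaming-app
    spec:
      image: my-registry/my-flink-job:1.0    # job jar baked into the image
      jarURI: local:///opt/flink/usrlib/my-job.jar
      parallelism: 4
      upgradeMode: savepoint                 # e.g. savepoint | stateless
      flinkConfiguration:
        state.savepoints.dir: s3://my-bucket/savepoints

A single resource like this would give kubectl-driven create/update/delete semantics for a whole application, which is the lifecycle gap the thread is circling around.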
> > >
> > > On Mon, Jan 10, 2022 at 4:29 AM Yang Wang <danrtsey...@gmail.com> wrote:
> > >
> > >> Thanks all for this fruitful discussion.
> > >>
> > >> I think Xintong has given a strong point why we introduced the native K8s integration, which is active resource management. I have a concrete example for this in production. When a K8s node is down, the standalone K8s deployment will take a longer recovery time, based on the K8s eviction time (IIRC, the default is 5 minutes). With the native K8s integration, Flink's RM becomes aware of the lost TM heartbeat and can allocate a new one in a timely manner.
> > >>
> > >> Also, when introducing the native K8s integration, another consideration was making it easy for users to migrate from YARN deployments. They already have a production-ready job life-cycle management system, which uses the Flink CLI to submit Flink jobs. So we provide a consistent command "bin/flink run-application -t kubernetes-application/yarn-application" to start a Flink application, and "bin/flink cancel/stop ..." to terminate one.
> > >>
> > >> Compared with a K8s operator, I know that this is not a K8s-native mechanism. Hence, I also agree that we still need a powerful K8s operator which could work with both standalone and native K8s modes. The major difference between them is how to start the JM and TM pods. For standalone, they are managed by a K8s job/deployment. For native, maybe we could simply create a submission carrying the "flink run-application" arguments derived from the Flink application CR.
> > >>
> > >> Making Flink's active resource manager talk to the K8s operator is an interesting option, which could support both standalone and native. Then the Flink RM just needs to declare the resource requirements (e.g. 2 * <2G, 1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation to the K8s operator. It feels like an intermediate form between the native and standalone modes :)
> > >>
> > >> Best,
> > >> Yang
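To make Yang's migration point concrete, a rough sketch of that consistent CLI experience, using the documented native-Kubernetes options of the time; the cluster id, image, paths, and job id are placeholders:

    # Submit an application cluster to native Kubernetes
    ./bin/flink run-application -t kubernetes-application \
        -Dkubernetes.cluster-id=my-flink-app \
        -Dkubernetes.container.image=my-registry/my-flink-job:1.0 \
        local:///opt/flink/usrlib/my-job.jar

    # Stop the job with a savepoint (job id from the submission output)
    ./bin/flink stop -t kubernetes-application \
        -Dkubernetes.cluster-id=my-flink-app \
        --savepointPath s3://my-bucket/savepoints <jobId>

The same pair of commands with `-t yarn-application` covers the YARN path, which is the consistency being described.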
> > >>
> > >> On Fri, Jan 7, 2022 at 12:02 PM Xintong Song <tonysong...@gmail.com> wrote:
> > >>
> > >>> Hi folks,
> > >>>
> > >>> Thanks for the discussion. I'd like to share my two cents on this topic.
> > >>>
> > >>> Firstly, I'd like to clarify my understanding of the concepts "native k8s integration" and "active resource management".
> > >>> - Native k8s integration means Flink's master interacts with the k8s API server directly. It acts like embedding an operator inside Flink's master, which manages the resources (pod, deployment, configmap, etc.) and watches / reacts to related events.
> > >>> - Active resource management means Flink can actively start / terminate workers as needed. Its key characteristic is that the resources a Flink deployment uses are decided by the job's execution plan, unlike the opposite reactive mode (the resources available to the deployment decide the execution plan) or the standalone mode (both the execution plan and the deployment resources are predefined).
> > >>>
> > >>> Currently, we have the yarn and native k8s deployments (and the recently removed mesos deployment) in active mode, due to their ability to request / release worker resources from the underlying cluster. All the existing operators, AFAIK, work with a Flink standalone deployment, where Flink cannot request / release resources by itself.
> > >>>
> > >>> From this perspective, I think a large part of the native k8s integration's advantages come from the active mode: being able to better understand the job's resource requirements and adjust the deployment resources accordingly. Both fine-grained resource management (customizing TM resources for different tasks / operators) and the adaptive batch scheduler (rescaling the deployment w.r.t. different stages) fall into this category.
> > >>>
> > >>> I'm wondering if we can have an operator that also works with the active mode. Instead of talking to the API server directly for adding / deleting resources, Flink's active resource manager could talk to the operator (via the CR) about the resources the deployment needs, and let the operator actually add / remove the resources. The operator should be able to work with (active) or without (standalone) the information about the deployment's resource requirements. In this way, users are free to choose between active and reactive (e.g., HPA) rescaling, while always benefiting from the beyond-deployment lifecycle (upgrades, savepoint management, etc.) and alignment with the K8s ecosystem (Flink client free, operating via kubectl, etc.).
> > >>>
> > >>> Thank you~
> > >>>
> > >>> Xintong Song
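One way to picture the declarative hand-off Xintong describes, purely as a hypothetical sketch (the group, kind, and field names below are invented): the Flink RM would publish its requirements on the CR, and the operator would reconcile TM pods to match.

    # Hypothetical CR fragment -- names invented for illustration
    apiVersion: flink.example.org/v1alpha1
    kind: FlinkDeployment
    metadata:
      name: my-flink-app
    status:
      # written by Flink's active resource manager, e.g. the
      # "2 * <2G, 1CPU>, 2 * <4G, 1CPU>" example from above
      declaredResources:
        - count: 2
          taskManager: { memory: 2g, cpu: 1 }
        - count: 2
          taskManager: { memory: 4g, cpu: 1 }

The operator would then diff declaredResources against the running pods, which is essentially the reconcile loop that operators already implement for the standalone mode.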
> > >>>
> > >>> On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise <t...@apache.org> wrote:
> > >>>
> > >>>> Hi David,
> > >>>>
> > >>>> Thank you for the reply and context!
> > >>>>
> > >>>> As for workload types and where native integration might fit: I think that any k8s-native solution that satisfies category 3) can also take care of 1) and 2), while the native integration by itself can't achieve that. The existence of [1] might serve as a further indication.
> > >>>>
> > >>>> The k8s operator pattern would be an essential building block for a k8s-native solution that is interoperable with k8s ecosystem tooling like kubectl, which is why [2] and subsequent derived art were created. Specifically, the CRD allows us to directly express the concept of a Flink application consisting of job manager and task manager pods, along with associated create/update/delete operations.
> > >>>>
> > >>>> Would it make sense to gauge interest in having such an operator as part of Flink? It appears so from discussions like [3]. I think such an addition would significantly lower the barrier to adoption, since, as you mentioned, one cannot really run mission-critical streaming workloads with the Apache Flink release binaries alone. While it is great to have multiple k8s operators to choose from that are managed outside Flink, it is unfortunately also evident that today's hot operator turns into tomorrow's tech debt. I think such a fate would be less likely within the project, where multiple parties can join forces and benefit from each other's contributions. There were similar considerations and discussions around Docker images in the past.
> > >>>>
> > >>>> Out of the features that you listed, it is particularly the application upgrade that needs to be solved through an external process like an operator. The good thing is that many folks have already thought hard about this, and in existing implementations we see different strategies that have their merit and production mileage (this certainly applies to [2]). We could combine the best of these ideas into a unified implementation as part of Flink itself as a starting point.
> > >>>>
> > >>>> Cheers,
> > >>>> Thomas
> > >>>>
> > >>>> [1] https://github.com/wangyang0918/flink-native-k8s-operator
> > >>>> [2] https://github.com/lyft/flinkk8soperator
> > >>>> [3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
> > >>>>
> > >>>> On Tue, Jan 4, 2022 at 4:04 AM David Morávek <d...@apache.org> wrote:
> > >>>> >
> > >>>> > Hi Thomas,
> > >>>> >
> > >>>> > AFAIK there are no specific plans in this direction with the native integration, but I'd like to share some thoughts on the topic.
> > >>>> >
> > >>>> > In my understanding there are three major groups of workloads in Flink:
> > >>>> >
> > >>>> > 1) Batch workloads
> > >>>> > 2) Interactive workloads (both Batch and Streaming; e.g. SQL Gateway / Zeppelin Notebooks / VVP ...)
> > >>>> > 3) "Mission Critical" Streaming workloads
> > >>>> >
> > >>>> > I think the native integration fits really well in the first two categories. Let's talk about these first:
> > >>>> >
> > >>>> > 1) Batch workloads
> > >>>> >
> > >>>> > You don't really need to address the upgrade story here. The interesting topic is how to "dynamically" adjust parallelism, as the workload can change between stages. This is where the Adaptive Batch Scheduler [1] comes into play. To leverage the scheduler to the full extent, it needs to be deployed with the remote shuffle service in place [2], so that Flink's Resource Manager can free TaskManagers that are no longer needed.
> > >>>> >
> > >>>> > This can IMO work really well with the native integration, as there is a really clear approach to how the Resource Manager should decide what resources should be allocated.
> > >>>> >
> > >>>> > 2) Interactive workloads
> > >>>> >
> > >>>> > Again, the upgrade story is not really interesting in this scenario. For batch workloads, it's basically the same as the above. For streaming ones, this gets tricky. The main initiative that we currently have in terms of auto-scaling / re-scaling of streaming workloads is the reactive mode (adaptive scheduler) [3].
> > >>>> >
> > >>>> > I can totally see how the reactive mode could be integrated with the native integration, but only with the application mode, which is not really suitable for interactive workloads. For integration with a session cluster, we'd first need to address the "scheduling" problem of how to distribute newly available resources between multiple jobs.
> > >>>> >
> > >>>> > What's pretty neat here is that the interpreters (Zeppelin, SQL gateway, ...) have a really convenient way of spinning up a new cluster inside k8s.
> > >>>> >
> > >>>> > 3) "Mission Critical" Streaming workloads
> > >>>> >
> > >>>> > This one is IMO the primary reason why one would consider building a new operator these days, as this requires careful lifecycle management of the pipeline. I assume this is also the use case that you're investigating, am I correct?
> > >>>> >
> > >>>> > I'd second the requirements that you've already stated:
> > >>>> > a) Resource efficiency - being able to re-scale based on the workload, in order to keep up with the input / not waste resources
> > >>>> > b) Fast recovery
> > >>>> > c) Application upgrades
> > >>>> >
> > >>>> > I personally don't think that the native integration is really suitable here. The direction we're headed in is the standalone deployment on Kubernetes + the reactive mode (adaptive scheduler).
> > >>>> >
> > >>>> > In theory, if we want to build a really cloud (Kubernetes) native stream processor, deploying the pipeline should be as simple as deploying any other application. It should also be simple to integrate with CI/CD environments and the fast / frequent deployment philosophy.
> > >>>> >
> > >>>> > Let's see where we stand and where we can expand from there:
> > >>>> >
> > >>>> > a) Resource efficiency
> > >>>> >
> > >>>> > We already have the reactive mode in place. This allows you to add / remove task managers by adjusting the TM deployment (`kubectl scale ...`), and Flink will automatically react to the available resources. This is currently only supported with the Application Mode, which is limited to a single job (that should be enough for this kind of workload).
> > >>>> >
> > >>>> > The re-scaling logic is left completely up to the user and can be as simple as setting up an HPA (Horizontal Pod Autoscaler). I tend to think that we might want to provide a custom k8s metrics server that allows the HPA to query metrics from the JM, to make this more flexible and easy to use.
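For a sense of what the rescaling described above looks like from the outside, assuming a standalone reactive-mode cluster whose TaskManagers run in a Deployment named flink-taskmanager (the name and thresholds are placeholders):

    # Manual rescale: reactive mode adapts the job to the new TM count
    kubectl scale deployment/flink-taskmanager --replicas=4

    # Or hand the decision to a CPU-based HPA
    kubectl autoscale deployment/flink-taskmanager \
        --min=2 --max=10 --cpu-percent=80

A custom metrics server, as suggested, would replace the CPU signal with job-level metrics such as consumer lag.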
> > >>>> >
> > >>>> > While this looks really great in theory, there are still some shortcomings that we're actively working on addressing. For this feature to be really widely adopted, we need to make the re-scaling experience as fast as possible, so we can re-scale often to react to the input rate. This is currently a problem with large RocksDB state, as re-scaling involves a full re-balance of the state (splitting / merging RocksDB instances). The k8s operator approach has the same / even worse limitation, as it involves taking a savepoint and re-building the state from it.
> > >>>> >
> > >>>> > b) Fast recovery
> > >>>> >
> > >>>> > This is IMO not that different from the native mode (although I'd have to check whether RM failover can reuse task managers). It involves frequent and fast checkpointing, local recovery (which is still not supported in reactive mode, but will hopefully be addressed soon) and the working directory efforts [4].
> > >>>> >
> > >>>> > c) Application upgrades
> > >>>> >
> > >>>> > This is the topic I'm still struggling with a little. Historically this involves external lifecycle management (savepoint + submitting a new job). I think at the end of the day, with application mode on standalone k8s, it could be as simple as updating the docker image of the JM deployment.
> > >>>> >
> > >>>> > If I think about the simplest upgrade scenario, a simple in-place restore from the latest checkpoint, it may be fairly simple to implement. What I'm struggling with are the more complex upgrade scenarios such as dual, blue / green deployments.
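The in-place upgrade sketched above can be pictured as a single image update, assuming the job jar is baked into the image and HA metadata lets the restarted JobManager resume from the latest checkpoint; the deployment and container names are placeholders:

    # Roll the JM deployment to the new job version; the restarted
    # JobManager resumes from the latest checkpoint via HA metadata
    kubectl set image deployment/flink-jobmanager \
        jobmanager=my-registry/my-flink-job:1.1

Blue / green is harder precisely because two such deployments would need a coordinated savepoint hand-off rather than a rolling image swap.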
> > >>>> >
> > >>>> > To sum this up, I'd really love it if Flink could provide a great out-of-the-box experience with standalone mode on k8s that makes the experience as close to running / operating any other application as possible.
> > >>>> >
> > >>>> > I'd really appreciate hearing your thoughts on this topic.
> > >>>> >
> > >>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > >>>> > [2] https://github.com/flink-extended/flink-remote-shuffle
> > >>>> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
> > >>>> > [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-198%3A+Working+directory+for+Flink+processes
> > >>>> >
> > >>>> > Best,
> > >>>> > D.
> > >>>> >
> > >>>> > On Tue, Jan 4, 2022 at 12:44 AM Thomas Weise <t...@apache.org> wrote:
> > >>>> >>
> > >>>> >> Hi,
> > >>>> >>
> > >>>> >> I was recently looking at the Flink native Kubernetes integration [1] to get an idea of how it relates to existing operator-based solutions [2], [3].
> > >>>> >>
> > >>>> >> Part of the native integration's motivation was simplicity (no extra component to install), but arguably that is also a shortcoming. The k8s operator model can offer support for application lifecycle operations like upgrade and rescaling, as well as job submission without a Flink client.
> > >>>> >>
> > >>>> >> When using the Flink native integration, it would still be necessary to provide that controller functionality. Is the idea to use the native integration for task manager resource allocation in tandem with an operator that provides the external controller functionality? If anyone using the Flink native integration can share their experience, I would be curious to learn more about the specific setup and whether there are plans to expand the k8s native integration capabilities.
> > >>>> >>
> > >>>> >> For example:
> > >>>> >>
> > >>>> >> * Application upgrade with features such as [4]. Since the job manager is part of the deployment, it cannot orchestrate the deployment. It needs to be the responsibility of an external process. Has anyone contemplated adding such a component to Flink itself?
> > >>>> >>
> > >>>> >> * Rescaling: Theoretically, a parallelism change could be performed without a restart of the job manager pod. Hence, building blocks to trigger and apply rescaling could be part of Flink itself. Has this been explored further?
> > >>>> >>
> > >>>> >> Yang kindly pointed me to [5]. Is the recommendation/conclusion that when a k8s operator is already used, it should be in charge of the task manager resource allocation? If so, what scenario was the native k8s integration originally intended for?
> > >>>> >>
> > >>>> >> Thanks,
> > >>>> >> Thomas
> > >>>> >>
> > >>>> >> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
> > >>>> >> [2] https://github.com/lyft/flinkk8soperator
> > >>>> >> [3] https://github.com/spotify/flink-on-k8s-operator
> > >>>> >> [4] https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
> > >>>> >> [5] https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d
> >
> > --
> > Konstantin Knauf
> > https://twitter.com/snntrable
> > https://github.com/knaufk

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk