Correction: I'm fine to use a void blobService in OLAP scenarios if it works better in most cases. -> I'm fine to use a void blobService in OLAP scenarios *by default* if it works better in most cases.
Zhu Zhu <reed...@gmail.com> wrote on Mon, Jan 15, 2024 at 17:51:

> @Yangze
>
> > (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a 100% increase in QPS
>
> Did you look into which part takes most of the time? Jar uploading, Jar downloading, JobInformation shipping, TDD shipping, or others?
>
> If these objects are large, e.g. a connector jar of hundreds of megabytes, will shipping it hundreds of times (if parallelism > 100) from JMs to TMs become a blocker for performance and stability, compared to letting the DFS help with the shipping? If yes, we should not force it to use a void blobService. Maybe an option should be given to users to switch between blobServices?
>
> I'm fine to use a void blobService in OLAP scenarios if it works better in most cases. However, it is a bit weird that we disable blobs if `enable-job-recovery=false`. Conceptually, they should be unrelated.
>
> > As Matthias mentioned, each component still needs to write its RPC address, so this part of the writing may be unavoidable.
>
> Thanks Matthias for the inputs. However, even in non-HA mode, the task manager can connect to the JobMaster. Therefore, I guess it's not necessary to store JM addresses externally. I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever` accepts a parameter `defaultJobManagerAddress`, so maybe TMs do not need to find out the addresses of JMs via external services?
>
> > focus on the discussion of HA functionality in the OLAP scenario in FLIP-403 and exclude the refactoring from the scope of this FLIP
>
> It sounds good to me. Actually, the concept of separating leader election and persistence looked great to me at first glance, but the shared MaterialProvider makes it more complicated than I had expected.
>
> Thanks,
> Zhu
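
A minimal sketch of that idea (the wrapper class is hypothetical; `getJobManagerLeaderRetriever` and `StandaloneLeaderRetrievalService` are existing Flink names, but exact constructor signatures vary across versions): with job recovery disabled, the JobMaster leader retriever could simply be backed by the default address the TaskManager already knows, instead of an external lookup in ZooKeeper or a ConfigMap.

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;

    // Sketch only, not an agreed-upon design.
    class NonPersistentJobLeaderRetrieval {

        LeaderRetrievalService getJobManagerLeaderRetriever(
                JobID jobId, String defaultJobManagerAddress) {
            // StandaloneLeaderRetrievalService just reports a fixed leader address,
            // so nothing needs to be written to or read from external storage.
            return new StandaloneLeaderRetrievalService(defaultJobManagerAddress);
        }
    }
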
> Yangze Guo <karma...@gmail.com> wrote on Thu, Jan 11, 2024 at 14:53:
>>
>> Thanks for the comments, Zhu and Matthias.
>>
>> @Zhu Zhu
>>
>> > How about disabling the checkpoint to avoid the cost? I know the cost is there even if we disable the checkpoint at the moment. But I think it can be fixed.
>> > If HA is disabled, the jobmanager needs to directly participate in all blob shipping work which may result in a hot-spot.
>>
>> Currently, there are several persistence services that have specific implementations based on the HA mode:
>> - JobGraphStore and JobResultStore: These are related to job recovery and can cause significant redundant I/O in OLAP scenarios, impacting performance. It may be necessary to configure them as in-memory stores for OLAP.
>> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this overhead by disabling checkpoints. I agree to remove Checkpoint Storage from the scope of this FLIP.
>> - BlobStore: Agreed that disabling BlobStore can potentially lead to hotspots in JobManagers. However, enabling it in OLAP scenarios can also result in high external storage access overhead, e.g. for the JobInformation/ShuffleDescriptor in the TDD. I think this is a trade-off. In our internal benchmark for short queries (WordCount jobs with parallelism 128), disabling BlobStore resulted in a 100% increase in QPS. Therefore, I lean towards disabling it. WDYT?
>>
>> > FLINK-24038
>>
>> As Matthias mentioned, each component still needs to write its RPC address, so this part of the writing may be unavoidable.
>>
>> @Zhu Zhu @Matthias
>>
>> > I don't see why the PersistenceServices needs to have access to the MaterialProvider. I feel like there shouldn't be a component that's shared between the LeaderElectionService and the PersistenceServices.
>> > The corresponding ZooKeeper/k8s implementation would hold the client instance (which is the only thing that should be shared between the LeaderElectionService and the PersistenceServices implementations).
>>
>> Yes, I agree that this is the goal of splitting the interfaces. However, when I attempted to split it, I found that these two services still have implicit temporal dependencies, such as the closure of the client instance and the cleanup of services and job data.
>>
>> Regarding the refactoring of HighAvailabilityServices, I tried to summarize the issues that need to be considered:
>> - Splitting LeaderServices and PersistenceServices; as Matthias mentioned, this allows for easier testing.
>> - Removal of deprecated interfaces, such as getWebMonitorLeaderElectionService.
>> - Reviewing the existing multiple close and cleanup interfaces.
>> - Integration of StandaloneHaServices and EmbeddedHaServices.
>> I think this topic might be big enough to have a separate discussion thread. I am now inclined to focus on the discussion of HA functionality in the OLAP scenario in FLIP-403 and exclude the refactoring from the scope of this FLIP. This way, we can simply return different persistence services in AbstractHaServices based on the configuration. And I'm willing to file a new FLIP (or perhaps a ticket would be sufficient) for the refactoring of HA. WDYT?
>>
>> Best,
>> Yangze Guo
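
A rough sketch of the "return different persistence services based on the configuration" idea, wiring in the no-op/in-memory implementations Flink already ships (VoidBlobStore, StandaloneJobGraphStore, EmbeddedJobResultStore); the option key and the abstract createHa* methods are placeholders, not the final FLIP API:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.blob.BlobStore;
    import org.apache.flink.runtime.blob.VoidBlobStore;
    import org.apache.flink.runtime.highavailability.JobResultStore;
    import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
    import org.apache.flink.runtime.jobmanager.JobGraphStore;
    import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;

    // Sketch: when job recovery is disabled, hand out no-op/in-memory persistence
    // while leader election stays on ZooKeeper/Kubernetes.
    abstract class JobRecoveryAwarePersistenceServices {

        JobGraphStore createJobGraphStore(Configuration conf) {
            return jobRecoveryEnabled(conf) ? createHaJobGraphStore() : new StandaloneJobGraphStore();
        }

        JobResultStore createJobResultStore(Configuration conf) {
            return jobRecoveryEnabled(conf) ? createHaJobResultStore() : new EmbeddedJobResultStore();
        }

        BlobStore createBlobStore(Configuration conf) {
            // The trade-off discussed above: a void blob store avoids DFS round trips for
            // short queries, but the JM then ships every blob itself.
            return jobRecoveryEnabled(conf) ? createHaBlobStore() : new VoidBlobStore();
        }

        private static boolean jobRecoveryEnabled(Configuration conf) {
            // Option name as proposed in this thread; it does not exist in Flink yet.
            // String-key getter used here for brevity; accessor style varies across versions.
            return conf.getBoolean("high-availability.job-recovery.enabled", true);
        }

        abstract JobGraphStore createHaJobGraphStore();

        abstract JobResultStore createHaJobResultStore();

        abstract BlobStore createHaBlobStore();
    }
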
>> On Thu, Jan 11, 2024 at 12:19 AM Matthias Pohl <matthias.p...@aiven.io.invalid> wrote:
>> >
>> > Thanks for joining the discussion, everyone, and sorry for picking it up that late. Here are a few points I want to add to this discussion:
>> >
>> > - FLINK-24038 [1] led to a reduction of the curator/k8s client leader election requests by having a single leader election per JM rather than individual ones per RPCEndpoint. We still need to have one record per component/RPCEndpoint (i.e. Dispatcher, RM, JobMaster instances, ...), though, because we need to save the address for RPC calls (Akka/Pekko) per component (each JobMaster has its own RPC endpoint with a dedicated port). That is why we cannot get rid of the individual entries/znodes per job.
>> >
>> > - An alternative to this FLIP's proposal would be to stick to the current HighAvailabilityServices interface. We could come up with a new implementation that provides Standalone instances of what you call PersistentServices in this FLIP. That would reduce the effort that comes with refactoring the HighAvailabilityServices interface. It should be discussed here as an alternative and probably mentioned in the FLIP as a rejected alternative if the community agrees.
>> >
>> > - From a conceptual point of view, splitting the HighAvailabilityServices into LeaderElectionService and PersistentServices (I'm wondering whether something like JobHighAvailabilityServices would be more descriptive here. The word "persistence" is a bit ambiguous and can also be used in scenarios other than HA) makes sense in my opinion. One hint why separating this big interface HighAvailabilityServices into two smaller interfaces would make sense is the fact that there is a test implementation EmbeddedHaServicesWithLeadershipControl right now that provides embedded HA with helper methods to control the LeaderElection in ITCases. It is a workaround to get access to leader election. With two separate interfaces, we could make it easier to test these things.
>> >
>> > - I'm not too sure about the proposed class hierarchy of FLIP-403:
>> > - What are the semantics of the "MaterialProvider"? The name doesn't give me any hints on the interface/class purpose. There could be some description for this component added to the FLIP. But on another note: I don't see why the PersistenceServices needs to have access to the MaterialProvider. I feel like there shouldn't be a component that's shared between the LeaderElectionService and the PersistenceServices.
>> > - Alternative: What about coming up with a factory interface HighAvailabilityServicesFactory which provides two methods: createLeaderElectionService & createPersistenceServices? The factory wouldn't need to keep any instances (as suggested by this FLIP's HighAvailabilityServices component). It's a plain factory component that creates instances. The corresponding ZooKeeper/k8s implementation would hold the client instance (which is the only thing that should be shared between the LeaderElectionService and the PersistenceServices implementations). The factory would live in the ClusterEntrypoint. Any cleanup of HA data would be covered by the LeaderElection|PersistenceServices, individually.
>> >
>> > Looking forward to your opinions.
>> > Best,
>> > Matthias
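
A small sketch of that factory alternative, using the FLIP's service names as placeholder return types (every type below except CuratorFramework is illustrative, not an agreed-upon API); the point is only that the concrete factory holds the one shared resource, the ZooKeeper/k8s client:

    import org.apache.curator.framework.CuratorFramework; // shaded inside Flink

    interface LeaderServices { /* leader election / retrieval getters */ }

    interface PersistenceServices { /* job graph, job result, blob, checkpoint stores */ }

    // Plain factory: creates instances, keeps none of them itself.
    interface HighAvailabilityServicesFactory {
        LeaderServices createLeaderServices();

        PersistenceServices createPersistenceServices();
    }

    class ZooKeeperHaServicesFactory implements HighAvailabilityServicesFactory {

        // The client is the only thing shared by the two services it creates.
        private final CuratorFramework client;

        ZooKeeperHaServicesFactory(CuratorFramework client) {
            this.client = client;
        }

        @Override
        public LeaderServices createLeaderServices() {
            return new LeaderServices() {}; // would wrap curator-based leader election on `client`
        }

        @Override
        public PersistenceServices createPersistenceServices() {
            return new PersistenceServices() {}; // would wrap ZK-backed job graph/result stores
        }
    }
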
>> > On Tue, Jan 9, 2024 at 1:23 PM Zhu Zhu <reed...@gmail.com> wrote:
>> > >
>> > > > I would treat refactoring as a technical debt...
>> > >
>> > > Sorry, I don't quite get the need for the refactoring work.
>> > >
>> > > The refactoring work brings benefits if there are requirements to combine different leader election services and persistence services. The answer in this FLIP is to combine DefaultLeaderServices and EmbeddedPersistenceServices. But I'm concerned that, if the goal is to avoid the cost of job recovery, disabling the persistence of the overall cluster might be overkill, e.g. what if later we want the cluster partitions to be recovered after JM failover?
>> > >
>> > > Yet I cannot think of the need for other new combinations at the moment, e.g. a non-HA leader election service with an HA persistence service, or a ZK leader election service with a K8s persistence service. Maybe you have some good cases for it?
>> > >
>> > > TBH, the current class structure looks simpler to me. I'm also wondering whether it's possible to merge StandaloneHaServices with EmbeddedHaServices, because the latter is a special case (all components in the same process) of the former.
>> > >
>> > > > it still involves creating a znode or writing to the configmap for each job
>> > >
>> > > Is it possible to avoid the cost? My gut feeling is that these actions are not necessary after Flink does leader election for the overall master process.
>> > >
>> > > > such as checkpoint and blob storage except for the job graph store
>> > >
>> > > How about disabling the checkpoint to avoid the cost? I know the cost is there even if we disable the checkpoint at the moment, but I think it can be fixed. Checkpointing is not needed if job recovery is not needed; the concepts are highly related.
>> > >
>> > > Regarding blob storage, I'm not sure whether it's good to disable HA for it. If HA is disabled, the jobmanager needs to directly participate in all blob shipping work, which may result in a hot-spot.
>> > >
>> > > WDYT?
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > Yangze Guo <karma...@gmail.com> wrote on Tue, Jan 9, 2024 at 10:55:
>> > > >
>> > > > Thank you for your comments, Zhu!
>> > > >
>> > > > 1. I would treat refactoring as a technical debt and a side effect of this FLIP. The idea is inspired by Matthias' comments in [1]. It suggests having a single implementation of HighAvailabilityServices that requires a factory method for persistence services and leader services. After this, we will achieve a clearer class hierarchy for HAServices and eliminate code duplication.
>> > > >
>> > > > 2. While FLINK-24038 does eliminate the leader election time cost for each job, it still involves creating a znode or writing to the configmap for each job, which can negatively impact performance under higher workloads. This also applies to all other persistence services, such as checkpoint and blob storage, except for the job graph store.
>> > > >
>> > > > WDYT?
>> > > >
>> > > > [1] https://issues.apache.org/jira/browse/FLINK-31816?focusedCommentId=17741054&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17741054
>> > > >
>> > > > Best,
>> > > > Yangze Guo
>> > > >
>> > > > On Mon, Jan 8, 2024 at 7:37 PM Zhu Zhu <reed...@gmail.com> wrote:
>> > > > >
>> > > > > Thanks for creating the FLIP and starting the discussion, Yangze. It makes sense to me to improve the job submission performance in OLAP scenarios.
>> > > > >
>> > > > > I have a few questions regarding the proposed changes:
>> > > > >
>> > > > > 1. How about skipping the job graph persistence if the proposed config 'high-availability.enable-job-recovery' is set to false? In this way, we do not need to do the refactoring work.
>> > > > >
>> > > > > 2. Instead of using different HA services for Dispatcher and JobMaster, can we leverage the work of FLINK-24038 to eliminate the leader election time cost of each job? Honestly, I had thought that was already the case, but it seems it is not. This improvement can also benefit non-OLAP jobs.
>> > > > >
>> > > > > Thanks,
>> > > > > Zhu
>> > > > >
>> > > > > Yangze Guo <karma...@gmail.com> wrote on Mon, Jan 8, 2024 at 17:11:
>> > > > > >
>> > > > > > Thanks for the pointer, Rui!
>> > > > > >
>> > > > > > I have reviewed FLIP-383, and based on my understanding, this feature should be enabled by default for batch jobs in the future. Therefore, +1 for checking the parameters and issuing log warnings when the user explicitly configures execution.batch.job-recovery.enabled to true.
>> > > > > >
>> > > > > > +1 for high-availability.job-recovery.enabled, which would be more suitable with the YAML hierarchy.
>> > > > > >
>> > > > > > Best,
>> > > > > > Yangze Guo
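
A sketch of the kind of parameter check being agreed on here (illustrative only: both option keys are the names proposed in this thread, not existing Flink options, and the Configuration accessor style varies across Flink versions):

    import org.apache.flink.configuration.Configuration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class JobRecoveryConfigCheck {

        private static final Logger LOG = LoggerFactory.getLogger(JobRecoveryConfigCheck.class);

        static void check(Configuration conf) {
            boolean haJobRecovery =
                    conf.getBoolean("high-availability.job-recovery.enabled", true);
            boolean batchJobRecovery =
                    conf.getBoolean("execution.batch.job-recovery.enabled", false);

            if (batchJobRecovery && !haJobRecovery) {
                // Per the discussion above, only an explicitly configured batch option should
                // trigger the warning; explicit-vs-default detection is omitted in this sketch.
                LOG.warn(
                        "execution.batch.job-recovery.enabled is true, but job recovery is "
                                + "disabled via high-availability.job-recovery.enabled=false; "
                                + "batch job recovery will not take effect.");
            }
        }
    }
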
>> > > > > > On Mon, Jan 8, 2024 at 3:43 PM Rui Fan <1996fan...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > Thanks to Yangze for driving this proposal!
>> > > > > > >
>> > > > > > > Overall looks good to me! This proposal is useful for performance when the job doesn't need failover.
>> > > > > > >
>> > > > > > > I have some minor questions:
>> > > > > > >
>> > > > > > > 1. How does it work with FLIP-383 [1]?
>> > > > > > >
>> > > > > > > This FLIP introduces high-availability.enable-job-recovery, and FLIP-383 introduces execution.batch.job-recovery.enabled.
>> > > > > > >
>> > > > > > > IIUC, when high-availability.enable-job-recovery is false, the job cannot recover even if execution.batch.job-recovery.enabled = true, right?
>> > > > > > >
>> > > > > > > If so, could we check the parameters and log a warning? Or disable execution.batch.job-recovery.enabled directly when high-availability.enable-job-recovery = false.
>> > > > > > >
>> > > > > > > 2. Could we rename it to high-availability.job-recovery.enabled to unify the naming?
>> > > > > > >
>> > > > > > > WDYT?
>> > > > > > >
>> > > > > > > [1] https://cwiki.apache.org/confluence/x/QwqZE
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Rui
>> > > > > > >
>> > > > > > > On Mon, Jan 8, 2024 at 2:04 PM Yangze Guo <karma...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > Thanks for your comment, Yong.
>> > > > > > > >
>> > > > > > > > Here are my thoughts on the splitting of HighAvailabilityServices:
>> > > > > > > > Firstly, I would treat this separation as a result of technical debt and a side effect of the FLIP. In order to achieve a cleaner interface hierarchy for High Availability before Flink 2.0, the design decision should not be limited to OLAP scenarios.
>> > > > > > > > I agree that the current HAServices can be divided based on either the actual target (cluster & job) or the type of functionality (leader election & persistence). From a conceptual perspective, I do not see one approach being better than the other. However, I have chosen the current separation for a clear separation of concerns. After FLIP-285, each process has a dedicated LeaderElectionService responsible for leader election of all the components within it. This LeaderElectionService has its own lifecycle management. If we were to split the HAServices into 'ClusterHighAvailabilityService' and 'JobHighAvailabilityService', we would need to couple the lifecycle management of these two interfaces, as they both rely on the LeaderElectionService and other relevant classes. This coupling and implicit design assumption will increase the complexity and testing difficulty of the system. WDYT?
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Yangze Guo
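
To make the lifecycle point concrete, a rough sketch (all types hypothetical except CuratorFramework): whichever way the interface is split, both halves depend on the same client, so a single owner has to coordinate their shutdown order.

    import org.apache.curator.framework.CuratorFramework; // shaded inside Flink

    class ComposedHaServices implements AutoCloseable {

        private final CuratorFramework client;           // shared resource
        private final AutoCloseable leaderServices;      // leader election / retrieval half
        private final AutoCloseable persistenceServices; // job graph / result / blob half

        ComposedHaServices(
                CuratorFramework client,
                AutoCloseable leaderServices,
                AutoCloseable persistenceServices) {
            this.client = client;
            this.leaderServices = leaderServices;
            this.persistenceServices = persistenceServices;
        }

        @Override
        public void close() throws Exception {
            // Both halves must be closed before the shared client may be.
            persistenceServices.close();
            leaderServices.close();
            client.close();
        }
    }
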
>> > > > > > > > On Mon, Jan 8, 2024 at 12:08 PM Yong Fang <zjur...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > Thanks Yangze for starting this discussion. I have one comment: why do we need to abstract two services as `LeaderServices` and `PersistenceServices`?
>> > > > > > > > >
>> > > > > > > > > From the content, the purpose of this FLIP is to make job failover more lightweight, so it would be more appropriate to abstract the two services as `ClusterHighAvailabilityService` and `JobHighAvailabilityService` rather than as `LeaderServices` and `PersistenceServices` based on leader and store. In this way, we can create a `JobHighAvailabilityService` that has a leader service and a store for the job, meeting the requirements based on the configuration in the zk/k8s high availability service.
>> > > > > > > > >
>> > > > > > > > > WDYT?
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Fang Yong
>> > > > > > > > >
>> > > > > > > > > On Fri, Dec 29, 2023 at 8:10 PM xiangyu feng <xiangyu...@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > Thanks Yangze for restarting this discussion.
>> > > > > > > > > >
>> > > > > > > > > > +1 for the overall idea. By splitting the HighAvailabilityServices into LeaderServices and PersistenceServices, we may support configuring different storage behind them in the future.
>> > > > > > > > > >
>> > > > > > > > > > We did run into real problems in production where too much job metadata was being stored on ZK, causing system instability.
>> > > > > > > > > >
>> > > > > > > > > > Yangze Guo <karma...@gmail.com> wrote on Fri, Dec 29, 2023 at 10:21:
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks for the response, Zhanghao.
>> > > > > > > > > > >
>> > > > > > > > > > > PersistenceServices sounds good to me.
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Yangze Guo
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Dec 27, 2023 at 11:30 AM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for driving this effort, Yangze! The proposal overall LGTM. Apart from the throughput enhancement in the OLAP scenario, the separation of leader election/discovery services and the metadata persistence services will also make the HA impl clearer and easier to maintain. Just a minor comment on naming: would it be better to rename PersistentServices to PersistenceServices, as we usually put a noun before "Services"?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Zhanghao Chen
>> > > > > > > > > > > > ________________________________
>> > > > > > > > > > > > From: Yangze Guo <karma...@gmail.com>
>> > > > > > > > > > > > Sent: Tuesday, December 19, 2023 17:33
>> > > > > > > > > > > > To: dev <dev@flink.apache.org>
>> > > > > > > > > > > > Subject: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios
>> > > > > > > > > > > >
>> > > > > > > > > > > > Hi there,
>> > > > > > > > > > > >
>> > > > > > > > > > > > We would like to start a discussion thread on "FLIP-403: High Availability Services for OLAP Scenarios" [1].
>> > > > > > > > > > > >
>> > > > > > > > > > > > Currently, Flink's high availability service consists of two mechanisms: leader election/retrieval services for the JobManager and persistent services for job metadata. However, these mechanisms are set up in an "all or nothing" manner. In OLAP scenarios, we typically only require leader election/retrieval services for JobManager components, since jobs usually do not have a restart strategy. Additionally, the persistence of job states can negatively impact the cluster's throughput, especially for short query jobs.
>> > > > > > > > > > > >
>> > > > > > > > > > > > To address these issues, this FLIP proposes splitting the HighAvailabilityServices into LeaderServices and PersistentServices, and enabling users to independently configure the high availability strategies specifically related to jobs.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.
>> > > > > > > > > > > >
>> > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Yangze Guo
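
For reference, a condensed sketch of the split the FLIP proposes (method names below are abbreviated guesses; the wiki page above has the actual interface design, and PersistentServices was later renamed PersistenceServices in this thread):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.blob.BlobStore;
    import org.apache.flink.runtime.jobmanager.JobGraphStore;
    import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

    // Leader election/retrieval stays mandatory for JobManager components...
    interface LeaderServices {
        LeaderElectionService getClusterLeaderElectionService();

        LeaderRetrievalService getResourceManagerLeaderRetriever();

        LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobId, String defaultAddress);
    }

    // ...while job-metadata persistence becomes independently configurable.
    interface PersistentServices {
        JobGraphStore getJobGraphStore();

        BlobStore createBlobStore();
    }
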