Correction:
I'm fine to use a void blobService in OLAP scenarios if it works better
in most cases.  -> I'm fine to use a void blobService in OLAP scenarios
*by default* if it works better in most cases.



Zhu Zhu <reed...@gmail.com> 于2024年1月15日周一 17:51写道:

> @Yangze
>
> > (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a
> 100% increase in QPS
>
> Did you look into which part takes most of the time? Jar uploading, Jar
> downloading, JobInformation shipping, TDD shipping, or others?
>
> If these objects are large, e.g. a hundreds megabytes connector jar,
> will ship it hundreds of times(if parallelism > 100) from JMs to TMs
> be a blocker of performance and stability, compared letting the DFS
> help with the shipping. If yes, we should not force it to use a void
> blobService. Maybe an option should be given to users to switch between
> blobServices?
>
> I'm fine to use a void blobService in OLAP scenarios if it works better
> in most cases. However, it is a bit weird that we disable blobs if
> `enable-job-recovery=false`. Conceptually, they should be unrelated.
>
> > As Matthias mentioned, each component still needs to write its RPC
> address, so this part of the writing may be unavoidable.
>
> Thanks Matthias for the inputs.
> However, even in non-ha mode, that task manager can connect to JobMaster.
> Therefore, I guess it's not necessary to store JM addresses externally.
> I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
> accepts a parameter `defaultJobManagerAddress`. So maybe it's not needed
> for TMs to find out the addresses of JMs via external services?
>
> > focus on the discussion of HA functionality in the OLAP scenario in
> FLIP-403 and exclude the refactoring from the scope of this FLIP
>
> It sounds good to me.
> Actually the concept of separating leader election and persistence
> looks great to me at the first glance. But the shared MaterialProvider
> makes it more complicated than I had expected.
>
> Thanks,
> Zhu
>
> Yangze Guo <karma...@gmail.com> 于2024年1月11日周四 14:53写道:
>
>> Thanks for the comments, Zhu and Matthias.
>>
>> @Zhu Zhu
>>
>> > How about disabling the checkpoint to avoid the cost? I know the cost
>> is there even if we disable the checkpoint at the moment. But I think it
>> can be fixed.
>> > If HA is disabled, the jobmanager needs to directly participate in all
>> blob shipping work which may result in a hot-spot.
>>
>> Currently, there are several persistence services that have specific
>> implementations based on the HA mode:
>> - JobGraphStore and JobResultStore: These are related to job recovery
>> and can cause significant redundant I/O in OLAP scenarios, impacting
>> performance. It may be necessary to configure them as in-memory stores
>> for OLAP.
>> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
>> overhead by disabling checkpoints. I agree to remove Checkpoint
>> Storage from the scope of this FLIP.
>> - BlobStore: Agree that disabling BlobStore can potentially lead to
>> hotspots in JobManagers. However, enabling it in OLAP scenarios can
>> also result in high external storage access overhead , e.g.
>> JobInformation/ShuffleDescriptor in TDD. I think this is a trade-off.
>> In our internal benchmark for short query (with 128 parallelism
>> WordCount jobs), disabling BlobStore resulted in a 100% increase in
>> QPS. Therefore, I lean towards disabling it. WDYT?
>>
>> > FLINK-24038
>>
>> As Matthias mentioned, each component still needs to write its RPC
>> address, so this part of the writing may be unavoidable.
>>
>> @Zhu Zhu @Matthias
>>
>> > I don't see why the PersistenceServices needs to have access to the
>> MaterialProvider. I feel like there shouldn't be a component that's shared
>> between the LeaderElectionService and the PersistenceServices.
>> > The corresponding ZooKeeper/k8s implementation would hold the client
>> instance (which is the only thing that should be shared between the
>> LeaderElectionService and the PersistenceServices implementations).
>>
>> Yes, I agree that this is the goal of splitting the interfaces.
>> However, when I attempted to split it, I found that these two services
>> still have implicit temporal dependencies, such as the closure of the
>> client instance and the cleanup of services and job data.
>>
>> Regards the refactoring of HighAvailabilityServices, I try to
>> summarize the following issues that need to be considered:
>> - Splitting LeaderServices and PersistenceServices; As Matthias
>> mentioned, this allows for easier testing.
>> - Removal of deprecated interfaces, such as
>> getWebMonitorLeaderElectionService.
>> - Reviewing existing multiple close and cleanup interfaces.
>> - Integration of StandaloneHaServices and EmbeddedHaServices.
>> I think this topic might be big enough to have a separate discussion
>> thread. I am now inclined to focus on the discussion of HA
>> functionality in the OLAP scenario in FLIP-403 and exclude the
>> refactoring from the scope of this FLIP. This way, we can simply
>> return different persistence services in AbstractHaServices based on
>> the configuration. And I'm willing to file a new FLIP (or perhaps a
>> ticket would be sufficient) for the refactoring of HA. WDYT?
>>
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Jan 11, 2024 at 12:19 AM Matthias Pohl
>> <matthias.p...@aiven.io.invalid> wrote:
>> >
>> > Thanks for joining the discussion, everyone and sorry for picking it up
>> > that late. Here are a few points, I want to add to this discussion:
>> >
>> > - FLINK-24038 [1] led to a reduction of the curator/k8s client leader
>> > election requests by having a single leader election per JM rather than
>> > individual once per RPCEndpoint. We still need to have one record per
>> > component/RPCEndpoint (i.e. Dispatcher, RM, JobMaster instances, ...),
>> > though, because we need to save the address for RPC calls (Akka/Pekko)
>> per
>> > component (each JobMaster has its own RPC endpoint with a dedicated
>> port).
>> > That is why we cannot get rid of the individual entries/znodes per job.
>> >
>> > - An alternative for this FLIP's proposal would be to stick to the
>> current
>> > HighAvailabilityServices interface. We could come up with a new
>> > implementation that does provide Standalone instances of what you call
>> > PersistentServices in this FLIP. That would reduce the efforts that come
>> > with refactoring the HighAvailabilityServices interface. It should be
>> > discussed here as an alternative and probably mentioned in the FLIP as a
>> > rejected alternative if the community agrees.
>> >
>> > - From a conceptual point of view, splitting the
>> HighAvailabilityServices
>> > into LeaderElectionService and PersistentServices (I'm wondering whether
>> > something like JobHighAvailabilityServices would be more descriptive
>> here.
>> > The word "persistence" is a bit ambiguous and can also be used in
>> scenarios
>> > other than HA) makes sense in my opinion. One hint why separating this
>> big
>> > interface HighAvailabilityServices into two smaller interfaces would
>> make
>> > sense is the fact that there is a test
>> > implementation EmbeddedHaServicesWithLeadershipControl right now that
>> > provides embedded HA with helper methods to control the LeaderElection
>> in
>> > ITCases. It is a workaround to get access to leader election. With two
>> > separate interfaces, we could make it easier to test these things.
>> >
>> > - I'm not too sure about the proposed class hierarchy of FLIP-403:
>> >   - What are the semantics of the "MaterialProvider". The name doesn't
>> give
>> > me any hints on the interface/class purpose. There could be some
>> > description for this component being added to the FLIP. But on another
>> > note: I don't see why the PersistenceServices needs to have access to
>> the
>> > MaterialProvider. I feel like there shouldn't be a component that's
>> shared
>> > between the LeaderElectionService and the PersistenceServices.
>> >   - Alternative: What about coming up with a factory interface
>> > HighAvailabilityServicesFactory which provides two methods:
>> > createLeaderElectionService & createPersistenceServices. The factory
>> > wouldn't need to keep any instances (as suggested by this FLIP's
>> > HighAvailabilityServices component. It's a plain factory component that
>> > creates instances. The corresponding ZooKeeper/k8s implementation would
>> > hold the client instance (which is the only thing that should be shared
>> > between the LeaderElectionService and the PersistenceServices
>> > implementations). The factory would live in the ClusterEntrypoint. Any
>> > cleanup of HA data would be covered by the
>> > LeaderElection|PersistenceServices, individually.
>> >
>> > Looking forward to your opinions.
>> > Best,
>> > Matthias
>> >
>> > On Tue, Jan 9, 2024 at 1:23 PM Zhu Zhu <reed...@gmail.com> wrote:
>> >
>> > > > I would treat refactoring as a technical debt...
>> > >
>> > > Sorry I don't quite get the needs of the refactoring work.
>> > >
>> > > The refactoring work brings benefits if there are requirements to
>> combine
>> > > different leader election services and persistence services.
>> > > The answer in this FLIP is to combine DefaultLeaderServices and
>> > > EmbeddedPersistenceServices. But I'm concerned that, if the goal is to
>> > > avoid the cost of job recovery, disable the persistence of the overall
>> > > cluster might be an overkill. e.g. if later we want the cluster
>> partitions
>> > > to be recovered after JM failover?
>> > >
>> > > Yet I do not think of the needs of other new combinations at the
>> moment,
>> > > e.g. a non-HA leader election service with an HA persistence service,
>> > > a ZK leader election service with a K8s persistence service. Maybe you
>> > > have some good cases for it?
>> > >
>> > > TBH, the current class structure looks simpler to me. I'm also
>> wondering
>> > > whether it's possible to merge StandaloneHaServices with
>> > > EmbeddedHaServices,
>> > > because the latter one is a special case(all components in the same
>> > > process)
>> > > of the former one.
>> > >
>> > > > it still involves creating a znode or writing to the configmap
>> > > for each job
>> > >
>> > > Is it possible to avoid the cost? My gut feeling is that these actions
>> > > are not necessary after Flink does leader election for the overall
>> master
>> > > process.
>> > >
>> > > > such as checkpoint and blob storage except for the job graph store
>> > >
>> > > How about disabling the checkpoint to avoid the cost? I know the cost
>> is
>> > > there
>> > > even if we disable the checkpoint at the moment. But I think it can be
>> > > fixed.
>> > > Checkpoint is not needed if job recovery is not needed, the concepts
>> are
>> > > highly related.
>> > >
>> > > Regarding blob storage, I'm not sure whether it's good to disable HA
>> for
>> > > it.
>> > > If HA is disabled, the jobmanager needs to directly participate in
>> all blob
>> > > shipping work which may result in a hot-spot.
>> > >
>> > > WDYT?
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > Yangze Guo <karma...@gmail.com> 于2024年1月9日周二 10:55写道:
>> > >
>> > > > Thank you for your comments, Zhu!
>> > > >
>> > > > 1. I would treat refactoring as a technical debt and a side effect
>> of
>> > > > this FLIP. The idea is inspired by Matthias' comments in [1]. It
>> > > > suggests having a single implementation of HighAvailabilityServices
>> > > > that requires a factory method for persistence services and leader
>> > > > services. After this, we will achieve a clearer class hierarchy for
>> > > > HAServices and eliminate code duplication.
>> > > >
>> > > > 2. While FLINK-24038 does eliminate the leader election time cost
>> for
>> > > > each job, it still involves creating a znode or writing to the
>> > > > configmap for each job, which can negatively impact performance
>> under
>> > > > higher workloads. This also applies to all other persistence
>> services
>> > > > such as checkpoint and blob storage except for the job graph store.
>> > > >
>> > > > WDYT?
>> > > >
>> > > > [1]
>> > > >
>> > >
>> https://issues.apache.org/jira/browse/FLINK-31816?focusedCommentId=17741054&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17741054
>> > > >
>> > > > Best,
>> > > > Yangze Guo
>> > > >
>> > > > On Mon, Jan 8, 2024 at 7:37 PM Zhu Zhu <reed...@gmail.com> wrote:
>> > > > >
>> > > > > Thanks for creating the FLIP and starting the discussion, Yangze.
>> It
>> > > > makes
>> > > > > sense to me to improve the job submission performance in OLAP
>> > > scenarios.
>> > > > >
>> > > > > I have a few questions regarding the proposed changes:
>> > > > >
>> > > > > 1. How about skipping the job graph persistence if the proposed
>> config
>> > > > > 'high-availability.enable-job-recovery' is set to false? In this
>> way,
>> > > > > we do not need to do the refactoring work.
>> > > > >
>> > > > > 2. Instead of using different HA services for Dispatcher and
>> JobMaster.
>> > > > > Can we leverage the work of FLINK-24038 to eliminate the leader
>> > > election
>> > > > > time cost of each job? Honestly I had thought it was already the
>> truth
>> > > > but
>> > > > > seems it is not. This improvement can also benefit non-OLAP jobs.
>> > > > >
>> > > > > Thanks,
>> > > > > Zhu
>> > > > >
>> > > > > Yangze Guo <karma...@gmail.com> 于2024年1月8日周一 17:11写道:
>> > > > >
>> > > > > > Thanks for the pointer, Rui!
>> > > > > >
>> > > > > > I have reviewed FLIP-383, and based on my understanding, this
>> feature
>> > > > > > should be enabled by default for batch jobs in the future.
>> Therefore,
>> > > > > > +1 for checking the parameters and issuing log warnings when
>> the user
>> > > > > > explicitly configures execution.batch.job-recovery.enabled to
>> true.
>> > > > > >
>> > > > > > +1 for high-availability.job-recovery.enabled, which would be
>> more
>> > > > > > suitable with YAML hierarchy.
>> > > > > >
>> > > > > >
>> > > > > > Best,
>> > > > > > Yangze Guo
>> > > > > >
>> > > > > > On Mon, Jan 8, 2024 at 3:43 PM Rui Fan <1996fan...@gmail.com>
>> wrote:
>> > > > > > >
>> > > > > > > Thanks to Yangze driving this proposal!
>> > > > > > >
>> > > > > > > Overall looks good to me! This proposal is useful for
>> > > > > > > the performance when the job doesn't need the failover.
>> > > > > > >
>> > > > > > > I have some minor questions:
>> > > > > > >
>> > > > > > > 1. How does it work with FLIP-383[1]?
>> > > > > > >
>> > > > > > > This FLIP introduces a high-availability.enable-job-recovery,
>> > > > > > > and FLIP-383 introduces a
>> execution.batch.job-recovery.enabled.
>> > > > > > >
>> > > > > > > IIUC, when high-availability.enable-job-recovery is false,
>> the job
>> > > > > > > cannot recover even if execution.batch.job-recovery.enabled =
>> true,
>> > > > > > > right?
>> > > > > > >
>> > > > > > > If so, could we check some parameters and warn some logs? Or
>> > > > > > > disable the execution.batch.job-recovery.enabled directly when
>> > > > > > > high-availability.enable-job-recovery = false.
>> > > > > > >
>> > > > > > > 2. Could we rename it to
>> high-availability.job-recovery.enabled to
>> > > > unify
>> > > > > > > the naming?
>> > > > > > >
>> > > > > > > WDYT?
>> > > > > > >
>> > > > > > > [1] https://cwiki.apache.org/confluence/x/QwqZE
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Rui
>> > > > > > >
>> > > > > > > On Mon, Jan 8, 2024 at 2:04 PM Yangze Guo <karma...@gmail.com
>> >
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Thanks for your comment, Yong.
>> > > > > > > >
>> > > > > > > > Here are my thoughts on the splitting of
>> HighAvailableServices:
>> > > > > > > > Firstly, I would treat this separation as a result of
>> technical
>> > > > debt
>> > > > > > > > and a side effect of the FLIP. In order to achieve a cleaner
>> > > > interface
>> > > > > > > > hierarchy for High Availability before Flink 2.0, the design
>> > > > decision
>> > > > > > > > should not be limited to OLAP scenarios.
>> > > > > > > > I agree that the current HAServices can be divided based on
>> > > either
>> > > > the
>> > > > > > > > actual target (cluster & job) or the type of functionality
>> > > (leader
>> > > > > > > > election & persistence). From a conceptual perspective, I
>> do not
>> > > > see
>> > > > > > > > one approach being better than the other. However, I have
>> chosen
>> > > > the
>> > > > > > > > current separation for a clear separation of concerns. After
>> > > > FLIP-285,
>> > > > > > > > each process has a dedicated LeaderElectionService
>> responsible
>> > > for
>> > > > > > > > leader election of all the components within it. This
>> > > > > > > > LeaderElectionService has its own lifecycle management. If
>> we
>> > > were
>> > > > to
>> > > > > > > > split the HAServices into 'ClusterHighAvailabilityService'
>> and
>> > > > > > > > 'JobHighAvailabilityService', we would need to couple the
>> > > lifecycle
>> > > > > > > > management of these two interfaces, as they both rely on the
>> > > > > > > > LeaderElectionService and other relevant classes. This
>> coupling
>> > > and
>> > > > > > > > implicit design assumption will increase the complexity and
>> > > testing
>> > > > > > > > difficulty of the system. WDYT?
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Yangze Guo
>> > > > > > > >
>> > > > > > > > On Mon, Jan 8, 2024 at 12:08 PM Yong Fang <
>> zjur...@gmail.com>
>> > > > wrote:
>> > > > > > > > >
>> > > > > > > > > Thanks Yangze for starting this discussion. I have one
>> comment:
>> > > > why
>> > > > > > do we
>> > > > > > > > > need to abstract two services as `LeaderServices` and
>> > > > > > > > > `PersistenceServices`?
>> > > > > > > > >
>> > > > > > > > > From the content, the purpose of this FLIP is to make job
>> > > > failover
>> > > > > > more
>> > > > > > > > > lightweight, so it would be more appropriate to abstract
>> two
>> > > > > > services as
>> > > > > > > > > `ClusterHighAvailabilityService` and
>> > > `JobHighAvailabilityService`
>> > > > > > instead
>> > > > > > > > > of `LeaderServices` and `PersistenceServices` based on
>> leader
>> > > and
>> > > > > > store.
>> > > > > > > > In
>> > > > > > > > > this way, we can create a `JobHighAvailabilityService`
>> that
>> > > has a
>> > > > > > leader
>> > > > > > > > > service and store for the job that meets the requirements
>> based
>> > > > on
>> > > > > > the
>> > > > > > > > > configuration in the zk/k8s high availability service.
>> > > > > > > > >
>> > > > > > > > > WDYT?
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Fang Yong
>> > > > > > > > >
>> > > > > > > > > On Fri, Dec 29, 2023 at 8:10 PM xiangyu feng <
>> > > > xiangyu...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Thanks Yangze for restart this discussion.
>> > > > > > > > > >
>> > > > > > > > > > +1 for the overall idea. By splitting the
>> > > > HighAvailabilityServices
>> > > > > > into
>> > > > > > > > > > LeaderServices and PersistenceServices, we may support
>> > > > configuring
>> > > > > > > > > > different storage behind them in the future.
>> > > > > > > > > >
>> > > > > > > > > > We did run into real problems in production where too
>> much
>> > > job
>> > > > > > > > metadata was
>> > > > > > > > > > being stored on ZK, causing system instability.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Yangze Guo <karma...@gmail.com> 于2023年12月29日周五 10:21写道:
>> > > > > > > > > >
>> > > > > > > > > > > Thanks for the response, Zhanghao.
>> > > > > > > > > > >
>> > > > > > > > > > > PersistenceServices sounds good to me.
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Yangze Guo
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Dec 27, 2023 at 11:30 AM Zhanghao Chen
>> > > > > > > > > > > <zhanghao.c...@outlook.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for driving this effort, Yangze! The proposal
>> > > > overall
>> > > > > > LGTM.
>> > > > > > > > > > Other
>> > > > > > > > > > > from the throughput enhancement in the OLAP scenario,
>> the
>> > > > > > separation
>> > > > > > > > of
>> > > > > > > > > > > leader election/discovery services and the metadata
>> > > > persistence
>> > > > > > > > services
>> > > > > > > > > > > will also make the HA impl clearer and easier to
>> maintain.
>> > > > Just a
>> > > > > > > > minor
>> > > > > > > > > > > comment on naming: would it better to rename
>> > > > PersistentServices
>> > > > > > to
>> > > > > > > > > > > PersistenceServices, as usually we put a noun before
>> > > > Services?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Zhanghao Chen
>> > > > > > > > > > > > ________________________________
>> > > > > > > > > > > > From: Yangze Guo <karma...@gmail.com>
>> > > > > > > > > > > > Sent: Tuesday, December 19, 2023 17:33
>> > > > > > > > > > > > To: dev <dev@flink.apache.org>
>> > > > > > > > > > > > Subject: [DISCUSS] FLIP-403: High Availability
>> Services
>> > > for
>> > > > > > OLAP
>> > > > > > > > > > > Scenarios
>> > > > > > > > > > > >
>> > > > > > > > > > > > Hi, there,
>> > > > > > > > > > > >
>> > > > > > > > > > > > We would like to start a discussion thread on
>> "FLIP-403:
>> > > > High
>> > > > > > > > > > > > Availability Services for OLAP Scenarios"[1].
>> > > > > > > > > > > >
>> > > > > > > > > > > > Currently, Flink's high availability service
>> consists of
>> > > > two
>> > > > > > > > > > > > mechanisms: leader election/retrieval services for
>> > > > JobManager
>> > > > > > and
>> > > > > > > > > > > > persistent services for job metadata. However, these
>> > > > > > mechanisms are
>> > > > > > > > > > > > set up in an "all or nothing" manner. In OLAP
>> scenarios,
>> > > we
>> > > > > > > > typically
>> > > > > > > > > > > > only require leader election/retrieval services for
>> > > > JobManager
>> > > > > > > > > > > > components since jobs usually do not have a restart
>> > > > strategy.
>> > > > > > > > > > > > Additionally, the persistence of job states can
>> > > negatively
>> > > > > > impact
>> > > > > > > > the
>> > > > > > > > > > > > cluster's throughput, especially for short query
>> jobs.
>> > > > > > > > > > > >
>> > > > > > > > > > > > To address these issues, this FLIP proposes
>> splitting the
>> > > > > > > > > > > > HighAvailabilityServices into LeaderServices and
>> > > > > > > > PersistentServices,
>> > > > > > > > > > > > and enable users to independently configure the high
>> > > > > > availability
>> > > > > > > > > > > > strategies specifically related to jobs.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Please find more details in the FLIP wiki document
>> [1].
>> > > > Looking
>> > > > > > > > > > > > forward to your feedback.
>> > > > > > > > > > > >
>> > > > > > > > > > > > [1]
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > >
>> > > > > >
>> > > >
>> > >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Yangze Guo
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > >
>> > > > > >
>> > > >
>> > >
>>
>

Reply via email to