@Yangze

> (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a
100% increase in QPS

Did you look into which part takes most of the time? Jar uploading, Jar
downloading, JobInformation shipping, TDD shipping, or others?

If these objects are large, e.g. a hundreds megabytes connector jar,
will ship it hundreds of times(if parallelism > 100) from JMs to TMs
be a blocker of performance and stability, compared letting the DFS
help with the shipping. If yes, we should not force it to use a void
blobService. Maybe an option should be given to users to switch between
blobServices?

I'm fine to use a void blobService in OLAP scenarios if it works better
in most cases. However, it is a bit weird that we disable blobs if
`enable-job-recovery=false`. Conceptually, they should be unrelated.

> As Matthias mentioned, each component still needs to write its RPC
address, so this part of the writing may be unavoidable.

Thanks Matthias for the inputs.
However, even in non-ha mode, that task manager can connect to JobMaster.
Therefore, I guess it's not necessary to store JM addresses externally.
I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
accepts a parameter `defaultJobManagerAddress`. So maybe it's not needed
for TMs to find out the addresses of JMs via external services?

> focus on the discussion of HA functionality in the OLAP scenario in
FLIP-403 and exclude the refactoring from the scope of this FLIP

It sounds good to me.
Actually the concept of separating leader election and persistence
looks great to me at the first glance. But the shared MaterialProvider
makes it more complicated than I had expected.

Thanks,
Zhu

Yangze Guo <karma...@gmail.com> 于2024年1月11日周四 14:53写道:

> Thanks for the comments, Zhu and Matthias.
>
> @Zhu Zhu
>
> > How about disabling the checkpoint to avoid the cost? I know the cost is
> there even if we disable the checkpoint at the moment. But I think it can
> be fixed.
> > If HA is disabled, the jobmanager needs to directly participate in all
> blob shipping work which may result in a hot-spot.
>
> Currently, there are several persistence services that have specific
> implementations based on the HA mode:
> - JobGraphStore and JobResultStore: These are related to job recovery
> and can cause significant redundant I/O in OLAP scenarios, impacting
> performance. It may be necessary to configure them as in-memory stores
> for OLAP.
> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
> overhead by disabling checkpoints. I agree to remove Checkpoint
> Storage from the scope of this FLIP.
> - BlobStore: Agree that disabling BlobStore can potentially lead to
> hotspots in JobManagers. However, enabling it in OLAP scenarios can
> also result in high external storage access overhead , e.g.
> JobInformation/ShuffleDescriptor in TDD. I think this is a trade-off.
> In our internal benchmark for short query (with 128 parallelism
> WordCount jobs), disabling BlobStore resulted in a 100% increase in
> QPS. Therefore, I lean towards disabling it. WDYT?
>
> > FLINK-24038
>
> As Matthias mentioned, each component still needs to write its RPC
> address, so this part of the writing may be unavoidable.
>
> @Zhu Zhu @Matthias
>
> > I don't see why the PersistenceServices needs to have access to the
> MaterialProvider. I feel like there shouldn't be a component that's shared
> between the LeaderElectionService and the PersistenceServices.
> > The corresponding ZooKeeper/k8s implementation would hold the client
> instance (which is the only thing that should be shared between the
> LeaderElectionService and the PersistenceServices implementations).
>
> Yes, I agree that this is the goal of splitting the interfaces.
> However, when I attempted to split it, I found that these two services
> still have implicit temporal dependencies, such as the closure of the
> client instance and the cleanup of services and job data.
>
> Regards the refactoring of HighAvailabilityServices, I try to
> summarize the following issues that need to be considered:
> - Splitting LeaderServices and PersistenceServices; As Matthias
> mentioned, this allows for easier testing.
> - Removal of deprecated interfaces, such as
> getWebMonitorLeaderElectionService.
> - Reviewing existing multiple close and cleanup interfaces.
> - Integration of StandaloneHaServices and EmbeddedHaServices.
> I think this topic might be big enough to have a separate discussion
> thread. I am now inclined to focus on the discussion of HA
> functionality in the OLAP scenario in FLIP-403 and exclude the
> refactoring from the scope of this FLIP. This way, we can simply
> return different persistence services in AbstractHaServices based on
> the configuration. And I'm willing to file a new FLIP (or perhaps a
> ticket would be sufficient) for the refactoring of HA. WDYT?
>
>
> Best,
> Yangze Guo
>
> On Thu, Jan 11, 2024 at 12:19 AM Matthias Pohl
> <matthias.p...@aiven.io.invalid> wrote:
> >
> > Thanks for joining the discussion, everyone and sorry for picking it up
> > that late. Here are a few points, I want to add to this discussion:
> >
> > - FLINK-24038 [1] led to a reduction of the curator/k8s client leader
> > election requests by having a single leader election per JM rather than
> > individual once per RPCEndpoint. We still need to have one record per
> > component/RPCEndpoint (i.e. Dispatcher, RM, JobMaster instances, ...),
> > though, because we need to save the address for RPC calls (Akka/Pekko)
> per
> > component (each JobMaster has its own RPC endpoint with a dedicated
> port).
> > That is why we cannot get rid of the individual entries/znodes per job.
> >
> > - An alternative for this FLIP's proposal would be to stick to the
> current
> > HighAvailabilityServices interface. We could come up with a new
> > implementation that does provide Standalone instances of what you call
> > PersistentServices in this FLIP. That would reduce the efforts that come
> > with refactoring the HighAvailabilityServices interface. It should be
> > discussed here as an alternative and probably mentioned in the FLIP as a
> > rejected alternative if the community agrees.
> >
> > - From a conceptual point of view, splitting the HighAvailabilityServices
> > into LeaderElectionService and PersistentServices (I'm wondering whether
> > something like JobHighAvailabilityServices would be more descriptive
> here.
> > The word "persistence" is a bit ambiguous and can also be used in
> scenarios
> > other than HA) makes sense in my opinion. One hint why separating this
> big
> > interface HighAvailabilityServices into two smaller interfaces would make
> > sense is the fact that there is a test
> > implementation EmbeddedHaServicesWithLeadershipControl right now that
> > provides embedded HA with helper methods to control the LeaderElection in
> > ITCases. It is a workaround to get access to leader election. With two
> > separate interfaces, we could make it easier to test these things.
> >
> > - I'm not too sure about the proposed class hierarchy of FLIP-403:
> >   - What are the semantics of the "MaterialProvider". The name doesn't
> give
> > me any hints on the interface/class purpose. There could be some
> > description for this component being added to the FLIP. But on another
> > note: I don't see why the PersistenceServices needs to have access to the
> > MaterialProvider. I feel like there shouldn't be a component that's
> shared
> > between the LeaderElectionService and the PersistenceServices.
> >   - Alternative: What about coming up with a factory interface
> > HighAvailabilityServicesFactory which provides two methods:
> > createLeaderElectionService & createPersistenceServices. The factory
> > wouldn't need to keep any instances (as suggested by this FLIP's
> > HighAvailabilityServices component. It's a plain factory component that
> > creates instances. The corresponding ZooKeeper/k8s implementation would
> > hold the client instance (which is the only thing that should be shared
> > between the LeaderElectionService and the PersistenceServices
> > implementations). The factory would live in the ClusterEntrypoint. Any
> > cleanup of HA data would be covered by the
> > LeaderElection|PersistenceServices, individually.
> >
> > Looking forward to your opinions.
> > Best,
> > Matthias
> >
> > On Tue, Jan 9, 2024 at 1:23 PM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > > I would treat refactoring as a technical debt...
> > >
> > > Sorry I don't quite get the needs of the refactoring work.
> > >
> > > The refactoring work brings benefits if there are requirements to
> combine
> > > different leader election services and persistence services.
> > > The answer in this FLIP is to combine DefaultLeaderServices and
> > > EmbeddedPersistenceServices. But I'm concerned that, if the goal is to
> > > avoid the cost of job recovery, disable the persistence of the overall
> > > cluster might be an overkill. e.g. if later we want the cluster
> partitions
> > > to be recovered after JM failover?
> > >
> > > Yet I do not think of the needs of other new combinations at the
> moment,
> > > e.g. a non-HA leader election service with an HA persistence service,
> > > a ZK leader election service with a K8s persistence service. Maybe you
> > > have some good cases for it?
> > >
> > > TBH, the current class structure looks simpler to me. I'm also
> wondering
> > > whether it's possible to merge StandaloneHaServices with
> > > EmbeddedHaServices,
> > > because the latter one is a special case(all components in the same
> > > process)
> > > of the former one.
> > >
> > > > it still involves creating a znode or writing to the configmap
> > > for each job
> > >
> > > Is it possible to avoid the cost? My gut feeling is that these actions
> > > are not necessary after Flink does leader election for the overall
> master
> > > process.
> > >
> > > > such as checkpoint and blob storage except for the job graph store
> > >
> > > How about disabling the checkpoint to avoid the cost? I know the cost
> is
> > > there
> > > even if we disable the checkpoint at the moment. But I think it can be
> > > fixed.
> > > Checkpoint is not needed if job recovery is not needed, the concepts
> are
> > > highly related.
> > >
> > > Regarding blob storage, I'm not sure whether it's good to disable HA
> for
> > > it.
> > > If HA is disabled, the jobmanager needs to directly participate in all
> blob
> > > shipping work which may result in a hot-spot.
> > >
> > > WDYT?
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Yangze Guo <karma...@gmail.com> 于2024年1月9日周二 10:55写道:
> > >
> > > > Thank you for your comments, Zhu!
> > > >
> > > > 1. I would treat refactoring as a technical debt and a side effect of
> > > > this FLIP. The idea is inspired by Matthias' comments in [1]. It
> > > > suggests having a single implementation of HighAvailabilityServices
> > > > that requires a factory method for persistence services and leader
> > > > services. After this, we will achieve a clearer class hierarchy for
> > > > HAServices and eliminate code duplication.
> > > >
> > > > 2. While FLINK-24038 does eliminate the leader election time cost for
> > > > each job, it still involves creating a znode or writing to the
> > > > configmap for each job, which can negatively impact performance under
> > > > higher workloads. This also applies to all other persistence services
> > > > such as checkpoint and blob storage except for the job graph store.
> > > >
> > > > WDYT?
> > > >
> > > > [1]
> > > >
> > >
> https://issues.apache.org/jira/browse/FLINK-31816?focusedCommentId=17741054&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17741054
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Mon, Jan 8, 2024 at 7:37 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > >
> > > > > Thanks for creating the FLIP and starting the discussion, Yangze.
> It
> > > > makes
> > > > > sense to me to improve the job submission performance in OLAP
> > > scenarios.
> > > > >
> > > > > I have a few questions regarding the proposed changes:
> > > > >
> > > > > 1. How about skipping the job graph persistence if the proposed
> config
> > > > > 'high-availability.enable-job-recovery' is set to false? In this
> way,
> > > > > we do not need to do the refactoring work.
> > > > >
> > > > > 2. Instead of using different HA services for Dispatcher and
> JobMaster.
> > > > > Can we leverage the work of FLINK-24038 to eliminate the leader
> > > election
> > > > > time cost of each job? Honestly I had thought it was already the
> truth
> > > > but
> > > > > seems it is not. This improvement can also benefit non-OLAP jobs.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > Yangze Guo <karma...@gmail.com> 于2024年1月8日周一 17:11写道:
> > > > >
> > > > > > Thanks for the pointer, Rui!
> > > > > >
> > > > > > I have reviewed FLIP-383, and based on my understanding, this
> feature
> > > > > > should be enabled by default for batch jobs in the future.
> Therefore,
> > > > > > +1 for checking the parameters and issuing log warnings when the
> user
> > > > > > explicitly configures execution.batch.job-recovery.enabled to
> true.
> > > > > >
> > > > > > +1 for high-availability.job-recovery.enabled, which would be
> more
> > > > > > suitable with YAML hierarchy.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Mon, Jan 8, 2024 at 3:43 PM Rui Fan <1996fan...@gmail.com>
> wrote:
> > > > > > >
> > > > > > > Thanks to Yangze driving this proposal!
> > > > > > >
> > > > > > > Overall looks good to me! This proposal is useful for
> > > > > > > the performance when the job doesn't need the failover.
> > > > > > >
> > > > > > > I have some minor questions:
> > > > > > >
> > > > > > > 1. How does it work with FLIP-383[1]?
> > > > > > >
> > > > > > > This FLIP introduces a high-availability.enable-job-recovery,
> > > > > > > and FLIP-383 introduces a execution.batch.job-recovery.enabled.
> > > > > > >
> > > > > > > IIUC, when high-availability.enable-job-recovery is false, the
> job
> > > > > > > cannot recover even if execution.batch.job-recovery.enabled =
> true,
> > > > > > > right?
> > > > > > >
> > > > > > > If so, could we check some parameters and warn some logs? Or
> > > > > > > disable the execution.batch.job-recovery.enabled directly when
> > > > > > > high-availability.enable-job-recovery = false.
> > > > > > >
> > > > > > > 2. Could we rename it to
> high-availability.job-recovery.enabled to
> > > > unify
> > > > > > > the naming?
> > > > > > >
> > > > > > > WDYT?
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/x/QwqZE
> > > > > > >
> > > > > > > Best,
> > > > > > > Rui
> > > > > > >
> > > > > > > On Mon, Jan 8, 2024 at 2:04 PM Yangze Guo <karma...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Thanks for your comment, Yong.
> > > > > > > >
> > > > > > > > Here are my thoughts on the splitting of
> HighAvailableServices:
> > > > > > > > Firstly, I would treat this separation as a result of
> technical
> > > > debt
> > > > > > > > and a side effect of the FLIP. In order to achieve a cleaner
> > > > interface
> > > > > > > > hierarchy for High Availability before Flink 2.0, the design
> > > > decision
> > > > > > > > should not be limited to OLAP scenarios.
> > > > > > > > I agree that the current HAServices can be divided based on
> > > either
> > > > the
> > > > > > > > actual target (cluster & job) or the type of functionality
> > > (leader
> > > > > > > > election & persistence). From a conceptual perspective, I do
> not
> > > > see
> > > > > > > > one approach being better than the other. However, I have
> chosen
> > > > the
> > > > > > > > current separation for a clear separation of concerns. After
> > > > FLIP-285,
> > > > > > > > each process has a dedicated LeaderElectionService
> responsible
> > > for
> > > > > > > > leader election of all the components within it. This
> > > > > > > > LeaderElectionService has its own lifecycle management. If we
> > > were
> > > > to
> > > > > > > > split the HAServices into 'ClusterHighAvailabilityService'
> and
> > > > > > > > 'JobHighAvailabilityService', we would need to couple the
> > > lifecycle
> > > > > > > > management of these two interfaces, as they both rely on the
> > > > > > > > LeaderElectionService and other relevant classes. This
> coupling
> > > and
> > > > > > > > implicit design assumption will increase the complexity and
> > > testing
> > > > > > > > difficulty of the system. WDYT?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yangze Guo
> > > > > > > >
> > > > > > > > On Mon, Jan 8, 2024 at 12:08 PM Yong Fang <zjur...@gmail.com
> >
> > > > wrote:
> > > > > > > > >
> > > > > > > > > Thanks Yangze for starting this discussion. I have one
> comment:
> > > > why
> > > > > > do we
> > > > > > > > > need to abstract two services as `LeaderServices` and
> > > > > > > > > `PersistenceServices`?
> > > > > > > > >
> > > > > > > > > From the content, the purpose of this FLIP is to make job
> > > > failover
> > > > > > more
> > > > > > > > > lightweight, so it would be more appropriate to abstract
> two
> > > > > > services as
> > > > > > > > > `ClusterHighAvailabilityService` and
> > > `JobHighAvailabilityService`
> > > > > > instead
> > > > > > > > > of `LeaderServices` and `PersistenceServices` based on
> leader
> > > and
> > > > > > store.
> > > > > > > > In
> > > > > > > > > this way, we can create a `JobHighAvailabilityService` that
> > > has a
> > > > > > leader
> > > > > > > > > service and store for the job that meets the requirements
> based
> > > > on
> > > > > > the
> > > > > > > > > configuration in the zk/k8s high availability service.
> > > > > > > > >
> > > > > > > > > WDYT?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Fang Yong
> > > > > > > > >
> > > > > > > > > On Fri, Dec 29, 2023 at 8:10 PM xiangyu feng <
> > > > xiangyu...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Yangze for restart this discussion.
> > > > > > > > > >
> > > > > > > > > > +1 for the overall idea. By splitting the
> > > > HighAvailabilityServices
> > > > > > into
> > > > > > > > > > LeaderServices and PersistenceServices, we may support
> > > > configuring
> > > > > > > > > > different storage behind them in the future.
> > > > > > > > > >
> > > > > > > > > > We did run into real problems in production where too
> much
> > > job
> > > > > > > > metadata was
> > > > > > > > > > being stored on ZK, causing system instability.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yangze Guo <karma...@gmail.com> 于2023年12月29日周五 10:21写道:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the response, Zhanghao.
> > > > > > > > > > >
> > > > > > > > > > > PersistenceServices sounds good to me.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yangze Guo
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Dec 27, 2023 at 11:30 AM Zhanghao Chen
> > > > > > > > > > > <zhanghao.c...@outlook.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for driving this effort, Yangze! The proposal
> > > > overall
> > > > > > LGTM.
> > > > > > > > > > Other
> > > > > > > > > > > from the throughput enhancement in the OLAP scenario,
> the
> > > > > > separation
> > > > > > > > of
> > > > > > > > > > > leader election/discovery services and the metadata
> > > > persistence
> > > > > > > > services
> > > > > > > > > > > will also make the HA impl clearer and easier to
> maintain.
> > > > Just a
> > > > > > > > minor
> > > > > > > > > > > comment on naming: would it better to rename
> > > > PersistentServices
> > > > > > to
> > > > > > > > > > > PersistenceServices, as usually we put a noun before
> > > > Services?
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Zhanghao Chen
> > > > > > > > > > > > ________________________________
> > > > > > > > > > > > From: Yangze Guo <karma...@gmail.com>
> > > > > > > > > > > > Sent: Tuesday, December 19, 2023 17:33
> > > > > > > > > > > > To: dev <dev@flink.apache.org>
> > > > > > > > > > > > Subject: [DISCUSS] FLIP-403: High Availability
> Services
> > > for
> > > > > > OLAP
> > > > > > > > > > > Scenarios
> > > > > > > > > > > >
> > > > > > > > > > > > Hi, there,
> > > > > > > > > > > >
> > > > > > > > > > > > We would like to start a discussion thread on
> "FLIP-403:
> > > > High
> > > > > > > > > > > > Availability Services for OLAP Scenarios"[1].
> > > > > > > > > > > >
> > > > > > > > > > > > Currently, Flink's high availability service
> consists of
> > > > two
> > > > > > > > > > > > mechanisms: leader election/retrieval services for
> > > > JobManager
> > > > > > and
> > > > > > > > > > > > persistent services for job metadata. However, these
> > > > > > mechanisms are
> > > > > > > > > > > > set up in an "all or nothing" manner. In OLAP
> scenarios,
> > > we
> > > > > > > > typically
> > > > > > > > > > > > only require leader election/retrieval services for
> > > > JobManager
> > > > > > > > > > > > components since jobs usually do not have a restart
> > > > strategy.
> > > > > > > > > > > > Additionally, the persistence of job states can
> > > negatively
> > > > > > impact
> > > > > > > > the
> > > > > > > > > > > > cluster's throughput, especially for short query
> jobs.
> > > > > > > > > > > >
> > > > > > > > > > > > To address these issues, this FLIP proposes
> splitting the
> > > > > > > > > > > > HighAvailabilityServices into LeaderServices and
> > > > > > > > PersistentServices,
> > > > > > > > > > > > and enable users to independently configure the high
> > > > > > availability
> > > > > > > > > > > > strategies specifically related to jobs.
> > > > > > > > > > > >
> > > > > > > > > > > > Please find more details in the FLIP wiki document
> [1].
> > > > Looking
> > > > > > > > > > > > forward to your feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > [1]
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Yangze Guo
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> > >
>

Reply via email to