Thanks for starting this discussion.
Here are some of my thoughts regarding the proposal and discussions
above.

*+1 to enable ShuffleMaster to stop tracking partitions proactively*
In production we have encountered cases where it takes *hours* to
recover from the loss of a remote shuffle worker, because the lost
finished partitions cannot be detected and reproduced all at once.
This improvement can help solve that problem.


*+1 to make ShuffleMaster a cluster level component*
This helps avoid maintaining multiple clients and connections to the
same remote shuffle service. It also makes it possible to support
external cluster partitions in the future.

*+1 to enable ShuffleMaster to notify the master about its internal*
*non-recoverable errors*
The scheduler can keep failing or hang due to a non-recoverable internal
error of the ShuffleMaster. Currently this kind of problem cannot be
recovered automatically and is hard to diagnose.
One question, which might be out of scope: should we do something
similar for the ShuffleEnvironment?


*+1 that the abstraction should be able to support different jobs to*
*use different ShuffleServices eventually*
I think the proposal does not conflict with this target.
One idea in my mind is to maintain multiple different ShuffleServices
in the Dispatcher/JobManagerSharedServices and share them between
different jobs. Each job would be configured with a key that points to a
ShuffleService, and the key would be used by both the scheduler and the
tasks on task managers to select their respective
ShuffleMaster/ShuffleEnvironment. This requires work on both the master
and the worker side: currently the worker launches a single
ShuffleEnvironment per task manager, which is shared between tasks that
can belong to different jobs.
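
A rough sketch of what I mean, assuming the cluster level ShuffleMaster,
ShuffleMasterContext and start/close methods from the proposal; the registry
class, the key handling and loadFactory are made up just for illustration:

import java.util.HashMap;
import java.util.Map;

/** Sketch only: a cluster level registry mapping a configured key to a shared ShuffleMaster. */
public class ShuffleServiceRegistry implements AutoCloseable {

    // Key taken from the job configuration -> cluster level ShuffleMaster shared by jobs.
    private final Map<String, ShuffleMaster<?>> shuffleMasters = new HashMap<>();

    /** Creates and starts the ShuffleMaster for the given key if it does not exist yet. */
    public synchronized ShuffleMaster<?> getOrCreate(
            String shuffleServiceKey, ShuffleMasterContext context) throws Exception {
        ShuffleMaster<?> master = shuffleMasters.get(shuffleServiceKey);
        if (master == null) {
            master = loadFactory(shuffleServiceKey).createShuffleMaster(context);
            master.start();
            shuffleMasters.put(shuffleServiceKey, master);
        }
        return master;
    }

    @Override
    public synchronized void close() throws Exception {
        for (ShuffleMaster<?> master : shuffleMasters.values()) {
            master.close();
        }
        shuffleMasters.clear();
    }

    private ShuffleServiceFactory<?, ?, ?> loadFactory(String shuffleServiceKey) {
        // Would be backed by the PluginManager (FLINK-12731); omitted in this sketch.
        throw new UnsupportedOperationException("sketch only");
    }
}

The TM side would do the analogous lookup for the ShuffleEnvironment based
on the same key.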

Thanks,
Zhu

On Thu, Jul 8, 2021 at 11:43 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi,
>
> Thanks for the reply.
>
> @Guowei
> I agree that we can move forward step by step and start from the most
> important part. Apart from the two points mentioned in your reply,
> initializing and shutting down some external resources gracefully is also
> important, which is one reason for the open/close methods.
> About the cluster partitions and the ShuffleMasterContext: I agree that we
> can postpone handling the cluster partitions because we need to do more to
> support them. For the ShuffleMasterContext, I think we still need it even
> if we do not support the cluster partitions in the first step. Currently,
> the shuffle master can only access the cluster configuration; besides
> that, I think we also need the ability to handle fatal errors occurring in
> the ShuffleMaster gracefully by propagating the errors to the framework.
> By introducing the ShuffleMasterContext, we can give the ShuffleMaster
> access to both the cluster configuration and the fatal error handler.
> Compared to passing these components directly to the ShuffleMaster, a
> ShuffleMasterContext interface makes it easy to keep compatibility in the
> future: even if we add new methods later, we can offer default empty
> implementations in the interface.
> About the JobShuffleContext::getConfiguration/listPartitions methods, I
> agree that we can remove them in the first step and add them back later.
> As mentioned above, we can easily keep compatibility based on the context
> interface.
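>
> To illustrate that last point, a small hypothetical sketch (not part of
> the proposal): if we ever needed to expose something new on the context, a
> default empty implementation would keep existing plugins compatible:
>
> public interface ShuffleMasterContext {
>
>     // ... the methods proposed above ...
>
>     /** Hypothetical future hook, for illustration only; the default empty implementation keeps existing plugins compatible. */
>     default void onClusterStopping() {}
> }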
>
> @Till
> I totally agree that we should support different jobs using different
> shuffle services, and the proposed solution will support this use case
> eventually.
>
> Best,
> Yingjie
>
> On Wed, Jul 7, 2021 at 8:15 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > One quick comment: When developing the ShuffleService abstraction we also
> > thought that different jobs might want to use different ShuffleServices
> > depending on their workload (e.g. batch vs. streaming workload). So
> > ideally, the chosen solution here can also support this use case
> > eventually.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jul 7, 2021 at 12:50 PM Guowei Ma <guowei....@gmail.com> wrote:
> >
> > > Hi,
> > > Thank Yingjie for initiating this discussion. As I understand it, the
> > > document[1] actually mainly discusses two issues:
> > > 1. ShuffleMaster should be at the cluster level instead of the job level
> > > 2. ShuffleMaster should notify the PartitionTracker that some data has
> > > been lost
> > >
> > > Relatively speaking, I think the second problem is more serious. Because
> > > for external or remote batch shuffle services, after the machine storing
> > > shuffled data goes offline, the PartitionTracker needs to be notified in
> > > time to avoid repeated failures of the job. Therefore, when shuffle data
> > > goes offline due to a machine error, the ShuffleMaster should notify the
> > > PartitionTracker in time. This requires the ShuffleMaster to notify the
> > > PartitionTracker through a handle such as JobShuffleContext.
> > >
> > > So how to pass the JobShuffleContext to the ShuffleMaster? There are two
> > > options:
> > > 1. After the ShuffleMaster is created, pass the JobShuffleContext to the
> > > ShuffleMaster, such as ShuffleMaster::register(JobShuffleContext)
> > > 2. Pass the JobShuffleContext when creating the ShuffleMaster, such as
> > > ShuffleServiceFactory.createShuffleMaster(JobShuffleContext)
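> > >
> > > A quick sketch just to put the two alternatives side by side (the
> > > signatures follow the descriptions above and are not final):
> > >
> > > // Option 1: cluster level ShuffleMaster created once, jobs registered later.
> > > ShuffleMaster<?> shuffleMaster = factory.createShuffleMaster(clusterConfiguration);
> > > shuffleMaster.register(jobShuffleContext);
> > >
> > > // Option 2: the JobShuffleContext is passed at creation time, i.e. one
> > > // ShuffleMaster per job.
> > > ShuffleMaster<?> shuffleMasterForJob = factory.createShuffleMaster(jobShuffleContext);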
> > >
> > > Which one to choose is actually related to issue 1: if the ShuffleMaster
> > > is at the cluster level, you should choose option 1, otherwise option 2.
> > > I think the ShuffleMaster should be at the cluster level, for example
> > > because we don't need to maintain a ShuffleMaster for each job in a
> > > SessionCluster; in addition, this ShuffleMaster should also be used by
> > > the RM's PartitionTracker in the future. Therefore, I think option 1 is
> > > more appropriate.
> > >
> > > To sum up, we may give priority to solving problem 2, while taking into
> > > account that the ShuffleMaster should be a cluster-level component.
> > > Therefore, I think we could ignore the ShuffleMasterContext at the
> > > beginning; at the same time,
> > > JobShuffleContext::getConfiguration/listPartitions should not be needed.
> > >
> > > [1]
> > > https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Fri, Jun 11, 2021 at 4:15 PM Yingjie Cao <kevin.ying...@gmail.com>
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion about "Lifecycle of ShuffleMaster and its
> > > > Relationship with JobMaster and PartitionTracker". (These are things we
> > > > found when moving our external shuffle to the pluggable shuffle service
> > > > framework.)
> > > >
> > > > The mail client may fail to display the right format. If so, please
> > > > refer to this document:
> > > > https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing
> > > >
> > > > Lifecycle of ShuffleMaster
> > > >
> > > > Currently, the lifecycle of ShuffleMaster seems unclear. The
> > > > ShuffleServiceFactory is loaded for each JobMaster instance and then
> > > > ShuffleServiceFactory#createShuffleMaster is called to create a
> > > > ShuffleMaster instance. However, the default NettyShuffleServiceFactory
> > > > always returns the same ShuffleMaster singleton instance for all jobs.
> > > > Based on the current implementation, the lifecycle of ShuffleMaster
> > > > seems open and depends on the shuffle plugins themselves. At the TM
> > > > side, however, the ShuffleEnvironment is a part of the
> > > > TaskManagerServices whose lifecycle is decoupled from jobs, which makes
> > > > it more like a service. This means there is an inconsistency between
> > > > the TM side and the JM side.
> > > >
> > > > From my understanding, the reason for this is that the pluggable shuffle
> > > > framework is not completely finished yet. For example, there is a
> > > > follow-up umbrella ticket FLINK-19551
> > > > <https://issues.apache.org/jira/browse/FLINK-19551> for the pluggable
> > > > shuffle service framework, and among its subtasks there is one
> > > > (FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731>) which
> > > > aims to load shuffle plugins with the PluginManager. I think this can
> > > > solve the issue mentioned above. After the corresponding factory is
> > > > loaded by the PluginManager, all ShuffleMaster instances can be stored
> > > > in a map indexed by the corresponding factory class name and shared by
> > > > all jobs. After that, the ShuffleMaster becomes a cluster level service,
> > > > which is consistent with the ShuffleEnvironment at the TM side.
> > > >
> > > > As a summary, we propose to finish FLINK-12731
> > > > <https://issues.apache.org/jira/browse/FLINK-12731> and make the shuffle
> > > > service a real cluster level service first. Furthermore, we propose to
> > > > add two lifecycle methods to the ShuffleMaster interface, start and
> > > > close, responsible for initialization (for example, contacting the
> > > > external system) and graceful shutdown (for example, releasing the
> > > > resources) respectively (these methods already exist in the
> > > > ShuffleEnvironment interface at the TM side). What do you think?
> > > >
> > > > Relationship of ShuffleMaster & JobMaster
> > > >
> > > > Currently, the JobMaster holds a reference to the corresponding
> > > > ShuffleMaster and can register partitions with it (allocate
> > > > ShuffleDescriptors from it) via the registerPartitionWithProducer
> > > > method. To support use cases like allocating external resources when a
> > > > job starts and releasing all allocated resources when a job terminates,
> > > > we may also need some job level initialization and finalization. These
> > > > are also helpful when serving multiple jobs simultaneously.
> > > >
> > > > As a summary, we propose to add two job level lifecycle methods,
> > > > registerJob and unregisterJob, responsible for job level shuffle
> > > > initialization and finalization, for example, releasing all external
> > > > resources occupied by the corresponding job. What do you think?
> > > >
> > > > Relationship of ShuffleMaster & PartitionTracker
> > > >
> > > > Currently, the JobMasterPartitionTracker can release external result
> > > > partitions through the releasePartitionExternally method of the
> > > > ShuffleMaster. However, the shuffle plugin (ShuffleMaster) may also need
> > > > the ability to stop tracking some partitions depending on the status of
> > > > the external services. For example, if the external storage node which
> > > > stores some partitions crashes, we need to stop tracking all partitions
> > > > on it to avoid reproducing the lost partitions one by one. By
> > > > introducing something like a ShuffleContext which delegates to the
> > > > partition tracker, this requirement can be easily satisfied. Besides,
> > > > for cluster partitions, we also need the ability to release them.
> > > >
> > > > As a summary, we propose to add a releaseDataSetExternally method to
> > > > the ShuffleMaster interface which is responsible for releasing cluster
> > > > partitions. Besides, we propose to add a ShuffleContext which can
> > > > delegate to the PartitionTracker and stop tracking partitions. For
> > > > cluster partitions and job partitions, two separate ShuffleContext
> > > > abstractions are needed. What do you think?
> > > >
> > > > Interface Change Summary
> > > >
> > > > As discussed in the above sections, we propose to make some interface
> > > > changes around the ShuffleMaster interface. The first change is to pass
> > > > a ShuffleMasterContext instance to the ShuffleServiceFactory when
> > > > creating the ShuffleMaster, just like the ShuffleEnvironment creation at
> > > > the TM side. Changes are marked with bold text (the same below).
> > > >
> > > > public interface ShuffleServiceFactory<
> > > >         SD extends ShuffleDescriptor,
> > > >         P extends ResultPartitionWriter,
> > > >         G extends IndexedInputGate> {
> > > >
> > > >     /** Factory method to create a specific {@link ShuffleMaster} implementation. */
> > > >     ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext);
> > > >
> > > >     /** Factory method to create a specific local {@link ShuffleEnvironment} implementation. */
> > > >     ShuffleEnvironment<P, G> createShuffleEnvironment(
> > > >             ShuffleEnvironmentContext shuffleEnvironmentContext);
> > > > }
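> > > >
> > > > For illustration only, a hypothetical plugin factory implementing the
> > > > updated interface (all Remote* class names are made up and not part of
> > > > this proposal):
> > > >
> > > > public class RemoteShuffleServiceFactory
> > > >         implements ShuffleServiceFactory<
> > > >                 RemoteShuffleDescriptor, RemoteResultPartitionWriter, RemoteInputGate> {
> > > >
> > > >     @Override
> > > >     public ShuffleMaster<RemoteShuffleDescriptor> createShuffleMaster(
> > > >             ShuffleMasterContext shuffleMasterContext) {
> > > >         // The context carries the cluster configuration and the fatal error handler.
> > > >         return new RemoteShuffleMaster(shuffleMasterContext);
> > > >     }
> > > >
> > > >     @Override
> > > >     public ShuffleEnvironment<RemoteResultPartitionWriter, RemoteInputGate>
> > > >             createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext) {
> > > >         return new RemoteShuffleEnvironment(shuffleEnvironmentContext);
> > > >     }
> > > > }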
> > > >
> > > > The following is the ShuffleMasterContext interface. It will be
> > > > implemented by the pluggable shuffle framework itself and can be used by
> > > > the shuffle plugin. A context interface is friendlier if we want to
> > > > extend it in the future.
> > > >
> > > > public interface ShuffleMasterContext {
> > > >
> > > >     /** Gets the cluster configuration. */
> > > >     Configuration getConfiguration();
> > > >
> > > >     /** Handles the fatal error if any. */
> > > >     void onFatalError(Throwable throwable);
> > > >
> > > >     /**
> > > >      * Stops tracking the target dataset (cluster partitions), which means
> > > >      * these data can not be reused anymore.
> > > >      */
> > > >     CompletableFuture<Void> stopTrackingDataSet(IntermediateDataSetID dataSetID);
> > > >
> > > >     /** Returns IDs of all datasets (cluster partitions) being tracked by this cluster currently. */
> > > >     CompletableFuture<List<IntermediateDataSetID>> listDataSets();
> > > > }
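> > > >
> > > > A sketch of how a plugin might use this context (continuing the
> > > > hypothetical RemoteShuffleMaster from above; only start() and the error
> > > > handling are shown, the other ShuffleMaster methods are omitted, and the
> > > > RemoteShuffleClient and the config key are made up):
> > > >
> > > > public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescriptor> {
> > > >
> > > >     private final ShuffleMasterContext masterContext;
> > > >     private RemoteShuffleClient client; // hypothetical connection to the external system
> > > >
> > > >     public RemoteShuffleMaster(ShuffleMasterContext masterContext) {
> > > >         this.masterContext = masterContext;
> > > >     }
> > > >
> > > >     @Override
> > > >     public void start() throws Exception {
> > > >         // Read plugin options from the cluster configuration and connect.
> > > >         String address = masterContext.getConfiguration()
> > > >                 .getString("remote-shuffle.manager.address", "localhost");
> > > >         client = new RemoteShuffleClient(address);
> > > >     }
> > > >
> > > >     private void handleInternalError(Throwable t) {
> > > >         // Propagate non-recoverable internal errors to the framework instead
> > > >         // of letting the scheduler keep failing or hang.
> > > >         masterContext.onFatalError(t);
> > > >     }
> > > >
> > > >     // ... registerJob / registerPartitionWithProducer / etc. omitted ...
> > > > }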
> > > >
> > > > The second part to be enhanced is the ShuffleMaster interface. Methods
> > > > to be added include start, close, registerJob, unregisterJob and
> > > > releaseDataSetExternally. In addition, because each ShuffleMaster
> > > > instance can serve multiple jobs simultaneously, when registering
> > > > partitions, one should also provide the corresponding JobID. The
> > > > following shows the updated ShuffleMaster interface:
> > > >
> > > > public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {
> > > >
> > > >     /**
> > > >      * Starts this shuffle master, for example getting the access and
> > > >      * connecting to the external system.
> > > >      */
> > > >     void start() throws Exception;
> > > >
> > > >     /** Closes this shuffle master which releases all resources. */
> > > >     void close() throws Exception;
> > > >
> > > >     /** Registers the target job to this shuffle master. */
> > > >     void registerJob(JobShuffleContext context);
> > > >
> > > >     /** Unregisters the target job from this shuffle master. */
> > > >     void unregisterJob(JobID jobID);
> > > >
> > > >     /** Asynchronously registers a partition and its producer with the shuffle service. */
> > > >     CompletableFuture<T> registerPartitionWithProducer(
> > > >             JobID jobID,
> > > >             PartitionDescriptor partitionDescriptor,
> > > >             ProducerDescriptor producerDescriptor);
> > > >
> > > >     /** Releases any external resources occupied by the given partition. */
> > > >     void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
> > > >
> > > >     /** Releases the target cluster partitions stored externally if any. */
> > > >     void releaseDataSetExternally(IntermediateDataSetID dataSetID);
> > > > }
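> > > >
> > > > To make the intended usage concrete, the framework side would drive
> > > > these methods roughly in the following order (a sketch, variable names
> > > > are illustrative only):
> > > >
> > > > // Cluster start-up: the ShuffleMaster is created once and started.
> > > > ShuffleMaster<?> shuffleMaster = factory.createShuffleMaster(shuffleMasterContext);
> > > > shuffleMaster.start();
> > > >
> > > > // Job submission: the job is registered before any of its partitions.
> > > > shuffleMaster.registerJob(jobShuffleContext);
> > > >
> > > > // Scheduling: partitions are registered with the producing job's JobID.
> > > > CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture =
> > > >         shuffleMaster.registerPartitionWithProducer(
> > > >                 jobID, partitionDescriptor, producerDescriptor);
> > > >
> > > > // Job termination: per-job resources are released.
> > > > shuffleMaster.unregisterJob(jobID);
> > > >
> > > > // Cluster shutdown: all remaining resources are released.
> > > > shuffleMaster.close();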
> > > >
> > > > The following is the JobShuffleContext interface. It will be implemented
> > > > by the pluggable shuffle framework itself and can be used by the shuffle
> > > > plugin.
> > > >
> > > > public interface JobShuffleContext {
> > > >
> > > >     /** Gets the corresponding job configuration. */
> > > >     Configuration getConfiguration();
> > > >
> > > >     /** Gets the corresponding {@link JobID}. */
> > > >     JobID getJobID();
> > > >
> > > >     /**
> > > >      * Stops tracking the target result partitions, which means these
> > > >      * partitions will be reproduced if used afterwards.
> > > >      */
> > > >     CompletableFuture<Void> stopTrackingPartitions(Collection<ResultPartitionID> partitionIDS);
> > > >
> > > >     /** Returns information of all partitions being tracked for the current job. */
> > > >     CompletableFuture<List<ResultPartitionDeploymentDescriptor>> listPartitions();
> > > > }
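> > > >
> > > > To illustrate the main motivation, a sketch of how a plugin could stop
> > > > tracking all partitions on a lost external shuffle worker at once,
> > > > instead of letting them be reproduced one by one; the bookkeeping maps
> > > > are made up for illustration and would live inside the plugin's
> > > > ShuffleMaster:
> > > >
> > > > private final Map<JobID, JobShuffleContext> registeredJobs = new HashMap<>();
> > > > private final Map<String, Map<JobID, Set<ResultPartitionID>>> partitionsByWorker = new HashMap<>();
> > > >
> > > > /** Called when the plugin notices that an external shuffle worker is lost. */
> > > > private void onShuffleWorkerLost(String workerID) {
> > > >     Map<JobID, Set<ResultPartitionID>> lostPartitions =
> > > >             partitionsByWorker.getOrDefault(workerID, Collections.emptyMap());
> > > >     for (Map.Entry<JobID, Set<ResultPartitionID>> entry : lostPartitions.entrySet()) {
> > > >         JobShuffleContext jobContext = registeredJobs.get(entry.getKey());
> > > >         if (jobContext != null) {
> > > >             // Ask the framework to stop tracking the lost partitions so that
> > > >             // they can be reproduced when they are needed again.
> > > >             jobContext.stopTrackingPartitions(entry.getValue());
> > > >         }
> > > >     }
> > > >     partitionsByWorker.remove(workerID);
> > > > }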
> > > >
> > > > What do you think of these changes? Any feedback is highly appreciated.
> > > >
> > >
> >
>
