Thanks for starting this discussion. Here are some of my thoughts regarding the proposal and discussions above.
*+1 to enable ShuffleMaster to stop tracking partitions proactively*
In production we have encountered problems where it takes *hours* to recover from the loss of a remote shuffle worker, because the lost finished partitions cannot be detected and reproduced all at once. This improvement can help to solve that problem.

*+1 to make ShuffleMaster a cluster level component*
This helps to avoid maintaining multiple clients and connections to the same remote shuffle service. It also makes it possible to support external cluster partitions in the future.

*+1 to enable ShuffleMaster to notify the master about its internal non-recoverable errors*
The scheduler can keep failing or hang due to a non-recoverable internal error of the ShuffleMaster. Currently this kind of problem cannot be recovered from automatically and is hard to diagnose. One question, which might be out of scope here, is whether we should do something similar for the ShuffleEnvironment.

*+1 that the abstraction should be able to support different jobs using different ShuffleServices eventually*
I think the proposal does not conflict with this goal. One idea in my mind is to maintain multiple different ShuffleServices in the Dispatcher/JobManagerSharedServices and let them be shared between different jobs. Each job would be configured with a key which points to a ShuffleService, and the key would be used by both the scheduler and the tasks on the task managers to select their respective ShuffleMaster/ShuffleEnvironment. This needs work on both the master and the worker side: currently a worker launches a single ShuffleEnvironment that is shared between tasks which can belong to different jobs, and only this one ShuffleEnvironment is created per task manager (a rough sketch of the idea follows below).
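The sketch below shows what such a shared, key-based lookup on the master side could look like. All names in it (ShuffleServiceRegistry, KeyedShuffleMasterFactory, the per-job key) are made up for illustration and are not part of the proposal; only ShuffleMaster refers to the existing org.apache.flink.runtime.shuffle interface.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.runtime.shuffle.ShuffleMaster;

/**
 * Illustrative sketch only: a registry kept in the Dispatcher/JobManagerSharedServices
 * holding one ShuffleMaster per configured shuffle service key, shared by all jobs
 * that select the same key.
 */
public class ShuffleServiceRegistry {

    /** Hypothetical factory abstraction, only here to keep the sketch self-contained. */
    public interface KeyedShuffleMasterFactory {
        ShuffleMaster<?> create(String shuffleServiceKey);
    }

    private final Map<String, ShuffleMaster<?>> mastersByKey = new ConcurrentHashMap<>();
    private final KeyedShuffleMasterFactory factory;

    public ShuffleServiceRegistry(KeyedShuffleMasterFactory factory) {
        this.factory = factory;
    }

    /** Returns the shared ShuffleMaster for the key configured by a job, creating it lazily. */
    public ShuffleMaster<?> shuffleMasterForJob(String shuffleServiceKey) {
        return mastersByKey.computeIfAbsent(shuffleServiceKey, factory::create);
    }
}

On the worker side, the same key would be used to pick the matching ShuffleEnvironment.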
Thanks,
Zhu

Yingjie Cao <kevin.ying...@gmail.com> wrote on Thu, Jul 8, 2021 at 11:43 AM:

> Hi,
>
> Thanks for the reply.
>
> @Guowei
> I agree that we can move forward step by step and start from the most important part. Apart from the two points mentioned in your reply, initializing and shutting down external resources gracefully is also important, which is one reason for the open/close methods.
>
> About the cluster partitions and the ShuffleMasterContext: I agree that we can postpone handling the cluster partitions because we need to do more to support them. For the ShuffleMasterContext, I think we still need it even if we do not support cluster partitions in the first step. Currently, the shuffle master can only access the cluster configuration; besides that, I think we also need the ability to handle fatal errors occurring in the ShuffleMaster gracefully by propagating the errors to the framework. By introducing the ShuffleMasterContext, we can give the ShuffleMaster access to both the cluster configuration and the fatal error handler. Compared to passing these components directly to the ShuffleMaster, a ShuffleMasterContext interface makes it easier to keep compatibility in the future: even if we add new methods later, we can offer default empty implementations in the interface.
>
> About the JobShuffleContext::getConfiguration/listPartitions methods: I agree that we can remove them in the first step and add them back later. As mentioned above, we can easily keep compatibility based on the context interface.
>
> @Till
> I totally agree that we should support different jobs using different shuffle services, and the proposed solution will support this use case eventually.
>
> Best,
> Yingjie
>
> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Jul 7, 2021 at 8:15 PM:
>
> > One quick comment: When developing the ShuffleService abstraction we also thought that different jobs might want to use different ShuffleServices depending on their workload (e.g. batch vs. streaming workload). So ideally, the chosen solution here can also support this use case eventually.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jul 7, 2021 at 12:50 PM Guowei Ma <guowei....@gmail.com> wrote:
> >
> > > Hi,
> > > Thanks Yingjie for initiating this discussion. As I understand it, the document [1] mainly discusses two issues:
> > > 1. ShuffleMaster should be at the cluster level instead of the job level.
> > > 2. ShuffleMaster should notify the PartitionTracker that some data has been lost.
> > >
> > > Relatively speaking, I think the second problem is more serious. For external or remote batch shuffle services, after a machine storing shuffle data goes offline, the PartitionTracker needs to be notified in time to avoid repeated failures of the job. Therefore, when shuffle data goes offline due to a machine error, the ShuffleMaster should be able to notify the PartitionTracker in time. This requires giving the ShuffleMaster a handle to the PartitionTracker, such as a JobShuffleContext.
> > >
> > > So how to pass the JobShuffleContext to the ShuffleMaster? There are two options:
> > > 1. After the ShuffleMaster is created, pass the JobShuffleContext to it, e.g. ShuffleMaster::register(JobShuffleContext).
> > > 2. Pass it through the ShuffleServiceFactory when creating the ShuffleMaster, e.g. ShuffleServiceFactory.createShuffleMaster(JobShuffleContext).
> > >
> > > Which one to choose is actually related to issue 1: if the ShuffleMaster is cluster level, option 1 should be chosen, otherwise option 2. I think the ShuffleMaster should be at the cluster level, for example because we don't need to maintain a ShuffleMaster for each job in a session cluster; in addition, this ShuffleMaster should also be usable by the RM's PartitionTracker in the future. Therefore, I think option 1 is more appropriate.
> > >
> > > To sum up, we may give priority to solving problem 2, while taking into account that the ShuffleMaster should be a cluster-level component. Therefore, I think we could ignore the ShuffleMasterContext at the beginning; at the same time, JobShuffleContext::getConfiguration/listPartitions should not be needed.
> > >
> > > [1] https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit
> > >
> > > Best,
> > > Guowei
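For quick reference, the two options discussed above roughly correspond to the following signatures; the interface names here are illustrative only, not an agreed API.

// Option 1: the ShuffleMaster is a cluster level component created once; jobs are
// registered with it afterwards, each handing over its JobShuffleContext.
interface ClusterLevelShuffleMaster {
    void registerJob(JobShuffleContext jobShuffleContext);
}

// Option 2: a ShuffleMaster is created per job, so the JobShuffleContext can be
// passed in directly at creation time through the factory.
interface PerJobShuffleServiceFactory<T extends ShuffleDescriptor> {
    ShuffleMaster<T> createShuffleMaster(JobShuffleContext jobShuffleContext);
}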
> > > On Fri, Jun 11, 2021 at 4:15 PM Yingjie Cao <kevin.ying...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion about "Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker". (These are things we found when moving our external shuffle to the pluggable shuffle service framework.)
> > > >
> > > > The mail client may fail to display the right format. If so, please refer to this document: https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing .
> > > >
> > > > Lifecycle of ShuffleMaster
> > > >
> > > > Currently, the lifecycle of the ShuffleMaster is unclear. The ShuffleServiceFactory is loaded for each JobMaster instance and then ShuffleServiceFactory#createShuffleMaster is called to create a ShuffleMaster instance. However, the default NettyShuffleServiceFactory always returns the same ShuffleMaster singleton instance for all jobs. Based on the current implementation, the lifecycle of the ShuffleMaster is left open and depends on the shuffle plugins themselves. At the TM side, however, the ShuffleEnvironment is a part of the TaskManagerServices, whose lifecycle is decoupled from jobs, which makes it more like a service. This means there is also an inconsistency between the TM side and the JM side.
> > > >
> > > > From my understanding, the reason for this is that the pluggable shuffle framework is not completely finished yet. For example, there is a follow-up umbrella ticket FLINK-19551 <https://issues.apache.org/jira/browse/FLINK-19551> for the pluggable shuffle service framework, and one of its subtasks (FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731>) aims to load the shuffle plugin with the PluginManager. I think this can solve the issue mentioned above: after the corresponding factory is loaded by the PluginManager, all ShuffleMaster instances can be stored in a map indexed by the corresponding factory class name and shared by all jobs. With that, the ShuffleMaster becomes a cluster level service, consistent with the ShuffleEnvironment at the TM side.
> > > >
> > > > As a summary, we propose to finish FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731> and make the shuffle service a real cluster level service first. Furthermore, we propose to add two lifecycle methods to the ShuffleMaster interface, start and close, responsible for initialization (for example, contacting the external system) and graceful shutdown (for example, releasing the resources) respectively (these methods already exist in the ShuffleEnvironment interface at the TM side). What do you think?
> > > >
> > > > Relationship of ShuffleMaster & JobMaster
> > > >
> > > > Currently, the JobMaster holds a reference to the corresponding ShuffleMaster and can register partitions with it (allocate ShuffleDescriptors from it) via the registerPartitionWithProducer method. To support use cases like allocating external resources when a job starts and releasing all allocated resources when a job terminates, we also need some job level initialization and finalization. Such job level initialization and finalization are also helpful when serving multiple jobs simultaneously.
> > > >
> > > > As a summary, we propose to add two job level lifecycle methods, registerJob and unregisterJob, responsible for job level shuffle initialization and finalization, for example, releasing all external resources occupied by the corresponding job. What do you think?
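As an illustration of how the cluster level sharing and the proposed lifecycle methods above could fit together, here is a minimal sketch. The holder class and its wiring are my assumptions, not part of the proposal; ShuffleMaster and ShuffleServiceFactory are the existing org.apache.flink.runtime.shuffle interfaces, while ShuffleMasterContext and start/close are the additions proposed in this thread.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical cluster-level holder of ShuffleMaster instances, one per factory class name. */
class ClusterShuffleMasterService implements AutoCloseable {

    private final Map<String, ShuffleMaster<?>> mastersByFactoryName = new ConcurrentHashMap<>();

    /** Returns the cluster-wide ShuffleMaster for the given factory, starting it on first use. */
    ShuffleMaster<?> getOrCreate(ShuffleServiceFactory<?, ?, ?> factory, ShuffleMasterContext context) {
        return mastersByFactoryName.computeIfAbsent(
                factory.getClass().getName(),
                name -> {
                    ShuffleMaster<?> master = factory.createShuffleMaster(context);
                    try {
                        master.start(); // proposed lifecycle method: connect to the external system
                    } catch (Exception e) {
                        throw new RuntimeException("Could not start ShuffleMaster loaded from " + name, e);
                    }
                    return master;
                });
    }

    @Override
    public void close() throws Exception {
        for (ShuffleMaster<?> master : mastersByFactoryName.values()) {
            master.close(); // proposed lifecycle method: graceful shutdown, releasing resources
        }
    }
}

A JobMaster would then call registerJob/unregisterJob on the shared instance around the job's lifetime instead of creating a ShuffleMaster of its own.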
> > > > Relationship of ShuffleMaster & PartitionTracker
> > > >
> > > > Currently, the JobMasterPartitionTracker can release external result partitions through the releasePartitionExternally method of the ShuffleMaster. However, the shuffle plugin (ShuffleMaster) may also need the ability to stop tracking some partitions depending on the status of the external services. For example, if an external storage node which stores some partitions crashes, we need to stop tracking all partitions on it to avoid reproducing the lost partitions one by one. By introducing something like a ShuffleContext which delegates to the partition tracker, this requirement can be easily satisfied. Besides, for cluster partitions, we also need the ability to release them.
> > > >
> > > > As a summary, we propose to add a releaseDataSetExternally method to the ShuffleMaster interface which is responsible for releasing cluster partitions. Besides, we propose to add a ShuffleContext which can delegate to the PartitionTracker and stop tracking partitions. For cluster partitions and job partitions, two separate ShuffleContext abstractions are needed. What do you think?
> > > >
> > > > Interface Change Summary
> > > >
> > > > As discussed in the above sections, we propose to make some interface changes around the ShuffleMaster interface. The first change is to pass a ShuffleMasterContext instance to the ShuffleServiceFactory when creating the ShuffleMaster, just like the ShuffleEnvironment creation at the TM side. Changes are marked with bold text (the same below).
> > > >
> > > > public interface ShuffleServiceFactory<
> > > >         SD extends ShuffleDescriptor, P extends ResultPartitionWriter, G extends IndexedInputGate> {
> > > >
> > > >     /** Factory method to create a specific {@link ShuffleMaster} implementation. */
> > > >     ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext);
> > > >
> > > >     /** Factory method to create a specific local {@link ShuffleEnvironment} implementation. */
> > > >     ShuffleEnvironment<P, G> createShuffleEnvironment(
> > > >             ShuffleEnvironmentContext shuffleEnvironmentContext);
> > > > }
> > > >
> > > > The following is the ShuffleMasterContext interface. It will be implemented by the pluggable shuffle framework itself and can be used by the shuffle plugin. A context interface is more friendly if we want to extend it in the future.
> > > >
> > > > public interface ShuffleMasterContext {
> > > >
> > > >     /** Gets the cluster configuration. */
> > > >     Configuration getConfiguration();
> > > >
> > > >     /** Handles the fatal error if any. */
> > > >     void onFatalError(Throwable throwable);
> > > >
> > > >     /**
> > > >      * Stops tracking the target dataset (cluster partitions), which means these data can
> > > >      * not be reused anymore.
> > > >      */
> > > >     CompletableFuture<Void> stopTrackingDataSet(IntermediateDataSetID dataSetID);
> > > >
> > > >     /** Returns IDs of all datasets (cluster partitions) being tracked by this cluster currently. */
> > > >     CompletableFuture<List<IntermediateDataSetID>> listDataSets();
> > > > }
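To illustrate how a plugin might use this context, here is a hypothetical snippet; the class and callback names are invented, only the context methods come from the proposal.

import java.util.concurrent.CompletableFuture;

/** Hypothetical base class of a remote shuffle plugin's master-side component. */
abstract class RemoteShuffleMasterBase {

    private final ShuffleMasterContext context; // the proposed context, provided by the framework

    RemoteShuffleMasterBase(ShuffleMasterContext context) {
        this.context = context;
    }

    /** Called by the (hypothetical) remote-cluster client when it cannot recover on its own. */
    void onUnrecoverableClientFailure(Throwable cause) {
        // Propagate the error to the framework instead of letting the scheduler hang or retry forever.
        context.onFatalError(cause);
    }

    /** Called when the storage backing a cached dataset (cluster partitions) is lost. */
    CompletableFuture<Void> onDataSetStorageLost(IntermediateDataSetID dataSetID) {
        // Ask the framework to stop tracking the dataset so it will not be reused afterwards.
        return context.stopTrackingDataSet(dataSetID);
    }
}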
> > > > The second part to be enhanced is the ShuffleMaster interface. Methods to be added include start, close, registerJob, unregisterJob and releaseDataSetExternally. In addition, because each ShuffleMaster instance can serve multiple jobs simultaneously, the corresponding JobID must also be provided when registering partitions. The following shows the updated ShuffleMaster interface:
> > > >
> > > > public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {
> > > >
> > > >     /**
> > > >      * Starts this shuffle master, for example getting the access and connecting to the
> > > >      * external system.
> > > >      */
> > > >     void start() throws Exception;
> > > >
> > > >     /** Closes this shuffle master which releases all resources. */
> > > >     void close() throws Exception;
> > > >
> > > >     /** Registers the target job to this shuffle master. */
> > > >     void registerJob(JobShuffleContext context);
> > > >
> > > >     /** Unregisters the target job from this shuffle master. */
> > > >     void unregisterJob(JobID jobID);
> > > >
> > > >     /** Asynchronously registers a partition and its producer with the shuffle service. */
> > > >     CompletableFuture<T> registerPartitionWithProducer(
> > > >             JobID jobID,
> > > >             PartitionDescriptor partitionDescriptor,
> > > >             ProducerDescriptor producerDescriptor);
> > > >
> > > >     /** Releases any external resources occupied by the given partition. */
> > > >     void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
> > > >
> > > >     /** Releases the target cluster partitions stored externally if any. */
> > > >     void releaseDataSetExternally(IntermediateDataSetID dataSetID);
> > > > }
> > > >
> > > > The following is the JobShuffleContext interface. It will be implemented by the pluggable shuffle framework itself and can be used by the shuffle plugin.
> > > >
> > > > public interface JobShuffleContext {
> > > >
> > > >     /** Gets the corresponding job configuration. */
> > > >     Configuration getConfiguration();
> > > >
> > > >     /** Gets the corresponding {@link JobID}. */
> > > >     JobID getJobID();
> > > >
> > > >     /**
> > > >      * Stops tracking the target result partitions, which means these partitions will be
> > > >      * reproduced if used afterwards.
> > > >      */
> > > >     CompletableFuture<Void> stopTrackingPartitions(Collection<ResultPartitionID> partitionIDS);
> > > >
> > > >     /** Returns information of all partitions being tracked for the current job. */
> > > >     CompletableFuture<List<ResultPartitionDeploymentDescriptor>> listPartitions();
> > > > }
> > > >
> > > > What do you think of these changes? Any feedback is highly appreciated.
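To close with a concrete picture of the main motivating case (the hours-long recovery after a shuffle worker is lost), a remote shuffle plugin could react to a lost worker roughly as sketched below. The worker/partition bookkeeping is invented for illustration; only getJobID and stopTrackingPartitions come from the JobShuffleContext proposed above.

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical fragment of a remote shuffle plugin. When a remote shuffle worker is
 * reported lost, all finished partitions stored on it are dropped from tracking in
 * one call per job, instead of being discovered as lost one by one at consumption time.
 */
class LostWorkerHandler {

    // JobShuffleContext per registered job, as handed over via ShuffleMaster#registerJob.
    private final Map<JobID, JobShuffleContext> jobContexts = new ConcurrentHashMap<>();

    // Invented bookkeeping: which partitions of which job live on which shuffle worker.
    private final Map<String, Map<JobID, Collection<ResultPartitionID>>> partitionsByWorker =
            new ConcurrentHashMap<>();

    void registerJob(JobShuffleContext context) {
        jobContexts.put(context.getJobID(), context);
    }

    /** Called by the (hypothetical) heartbeat logic when a shuffle worker is declared lost. */
    void onShuffleWorkerLost(String workerId) {
        Map<JobID, Collection<ResultPartitionID>> lost =
                partitionsByWorker.getOrDefault(workerId, Map.of());
        lost.forEach(
                (jobId, partitions) -> {
                    JobShuffleContext context = jobContexts.get(jobId);
                    if (context != null) {
                        // Proactively stop tracking, so the scheduler reproduces the lost
                        // partitions right away instead of failing on them one by one.
                        CompletableFuture<Void> done = context.stopTrackingPartitions(partitions);
                        done.exceptionally(t -> null); // error handling omitted in this sketch
                    }
                });
    }
}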