Very clear. Thanks!

> On Jan 28, 2019, at 10:29 PM, zhijiang <wangzhijiang...@aliyun.com> wrote:
> 
> Hi Qi,
> 
> Thanks for your interest in this proposal. In Blink we implemented the 
> YarnShuffleService, which is mainly used for batch jobs in production and 
> was used for some benchmarks before. This YarnShuffleService is not built on 
> the currently proposed ShuffleManager interface, and there is also no 
> ShuffleMaster component on the JM side. You can regard it as a simple, 
> special-case implementation. The YarnShuffleService can later be refactored 
> to fit the proposed shuffle manager architecture.
> 
> Best,
> Zhijiang
> 
> ------------------------------------------------------------------
> From:qi luo <luoqi...@gmail.com>
> Send Time:Jan 28, 2019 (Monday) 20:55
> To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> Cc:Till Rohrmann <trohrm...@apache.org>; Andrey Zagrebin 
> <and...@da-platform.com>
> Subject:Re: [DISCUSS] Proposal of external shuffle service
> 
> Hi Zhijiang,
> 
> I see there’s a YarnShuffleService in the newly released Blink branch. Is 
> there any relationship between that YarnShuffleService and your external 
> shuffle service?
> 
> Regards,
> Qi
> 
> > On Jan 28, 2019, at 8:07 PM, zhijiang <wangzhijiang...@aliyun.com.INVALID> 
> > wrote:
> > 
> > Hi Till,
> > 
> > Very glad to receive your feedback; it is actually very helpful.
> > 
> > The proposed ShuffleMaster in the JM would be involved in many existing 
> > processes, such as task deployment, task failover, and TM release, so it 
> > would interact with the corresponding Scheduler, FailoverStrategy, and 
> > SlotPool components. In the first version we focus on the deployment 
> > process, which is described in detail in the FLIP. For the other 
> > improvements based on the proposed architecture, we only mentioned the 
> > basic ideas and have not yet given the detailed process. But I think it is 
> > reasonable and natural to solve these issues on top of it, and we will 
> > give more details for the future steps.
> > 
> > I totally agree with your thoughts on handling TM release. Currently, once 
> > the task is finished, the corresponding slot is regarded as free no matter 
> > whether the produced partition is consumed or not. Actually, we could 
> > consider that both the task and its partitions occupy resources in the 
> > slot, so the slot should not be regarded as free until the internal 
> > partition is consumed and released. The TM release logic is then improved 
> > as well. I think your suggestions below already give a detailed and 
> > specific process for this improvement.
> > 
> > I am in favor of launching a separate thread for this discussion again, 
> > thanks for the advice!
> > 
> > Best,
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > From:Till Rohrmann <trohrm...@apache.org>
> > Send Time:Jan 28, 2019 (Monday) 19:14
> > To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> > Cc:Andrey Zagrebin <and...@da-platform.com>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Thanks for creating the FLIP-31 for the external shuffle service Zhijiang. 
> > It looks good to me. 
> > 
> > One thing which is not fully clear to me yet is how the lifecycle 
> > management of the partitions integrates with the slot management. At the 
> > moment, conceptually we consider the partition data being owned by the TM 
> > if I understood it correctly. This means the ShuffleMaster is asked whether 
> > a TM can be freed. However, the JobMaster only thinks in terms of slots and 
> > not TMs. Thus, the logic would be that the JM asks the ShuffleMaster 
> > whether it can return a certain slot. Atm the freeing of slots is done by 
> > the `SlotPool` and, thus this would couple the `SlotPool` and the 
> > `ShuffleMaster`. Maybe we need to introduce some mechanism to signal when a 
> > slot has still some occupied resources. In the shared slot case, one could 
> > think of allocating a dummy slot in the shared slot which we only release 
> > after the partition data has been consumed.
> > 
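One way to read the dummy-slot suggestion above is sketched below in Java. All names (`SharedSlotSketch`, `allocate`, `release`, `isReturnable`) are hypothetical and are not Flink's SlotPool API; the sketch only illustrates that a shared slot stays occupied while a placeholder for unconsumed partition data is still allocated in it.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical shared slot: it can be returned only when no occupant is left. */
final class SharedSlotSketch {
    private final Set<String> occupants = new HashSet<>();

    /** Allocates a logical slot, e.g. for a task or as a placeholder for partition data. */
    void allocate(String occupantId) {
        occupants.add(occupantId);
    }

    /** Releases a logical slot; returns true if the occupant was present. */
    boolean release(String occupantId) {
        return occupants.remove(occupantId);
    }

    /** The shared slot may be handed back only when it is empty. */
    boolean isReturnable() {
        return occupants.isEmpty();
    }
}

final class SharedSlotDemo {
    public static void main(String[] args) {
        SharedSlotSketch slot = new SharedSlotSketch();
        slot.allocate("task-1");
        slot.allocate("partition-of-task-1"); // placeholder ("dummy") slot for produced data
        slot.release("task-1");               // the task finished
        System.out.println(slot.isReturnable()); // false: partition not yet consumed
        slot.release("partition-of-task-1");  // the consumer finished reading
        System.out.println(slot.isReturnable()); // true
    }
}
```

With this shape the component that frees slots would not need to know about partitions at all; it would only see that the shared slot still has an occupant.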
> > In order to give this design document a little bit more visibility, I would 
> > suggest to post it again on the dev mailing list in a separate thread under 
> > the title "[DISCUSS] Flip-31: Pluggable Shuffle Manager" or something like 
> > this.
> > 
> > Cheers,
> > Till
> > On Mon, Jan 21, 2019 at 7:05 AM zhijiang 
> > <wangzhijiang...@aliyun.com.invalid> wrote:
> > Hi all,
> > 
> > FYI, I created the FLIP-31 under [1] for this proposal and created some 
> > subtasks under umbrella jira [2].
> > Any concerns are welcome in the previous google doc or in the specific jiras.
> > 
> > [1] 
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager
> > [2] https://issues.apache.org/jira/browse/FLINK-10653
> > 
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > Send Time:Jan 15, 2019 (Tuesday) 17:55
> > To:Andrey Zagrebin <and...@da-platform.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Hi all,
> > 
> > After continuous offline discussion with Andrey, we have reached an 
> > agreement on this proposal and co-authored the latest google doc under [1].
> > 
> > We plan to create the FLIP and sub-tasks by the end of this week, and we 
> > hope the first MVP can be covered in Flink 1.8.
> > 
> > Any feedback and suggestions are welcome! :)
> > 
> > [1] 
> > https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing
> > 
> > Best,
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > Send Time:Dec 25, 2018 (Tuesday) 15:33
> > To:Andrey Zagrebin <and...@da-platform.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Hi Andrey,
> > 
> > Thanks for the quick response on the UnknownShuffleDeploymentDescriptor 
> > issue.
> > 
> > It is reasonable to consider this special case on both the ShuffleMaster 
> > and ShuffleService sides.
> > On the upstream ShuffleService side, the created ResultPartitionWriter 
> > decides whether to notify the ShuffleMaster of a consumable partition when 
> > it outputs the first buffer or when it finishes.
> > On the ShuffleMaster side, we might define a method in the ShuffleMaster 
> > interface for handling this notification message from the upstream side, 
> > which then internally decides whether to update the partition info for the 
> > downstream sides or not.
> > On the downstream ShuffleService side, we might define a method in the 
> > ShuffleService interface to handle the update-partition-info message from 
> > the ShuffleMaster, so that it can find the corresponding created InputGate 
> > to update.
> > The communication between ShuffleService and ShuffleMaster can make use of 
> > TMGateway & JMGateway in the current implementation. Certainly it can also 
> > rely on other ways for different ShuffleManager implementations. I would 
> > update the google doc to make this process clear if you also think so. :)
> > 
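The three-step flow described above (upstream notification, master decision, downstream update) could be sketched as follows. This is only an illustration under assumptions: `ShuffleMasterSketch`, `notifyPartitionConsumable`, `DownstreamShuffleService` and `updatePartitionInfo` are hypothetical names, locations are plain strings, and the RPC layer (TMGateway/JMGateway) is replaced by direct method calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical downstream side: receives updated partition info from the master. */
interface DownstreamShuffleService {
    void updatePartitionInfo(String partitionId, String location);
}

/** Hypothetical master side: handles the "partition consumable" notification. */
final class ShuffleMasterSketch {
    private final Map<String, List<DownstreamShuffleService>> consumers = new HashMap<>();

    /** Remembers a consumer that was deployed before the partition location was known. */
    void registerConsumer(String partitionId, DownstreamShuffleService consumer) {
        consumers.computeIfAbsent(partitionId, id -> new ArrayList<>()).add(consumer);
    }

    /** Called by the upstream writer on its first buffer or when it finishes. */
    void notifyPartitionConsumable(String partitionId, String location) {
        // The master decides internally whom to update; here: every waiting consumer.
        for (DownstreamShuffleService consumer :
                consumers.getOrDefault(partitionId, new ArrayList<>())) {
            consumer.updatePartitionInfo(partitionId, location);
        }
    }
}

/** Records updates where a real service would look up and update its InputGate. */
final class RecordingShuffleService implements DownstreamShuffleService {
    final Map<String, String> knownLocations = new HashMap<>();

    @Override
    public void updatePartitionInfo(String partitionId, String location) {
        knownLocations.put(partitionId, location);
    }
}
```

A different ShuffleManager implementation could keep the same two entry points and swap the transport behind them.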
> > Best,
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > From:Andrey Zagrebin <and...@da-platform.com>
> > Send Time:Dec 25, 2018 (Tuesday) 02:32
> > To:zhijiang <wangzhijiang...@aliyun.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Hi Zhijiang,
> > 
> > Thanks for considering my thoughts and concerns. Those are just suggestions 
> > for your design document.
> > 
> > My understanding about 2.1 was initially that shuffle service is also 
> > treated as unknown in case of UnknownShuffleDeploymentDescriptor which is 
> > not quite true.
> > Thinking about it more, it might actually be up to the shuffle service to 
> > decide how to react to the events of producer or consumer deployment.
> > Maybe ShuffleMaster could have two register/deregister methods for input 
> > and output (now partition) and/or a task state update method to 
> > communicate the status of the ShuffleService running in the TM.
> > Internally, the shuffle implementation could decide how to communicate 
> > between ShuffleMaster and ShuffleService. If the shuffle is channel-based, 
> > it can behave in a similar way as now.
> > I agree it probably needs more discussion, and the refactoring could be 
> > planned step by step if the change is too involved.
> > 
> > Best,
> > Andrey
> > 
> > On Mon, Dec 24, 2018 at 11:31 AM zhijiang <wangzhijiang...@aliyun.com> 
> > wrote:
> > Hi Andrey,
> > 
> > Thanks for further research on this topic and providing very helpful 
> > summaries.  
> > 
> > As we discussed before, I really like the idea of having two separate 
> > components on the JM and TM sides.
> > 
> > 1. On the JM side, the ShuffleMaster component created from ShuffleManager 
> > can manage and handle partition-related issues properly.
> > 
> > 1.1 The introduction of PartitionShuffleDescriptor and 
> > PartitiondDeploymentDescriptor is suitable for covering all the necessary 
> > information related to the partition during the deployment process and for 
> > other future extensions. The form of this new descriptor is also 
> > consistent with the existing ResultPartitionDeploymentDescriptor and 
> > InputGateDeploymentDescriptor.
> > 
> > 2. On TM side, the ShuffleService component created from ShuffleManager is 
> > a TM level service, which can be used for creating ResultPartitionWriter 
> > and InputGate during task deployment.
> > 
> > 
> > 2.1 Concerning the update of UnknownShuffleDeploymentDescriptor, I think 
> > it raises the question of whether the ShuffleService should provide a 
> > separate method for updating it. In other words, because the InputGate is 
> > created by the ShuffleService, should all possible operations on the 
> > InputGate, such as update or release, be handled via the ShuffleService? I 
> > think it is fine to operate on the InputGate directly if the update or 
> > release is general for all ShuffleService implementations. But the 
> > InputGate interface should then provide explicit methods for releasing 
> > itself and updating input channels to make the whole process work.
> > 
> > 2.2 In addition, some implementation details can be further confirmed in 
> > separate JIRAs, such as whether we need task-info-related parameters when 
> > creating the writer, and how to extract the necessary components from the 
> > current NetworkEnvironment to wrap them in a specific ShuffleService 
> > implementation, etc.
> > 
> > 3. For the points mentioned in future extensions, I agree with your 
> > analysis. We can focus on them separately, step by step, with different 
> > priorities. The above ShuffleMaster provides a basic precondition for 
> > decoupling the life cycles of partition state and task state. Then we 
> > can further extend the methods in ShuffleMaster to know whether the 
> > partition is still available, for speeding up failover, and whether the 
> > partition has been consumed by downstream, to decide when to release the 
> > TM or clean up the partition, etc. It is also a good idea to further 
> > refactor the interfaces on the writer and reader sides to handle raw 
> > records in a fine-grained way instead of Buffers. That would involve more 
> > changes in the current RecordWriter/StreamInputProcessor.
> > 
> > I think we can further confirm the above 2.1 issue; then I would adjust the 
> > google doc based on our conclusions, covering not only the first step but 
> > also all the future extensions, described and listed by priority. 
> > BTW, do you think it is necessary that we further co-author a FLIP for this 
> > feature? It actually involves many changes on both the TM and JM sides. :)
> > 
> > Best,
> > Zhijiang
> > 
> > 
> > 
> > ------------------------------------------------------------------
> > From:Andrey Zagrebin <and...@data-artisans.com>
> > Send Time:Dec 20, 2018 (Thursday) 01:20
> > To:zhijiang <wangzhijiang...@aliyun.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Hi Zhijiang,
> > 
> > Thanks for detailed answers! I am glad we are on the same page.
> > 
> > I spent some time on thinking more about our concerns and decided to make 
> > more suggestions for the discussion.
> > At the end, I also gathered some points related to possible extensions of 
> > the shuffle API to verify that the final pluggable design can support them 
> > later with fewer changes.
> > 
> > 
> > It might make sense for the shuffle implementation to have components 
> > running on both the JM and TM sides.
> > The JM has a global view of what is happening and can interact with the 
> > shuffling system independently of whether tasks are running or not. The 
> > component services could internally communicate with each other outside of 
> > the existing JM/TM APIs, depending on the shuffle implementation.
> > It could help later with global partition life cycle management and cleanup.
> > Moreover, if we decide to use some ShuffleDeploymentDescriptor instead of 
> > ResultPartitionLocation or factories to instantiate Readers and Writers, 
> > they can be created in the Task Executor. 
> > The JM is probably not interested in this concern. 
> > ShuffleDeploymentDescriptor can be specific to the shuffle implementation, 
> > like factories, and contain specific shuffle config for the task side.
> > 1. Configuration:
> > interface ShuffleManager {
> >   ShuffleMaster createMaster(Configuration flinkConfig);
> >   ShuffleService createService(Configuration flinkConfig);
> > }
> > ShuffleManager is a factory for ShuffleMaster (JM side) and ShuffleService 
> > (TM side).
> > The Flink config could also contain shuffle-specific configuration, like the 
> > port etc.
> > The class which implements ShuffleManager is set in the Flink cluster 
> > config; the default is what we have now (can be the first step).
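The configuration idea in point 1 could look roughly like the Java sketch below. It assumes a plain `Map<String, String>` in place of Flink's `Configuration`; the key `shuffle.manager.class` and all type names ending in `Sketch` are made up for illustration and are not existing Flink options or classes.

```java
import java.util.Map;

/** Stand-in for the proposed factory interface (hypothetical, heavily reduced). */
interface ShuffleManagerSketch {
    String name();
}

/** Plays the role of "what we have now" when nothing is configured. */
final class DefaultShuffleManagerSketch implements ShuffleManagerSketch {
    @Override
    public String name() {
        return "default";
    }
}

final class ShuffleManagerLoaderSketch {
    /** Hypothetical config key, not an existing Flink option. */
    static final String MANAGER_CLASS_KEY = "shuffle.manager.class";

    /** Returns the default when the key is absent, else instantiates the named class. */
    static ShuffleManagerSketch load(Map<String, String> clusterConfig) {
        String className = clusterConfig.get(MANAGER_CLASS_KEY);
        if (className == null) {
            return new DefaultShuffleManagerSketch();
        }
        try {
            // The configured class needs an accessible no-argument constructor.
            return Class.forName(className)
                    .asSubclass(ShuffleManagerSketch.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load shuffle manager " + className, e);
        }
    }
}
```

Both the JM and the TM process could call such a loader once at startup and then ask the returned manager for the ShuffleMaster or the ShuffleService respectively.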
> > 2. Job master side
> > class PartitionShuffleDescriptor {
> >   // JobID, ExecutionAttemptID, ResultPartitionType, ResultPartitionLocation,
> >   // TaskManagerLocation, etc
> >   // later possibly ShuffleType/Descriptor to choose from available shuffle
> >   // implementations
> > }
> > PartitionShuffleDescriptor contains all abstract information which JM can 
> > provide from the job/execution graph.
> > ResultPartitionType and ResultPartitionLocation are derived from graph and 
> > execution mode, 
> > so I think they are rather general parameters for any shuffle service and 
> > do not belong to particular shuffle implementation.
> > interface ShuffleMaster extends AutoCloseable {
> >   ShuffleDeploymentDescriptor registerPartition(PartitionShuffleDescriptor descriptor);
> >   void deregisterPartition(PartitionShuffleDescriptor descriptor);
> > }
> > JM process creates ShuffleMaster from configured per cluster 
> > ShuffleManager. JM is responsible for its life cycle.
> > ShuffleMaster is a global manager for partitions.
> > JM creates PartitionShuffleDescriptor and uses ShuffleMaster to register 
> > partition, e.g. when producer is deployed. 
> > ShuffleMaster transforms abstract PartitionShuffleDescriptor into a 
> > specific ShuffleDeploymentDescriptor.
> > ShuffleDeploymentDescriptor is put into ResultPartitionDeploymentDescriptor 
> > and InputGateDeploymentDescriptor.
> > It can contain specific partition config for ShuffleService on TM side to 
> > serve record readers and writers. 
> > If it is channel-based then further break down to channel configs.
> > Special UnknownShuffleDeploymentDescriptor could be used for eager 
> > deployment when task input is unknown yet.
> > Later, we could add an option to release partition globally by 
> > deregistering it with the ShuffleMaster. e.g. to clean it up.
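To make the register/deregister contract of point 2 concrete, here is a toy in-memory variant. It is a sketch under assumptions: both descriptor classes are reduced to a couple of string fields, the `netty://` prefix is only an illustrative format, and `registeredCount` is a helper added for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Abstract partition info the JM derives from the execution graph (heavily simplified). */
final class PartitionDescriptorSketch {
    final String partitionId;
    final String producerLocation;

    PartitionDescriptorSketch(String partitionId, String producerLocation) {
        this.partitionId = partitionId;
        this.producerLocation = producerLocation;
    }
}

/** Shuffle-specific deployment info handed to the TM side (heavily simplified). */
final class DeploymentDescriptorSketch {
    final String connectionInfo;

    DeploymentDescriptorSketch(String connectionInfo) {
        this.connectionInfo = connectionInfo;
    }
}

/** Toy master that keeps every registered partition in memory. */
final class InMemoryShuffleMaster implements AutoCloseable {
    private final Map<String, DeploymentDescriptorSketch> registered = new HashMap<>();

    /** Turns the abstract descriptor into a shuffle-specific one and remembers it. */
    DeploymentDescriptorSketch registerPartition(PartitionDescriptorSketch partition) {
        // A channel-based shuffle could simply point consumers at the producer.
        DeploymentDescriptorSketch deployment =
                new DeploymentDescriptorSketch("netty://" + partition.producerLocation);
        registered.put(partition.partitionId, deployment);
        return deployment;
    }

    /** Forgets the partition; a real implementation would also release its data. */
    void deregisterPartition(PartitionDescriptorSketch partition) {
        registered.remove(partition.partitionId);
    }

    int registeredCount() {
        return registered.size();
    }

    @Override
    public void close() {
        registered.clear();
    }
}
```

An external shuffle implementation would return a different kind of deployment descriptor from the same method, which is what keeps the JM code independent of the shuffle type.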
> > 3. Task executor side
> > interface ShuffleService extends AutoCloseable {
> >   ResultPartitionWriter createResultPartitionWriter(
> >       ResultPartitionDeploymentDescriptor descriptor);
> >   InputGate createInputGate(InputGateDeploymentDescriptor descriptor);
> > }
> > TM process creates ShuffleService from configured per cluster 
> > ShuffleManager. TM is responsible for its life cycle.
> > ShuffleService could substitute NetworkEnvironment in TaskManagerServices.
> > 4. Later extensions
> > 4.1 Per job/job edge config
> > To keep jobs cluster independent, we could introduce abstract predefined 
> > ShuffleType’s or descriptors
> > for job developer to set it per job or job edge. The types are 
> > cluster-independent.
> > Cluster config could contain provided ShuffleManager implementation class 
> > for each supported ShuffleType or fallback to default for some types.
> > Instead of one ShuffleMaster/ShuffleService, JM/TM could keep a 
> > registry of ShuffleMaster/ShuffleService’s per ShuffleType.
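The per-type registry from 4.1 could be as small as the following sketch. The enum values `STREAMING` and `BATCH` and the class `PerTypeRegistry` are assumptions for illustration; the mail does not name concrete shuffle types.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical cluster-independent shuffle types a job developer could pick from. */
enum ShuffleTypeSketch { STREAMING, BATCH }

/** Maps each shuffle type to a component and falls back to a default for the rest. */
final class PerTypeRegistry<T> {
    private final Map<ShuffleTypeSketch, T> byType = new EnumMap<>(ShuffleTypeSketch.class);
    private final T fallback;

    PerTypeRegistry(T fallback) {
        this.fallback = fallback;
    }

    /** Registers the component the cluster provides for the given type. */
    void register(ShuffleTypeSketch type, T component) {
        byType.put(type, component);
    }

    /** Returns the component for the type, or the cluster default if none is set. */
    T lookup(ShuffleTypeSketch type) {
        return byType.getOrDefault(type, fallback);
    }
}
```

The same generic container would work for ShuffleMaster instances in the JM and for ShuffleService instances in the TM.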
> > 4.2 Delay TM shutdown until all local partitions have been consumed
> > The JM could separately keep the state of the partition life cycle (e.g. in 
> > job state, HA). The task executor shuts down (e.g. after a timeout in yarn) 
> > if all its tasks are done and all local partitions are consumed. If there 
> > are no local partitions, then it can shut down immediately. Whether the JM 
> > should check that all partitions produced by the TM are consumed is a 
> > feature of the ShuffleManager. This could be done by calling some 
> > ShuffleManager.getFeatures() interface method.
> > 4.3 Speed up failover
> > If a partition is already computed, the JM could reuse it, as mentioned in 
> > the fine-grained shuffle system design. Whether the partition is still 
> > available after a task or task executor crash is also a feature exposed by 
> > ShuffleManager.getFeatures().
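Points 4.2 and 4.3 both hinge on what `ShuffleManager.getFeatures()` would report. The sketch below assumes two boolean flags; their names, and the helper class `PartitionLifecycleDecisions`, are hypothetical and only show how the JM could branch on such features.

```java
/** Hypothetical feature flags that a ShuffleManager.getFeatures() call could return. */
final class ShuffleFeaturesSketch {
    /** True if partitions live in the producing TM and are lost when it shuts down. */
    final boolean partitionsBoundToTaskExecutor;
    /** True if finished partitions stay readable after the producer task or TM crashed. */
    final boolean partitionsSurviveProducerFailure;

    ShuffleFeaturesSketch(
            boolean partitionsBoundToTaskExecutor, boolean partitionsSurviveProducerFailure) {
        this.partitionsBoundToTaskExecutor = partitionsBoundToTaskExecutor;
        this.partitionsSurviveProducerFailure = partitionsSurviveProducerFailure;
    }
}

final class PartitionLifecycleDecisions {
    /** 4.2: a TM may shut down once its tasks are done and no local data is still needed. */
    static boolean canShutDownTaskExecutor(
            ShuffleFeaturesSketch features, boolean allTasksDone, int unconsumedLocalPartitions) {
        if (!allTasksDone) {
            return false;
        }
        return !features.partitionsBoundToTaskExecutor || unconsumedLocalPartitions == 0;
    }

    /** 4.3: on failover a finished partition is reused only if it survived the failure. */
    static boolean canReusePartition(ShuffleFeaturesSketch features, boolean partitionFinished) {
        return partitionFinished && features.partitionsSurviveProducerFailure;
    }
}
```

For an internal, TM-local shuffle the first flag would be true and the second false; an external shuffle service would typically invert both.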
> > 4.4 Partition garbage collection
> > When the consumer task is done, the partition is deregistered and cleaned 
> > up with the ShuffleMaster. 
> > In case of external storage, partitions are at risk of lingering after 
> > job/cluster failures. A partition TTL is one option, as mentioned in the 
> > fine-grained shuffle system design. The TTL timer could be started when 
> > there is no partition access activity for a certain period of time, but 
> > there is always a risk of losing the partition too early: the user could 
> > try to recover a failed job any time later. So it might need a more 
> > sophisticated approach, like manually triggered cleanup 
> > (ShuffleMaster.cleanup(PartitionsInUse)), which drops all currently unused 
> > partitions.
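The manual cleanup call mentioned in 4.4 amounts to a set difference between what is stored and what is still in use. A minimal sketch, assuming partitions are identified by strings and with hypothetical names throughout (`PartitionCleanupSketch`, `store`, `storedPartitions`):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Toy partition store with the "drop everything not in use" cleanup from 4.4. */
final class PartitionCleanupSketch {
    private final Set<String> stored = new HashSet<>();

    /** Records that data for the given partition exists in the shuffle storage. */
    void store(String partitionId) {
        stored.add(partitionId);
    }

    /** Drops every stored partition that is not listed as in use; returns the dropped ids. */
    Set<String> cleanup(Collection<String> partitionsInUse) {
        Set<String> dropped = new HashSet<>(stored);
        dropped.removeAll(partitionsInUse);
        stored.removeAll(dropped);
        return dropped;
    }

    /** Returns a copy of the ids that are still stored. */
    Set<String> storedPartitions() {
        return new HashSet<>(stored);
    }
}
```

Because the caller supplies the in-use set explicitly, nothing is deleted on a timer, which avoids the "lost too early" risk of a pure TTL.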
> > 4.5 Shuffle Reader/Writers operation per record/byte[]/buffer
> > As discussed, ResultPartitionWriter/InputGate operates on buffers with 
> > serialised records data. Certain shuffle services might benefit from 
> > operating per serialised records or even java objects (e.g. local channel 
> > could hand over them or their copies from TypeSerializer.copy()). Record 
> > key could be treated as its meta info, additionally to bytes or to user 
> > java object.
> > ShuffleService could be refactored later to return 
> > RecordReader/RecordWriter. They could extend 
> > AbstractSerialisingRecordReader/Writer or 
> > AbstractBufferingRecordReader/Writer to import current behaviour and share 
> > code. This requires refactoring of StreamInputProcessor and RecordWriter to 
> > extract the interfaces.
> > It might be useful for ResultPartitionWriter/InputGate or 
> > RecordReader/RecordWriter also to extend AutoCloseable in case the internal 
> > implementation needs a per task life cycle for them.
> > 
> > I hope it can help with the design. Feel free to give feedback.
> > 
> > Best,
> > Andrey
> > 
> > On 10 Dec 2018, at 08:41, zhijiang <wangzhijiang...@aliyun.com> wrote:
> > Hi Andrey,
> > 
> > Thanks for providing such detailed concerns and insights on this 
> > proposal. We exchanged our views on the three main issues in the google doc 
> > last week, and it seems more appropriate to continue the discussion here. :)
> > 
> > 1. Configuration level for shuffle (cluster/job/operator)
> > - how do we share shuffle manager resources among different job tasks 
> > within one task executor process? It could be some static objects shared by 
> > all shuffle manager objects of some type but it might be not scalable 
> > approach. Example could be multiplexed netty connections (as I understand, 
> > current netty stack can become just custom shuffle service).
> > The creation of a ShuffleManager instance on task level is just like the 
> > process of creating a StateBackend in StateBackendLoader. The 
> > ShuffleService and ShuffleManager are two independent components, and the 
> > only interaction between them is a registration mechanism. In detail, if 
> > some ShuffleManager instance wants to rely on the ShuffleService to 
> > transport data, it can register the related information with the 
> > ShuffleService during creation of the ResultPartitionWriter. So the 
> > ShuffleManager instance does not need to contain any objects like the 
> > netty-related stack. The Flink runtime can provide one unified netty-based 
> > ShuffleService which can be started both inside the TaskManager and in 
> > external containers. The internal ShuffleService not only transports data 
> > directly for some ShuffleManager instances but also acts as the RPC server 
> > for communicating with the external ShuffleService, e.g. to register a 
> > result partition with the external service; otherwise the external service 
> > might need an additional RPC service to contact the TaskManager. Here we 
> > implicitly make the internal shuffle a basic service started in the 
> > TaskManager, like the IOManager and MemoryManager components, even though 
> > it is useless for some types of jobs.
> > - In case of having it per job, we might need to provide compatibility 
> > check between shuffle service and cluster mode (e.g. yarn ext shuffle 
> > service for standalone mode cluster) if it is an issue.
> > - Having it per job feels like the same complexity as having it per 
> > operator, at the first glance, just changes its granularity and where 
> > objects reside.
> > - what is the problem to use cluster per job mode? Then shuffle manager per 
> > cluster and per job is the same but might simplify other issues at the 
> > beginning. Streaming and batch jobs with different shuffle requirements 
> > could be started in different clusters per job.
> > 
> > I totally agree with the above concerns about per job configuration. As you 
> > mentioned, it is an option to run different types of jobs in different 
> > clusters. But in some special scenarios, like a hybrid cluster that runs 
> > online and offline jobs at different times, it is better to support job 
> > level configuration for flexibility. Certainly it may not be a strong 
> > requirement for most cases, so we can agree to start with the cluster 
> > level as the easiest way and adjust the level if needed in the future.
> > 
> > 2. ShuffleManager interface
> > 
> > I think you mentioned three sub issues in this part:
> > 
> > 2.1 Introduction of additional ResultPartitionWriterFactory && 
> > InputGateReaderFactory
> > 
> > I am not against the introduction of these two factories. The pluggable 
> > ShuffleManager interface was originally introduced for creating different 
> > writer and reader sides. If the ShuffleManager interface is used for 
> > creating factories, and the factories are then used for creating the 
> > writer and reader, I still think the essence is the same and only the form 
> > is different. That is, the ShuffleManager concept is seen on the JobManager 
> > side, and the task only sees the corresponding factories from the 
> > ShuffleManager. In other words, we add another factory layer to 
> > distinguish between JobManager and task. The form with the corresponding 
> > factories might seem a bit better, so I am willing to take this way for 
> > the implementation.
> > 
> > 2.2 Whether to retain getResultPartitionLocation method in ShuffleManager 
> > interface
> > 
> > If I understand correctly, you mean to put this location as an argument in 
> > the InputGateReaderFactory constructor? If so, I think it makes sense and 
> > we can avoid having this explicit method in the interface. But we also 
> > need to adjust the existing related processes, like updatePartitionInfo 
> > for the downstream side. In that case, the partition location is unknown 
> > while deploying downstream tasks. Based on the upstream's consumable 
> > notification, the location update is triggered by the JobManager towards 
> > the downstream side.
> > 
> > 2.3 ShuffleService interface
> > 
> > My initial thought was not to make it an interface, because the internal 
> > and external shuffle cases can reuse the same unified netty-based shuffle 
> > service if we wrap the related components into the current shuffle service 
> > well. If we want to further extend other implementations of the shuffle 
> > service, like an http-based shuffle service, then we can define an 
> > interface for it, the same way the current RpcService interface avoids 
> > being tied to akka-only implementations. So it also makes sense on my side 
> > to keep this interface. As for the ShuffleServiceRegistry class, I agree 
> > with you to have this TaskManager level service for managing and sharing 
> > among all the internal tasks.
> > 
> > In summary, I think we do not have essential conflicts on the above issues; 
> > they are mostly about implementation aspects. I agree with the above 
> > points; for 2.2 in particular, you might need to double check whether I 
> > understood correctly. 
> > I look forward to your further feedback so that I can adjust the docs 
> > based on it. Feedback from anyone else is also welcome!
> > 
> > 
> > Best,
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > From:Andrey Zagrebin <and...@data-artisans.com>
> > Send Time:Dec 10, 2018 (Monday) 05:18
> > To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> > Cc:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski 
> > <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>; Till Rohrmann 
> > <trohrm...@apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Hi Zhijiang,
> > 
> > 
> > Thanks for sharing the document Zhijiang. 
> > I decided to compile my thoughts here, so as not to overload the document 
> > comments any more :)
> > I think I still have a question about job level configuration for the 
> > shuffle service. You mentioned that we can keep several shuffle manager 
> > objects in one task executor for different jobs. This is valid. My concerns 
> > are:
> > - how do we share shuffle manager resources among different job tasks 
> > within one task executor process? It could be some static objects shared by 
> > all shuffle manager objects of some type but it might be not scalable 
> > approach. Example could be multiplexed netty connections (as I understand, 
> > current netty stack can become just custom shuffle service).
> > - In case of having it per job, we might need to provide compatibility 
> > check between shuffle service and cluster mode (e.g. yarn ext shuffle 
> > service for standalone mode cluster) if it is an issue.
> > - Having it per job feels like the same complexity as having it per 
> > operator, at the first glance, just changes its granularity and where 
> > objects reside.
> > - what is the problem to use cluster per job mode? Then shuffle manager per 
> > cluster and per job is the same but might simplify other issues at the 
> > beginning. Streaming and batch jobs with different shuffle requirements 
> > could be started in different clusters per job. 
> > As for the ShuffleManager interface, I think I see your point with the 
> > ResultPartitionLocation. I agree that a partition needs some addressing of 
> > the underlying connection or resources in general. It can be thought of as 
> > an argument of the ShuffleManager factory methods.
> > My point is that task code might not need to be coupled to shuffle 
> > interface. This way we could keep task code more independent of records 
> > transfer layer. We can always change later how shuffle/network service is 
> > organised internally without any consequences for the general task code. If 
> > task code calls just factories provided by JM, it might not even matter for 
> > the task in future whether it is configured per cluster, job or operator. 
> > Internally, factory can hold location of concrete type if needed.
> > Code example could be:
> > Job Manager side:
> > interface ShuffleManager {
> >   ResultPartitionWriterFactory createResultPartitionWriterFactory(
> >       job/task/topology descriptors);
> >   // similar for input gate factory
> > }
> > class ShuffleManagerImpl implements ShuffleManager {
> >   private general config, services etc;
> >   ResultPartitionWriterFactory createResultPartitionWriterFactory(
> >       job/task/topology descriptors) {
> >     return new ResultPartitionWriterFactoryImpl(location, job, oper id,
> >         other specific config etc);
> >   }
> >   // similar for input gate factory
> > }
> > ...
> > // somewhere in higher level code put ResultPartitionWriterFactory into 
> > descriptor
> > Task executor side receives the factory inside the descriptor and calls 
> > factory.create(ShuffleServiceRegistry). Example of factory:
> > class ResultPartitionWriterFactoryImpl implements ResultPartitionWriterFactory {
> >   // all fields are lightweight and serialisable, received from JM
> >   private location, shuffle service id, other specific config etc;
> > 
> >   ResultPartitionWriter create(ShuffleServiceRegistry registry,
> >       maybe more generic args) {
> >     // get or create task local specific ShuffleServiceImpl by id in 
> >     // registry
> >     // ShuffleServiceImpl object can be shared between jobs
> >     // register with the ShuffleServiceImpl by location, id, config etc
> >   }
> > }
> > interface ShuffleService extends AutoCloseable {
> >   getId();
> > }
> > ShuffleServiceImpl manages resources and decides internally whether to do 
> > it per task executor, task, job or operator. It can contain the network 
> > stack, e.g. netty connections etc. In case of an external service, it can 
> > hold the partition manager, transport client etc. This contract does not 
> > force it to exist per job, or even to exist at all. ShuffleServiceImpl also 
> > does not need to depend on all TaskManagerServices; it only creates the 
> > relevant ones inside, e.g. network.
> > class ShuffleServiceRegistry {
> >   <T extends ShuffleService> T getShuffleService(id);
> >   registerShuffleService(ShuffleService, id);
> >   deregisterShuffleService(id); // remove and close ShuffleService
> >   close(); // close all
> > }
> > ShuffleServiceRegistry is just a generic container of all available 
> > ShuffleService’s. It could be part of TaskManagerServices instead of 
> > NetworkEnvironment which could go into specific ShuffleServiceImpl.
> > 
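The registry outlined above can be written out as compilable Java roughly as follows. This is a sketch with assumptions: ids are strings, the service interface is renamed `IdentifiableShuffleService` to keep the example self-contained, registration takes the id from `getId()` rather than as a second argument, lookup takes a `Class` token so the cast is checked, and `CountingShuffleService` exists only to make the behaviour observable.

```java
import java.util.HashMap;
import java.util.Map;

/** Mirrors the ShuffleService sketch from the mail: an identifiable, closeable service. */
interface IdentifiableShuffleService extends AutoCloseable {
    String getId();

    @Override
    void close(); // narrowed so that callers need not handle checked exceptions
}

/** Generic container of all available shuffle services in one task executor. */
final class ShuffleServiceRegistrySketch implements AutoCloseable {
    private final Map<String, IdentifiableShuffleService> services = new HashMap<>();

    void registerShuffleService(IdentifiableShuffleService service) {
        services.put(service.getId(), service);
    }

    /** Returns the service cast to the requested type, or null if the id is unknown. */
    <T extends IdentifiableShuffleService> T getShuffleService(String id, Class<T> type) {
        IdentifiableShuffleService service = services.get(id);
        return service == null ? null : type.cast(service);
    }

    /** Removes the service and closes it. */
    void deregisterShuffleService(String id) {
        IdentifiableShuffleService removed = services.remove(id);
        if (removed != null) {
            removed.close();
        }
    }

    /** Closes all remaining services. */
    @Override
    public void close() {
        for (IdentifiableShuffleService service : services.values()) {
            service.close();
        }
        services.clear();
    }
}

/** Example service that only counts how often it was closed. */
final class CountingShuffleService implements IdentifiableShuffleService {
    private final String id;
    int closeCalls;

    CountingShuffleService(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void close() {
        closeCalls++;
    }
}
```

A factory received from the JM would call `getShuffleService` with its shuffle service id and register a new service only if the lookup returns null, so one service instance can be shared between jobs.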
> > I might still miss some details, I would appreciate any feedback.
> > 
> > Best,
> > Andrey
> > 
> > On 28 Nov 2018, at 08:59, zhijiang <wangzhijiang...@aliyun.com.INVALID> 
> > wrote:
> > Hi all,
> > 
> > I adjusted the umbrella jira [1] and the corresponding google doc [2] to 
> > narrow down the scope to introducing the pluggable shuffle manager 
> > architecture as the first step. 
> > Further feedback and suggestions are welcome; afterwards I would create 
> > specific subtasks to move it forward.
> > 
> > [1] https://issues.apache.org/jira/browse/FLINK-10653
> > 
> > [2] 
> > https://docs.google.com/document/d/1ssTu8QE8RnF31zal4JHM1VaVENow-PweUtXSRr68nGg/edit?usp=sharing
> > ------------------------------------------------------------------
> > From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > Send Time:Nov 1, 2018 (Thursday) 17:19
> > To:dev <dev@flink.apache.org>; Jin Sun <isun...@gmail.com>
> > Cc:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski 
> > <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Thanks for the quick response, Till!
> > 
> > Thanks Jin Sun for the good feedback, we will follow up on the comments! 
> > :)
> > ------------------------------------------------------------------
> > From:Jin Sun <isun...@gmail.com>
> > Send Time:Nov 1, 2018 (Thursday) 06:42
> > To:dev <dev@flink.apache.org>
> > Cc:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>; Nico Kruber 
> > <n...@data-artisans.com>; Piotr Nowojski <pi...@data-artisans.com>; Stephan 
> > Ewen <se...@apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Thanks Zhijiang for the proposal. I like the idea of an external shuffle 
> > service and have left some comments on the document. 
> > 
> > On Oct 31, 2018, at 2:26 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> > 
> > Thanks for the update Zhijiang! The community is currently quite busy with
> > the next Flink release. I hope that we can finish the release in two weeks.
> > After that people will become more responsive again.
> > 
> > Cheers,
> > Till
> > 
> > On Wed, Oct 31, 2018 at 7:49 AM zhijiang <wangzhijiang...@aliyun.com> wrote:
> > 
> > I have created the umbrella JIRA [1] for this improvement and attached
> > the design doc [2] to it.
> > 
> > Further discussion of the details is welcome.
> > 
> > [1] https://issues.apache.org/jira/browse/FLINK-10653
> > [2]
> > https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing
> > 
> > Best,
> > Zhijiang
> > 
> > ------------------------------------------------------------------
> > From: Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com.INVALID>
> > Send Time: Tuesday, September 11, 2018, 15:21
> > To: dev <dev@flink.apache.org>
> > Cc: dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Many thanks Till!
> > 
> > 
> > I will create a JIRA for this feature and attach a design document to 
> > it.
> > I will let you know once it is ready! :)
> > 
> > Best,
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > From: Till Rohrmann <trohrm...@apache.org>
> > Send Time: Friday, September 7, 2018, 22:01
> > To: Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
> > Cc: dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> > 
> > The rough plan sounds good, Zhijiang. I think we should continue with what
> > you've proposed: open a JIRA issue and create a design document which
> > outlines the required changes in a bit more detail. Once this is
> > done, we should link the design document in the JIRA issue and post it here
> > for further discussion.
> > 
> > Cheers,
> > Till
> > 
> > On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <
> > wangzhijiang...@aliyun.com> wrote:
> > 
> > Glad to receive your positive feedback, Till!
> > 
> > Indeed, our motivation is to support batch jobs well, as you mentioned.
> > 
> > At the output level, Flink already has the Subpartition abstraction
> > (writer), and currently there are the PipelinedSubpartition (in-memory
> > output) and SpillableSubpartition (one-subpartition-one-file output)
> > implementations. We can extend this abstraction to realize other
> > persistent outputs (e.g. sort-merge-file).
> > 
> > At the transport level (shuffle service), the current SubpartitionView
> > abstraction (reader) serves as the bridge to the output level, so the view
> > can understand and read the different output formats. The current
> > NetworkEnvironment seems to play the role of the internal shuffle service in
> > the TaskManager, and the transport server is implemented with netty inside.
> > This component could also be started in other external containers, such as
> > the YARN NodeManager, to play the role of an external shuffle service.
> > Further, we can abstract the shuffle service so that it can be extended to
> > transport outputs via http or rdma instead of the current netty. This
> > abstraction should provide a way to register outputs so that the results
> > can be read correctly, similar to the current SubpartitionView.
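
One way to picture the transport-level abstraction described above is the following Java sketch. All names here (OutputReader, ShuffleTransport, LoopbackTransport) are hypothetical and chosen only for illustration; the String partition id and the in-process loopback are assumptions, not part of the proposal.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reader over one produced output, similar in spirit to SubpartitionView.
interface OutputReader {
    byte[] read(int subpartitionIndex);
}

// Hypothetical transport-level abstraction: outputs are registered first and are
// then served to consumers independently of the wire protocol (netty, http, rdma).
abstract class ShuffleTransport {
    private final Map<String, OutputReader> outputs = new ConcurrentHashMap<>();

    // Output registration, so the transport knows how to read each result.
    void registerOutput(String partitionId, OutputReader reader) {
        outputs.put(partitionId, reader);
    }

    byte[] serve(String partitionId, int subpartitionIndex) {
        OutputReader reader = outputs.get(partitionId);
        if (reader == null) {
            throw new IllegalArgumentException("Unknown partition: " + partitionId);
        }
        return send(reader.read(subpartitionIndex));
    }

    // Protocol-specific delivery; a real implementation would write to a channel.
    protected abstract byte[] send(byte[] payload);
}

// Stand-in transport that simply hands the bytes back in-process.
class LoopbackTransport extends ShuffleTransport {
    @Override
    protected byte[] send(byte[] payload) {
        return payload;
    }
}
```

The reader hides the output format from the transport, so a new persistent format would only need a new OutputReader and a new protocol would only need a new send implementation.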
> > 
> > The above is still a rough idea. Next I plan to create a feature JIRA to
> > cover the related changes if possible. It would be great to get help
> > from the related committers in reviewing the detailed design together.
> > 
> > Best,
> > Zhijiang
> > 
> > ------------------------------------------------------------------
> > From: Till Rohrmann <trohrm...@apache.org>
> > Send Time: Wednesday, August 29, 2018, 17:36
> > To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > wangzhijiang...@aliyun.com>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> > 
> > Thanks for starting this design discussion Zhijiang!
> > 
> > I really like the idea of introducing a ShuffleService abstraction which
> > allows different implementations depending on the actual use case.
> > Especially for batch jobs I can clearly see the benefits of persisting the
> > results somewhere else.
> > 
> > Do you already know which interfaces we need to extend and where to
> > introduce new abstractions?
> > 
> > Cheers,
> > Till
> > 
> > On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999)
> > <wangzhijiang...@aliyun.com.invalid> wrote:
> > Hi all!
> > 
> > 
> > The shuffle service is responsible for transporting data produced upstream
> > to the downstream side. In Flink, the NettyServer provides the network
> > transport service, and this component is started in the TaskManager process.
> > That means the TaskManager only supports an internal shuffle service, which
> > raises some concerns:
> > 1. After a task finishes, its ResultPartition remains registered in the
> > TaskManager, because the output buffers have to be transported by the
> > internal shuffle service in the TaskManager. That means the TaskManager
> > cannot be released by the ResourceManager until the ResultPartition is
> > released. This may waste container resources and does not work well in
> > dynamic resource scenarios.
> > 2. If we want to add another shuffle service implementation, the current
> > mechanism does not make it easy, because the output level (result
> > partition) and the transport level (shuffle service) are not clearly
> > separated and there is no abstraction to extend.
> > 
> > Given these considerations, we propose an external shuffle service which
> > can be deployed in other external containers, e.g. the NodeManager
> > container in YARN. Then the TaskManager can be released as soon as needed
> > once all of its internal tasks have finished. The persistent output files
> > of these finished tasks can then be served by the external shuffle service
> > on the same machine.
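
The difference in release behavior can be sketched in Java as follows. TaskManagerLifecycle and its methods are hypothetical bookkeeping invented for this illustration; they are not Flink classes, and the real release decision involves the ResourceManager.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical bookkeeping for one TaskManager; not Flink's actual classes.
class TaskManagerLifecycle {
    private final boolean externalShuffle;
    private final Set<String> runningTasks = new HashSet<>();
    // Partitions that this TaskManager itself must keep serving to consumers.
    private final Set<String> locallyServedPartitions = new HashSet<>();

    TaskManagerLifecycle(boolean externalShuffle) {
        this.externalShuffle = externalShuffle;
    }

    void taskStarted(String taskId) {
        runningTasks.add(taskId);
    }

    void taskFinished(String taskId, String partitionId) {
        runningTasks.remove(taskId);
        if (!externalShuffle) {
            // Internal shuffle: the finished task's output stays pinned here.
            locallyServedPartitions.add(partitionId);
        }
        // External shuffle: the persisted output is served by another process.
    }

    void partitionReleased(String partitionId) {
        locallyServedPartitions.remove(partitionId);
    }

    // The container can go away only when nothing runs and nothing is served here.
    boolean canBeReleased() {
        return runningTasks.isEmpty() && locallyServedPartitions.isEmpty();
    }
}
```

With the internal variant, canBeReleased stays false after the last task finishes until every partition is consumed and released; with the external variant it becomes true as soon as the tasks are done.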
> > 
> > Further, we can abstract both the output level and the transport level to
> > support different implementations. For example, we implemented merging the
> > data of all subpartitions into a limited number of persistent local files,
> > instead of one-subpartition-one-file, as a disk improvement in some
> > scenarios.
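
As a rough illustration of merging subpartitions into one file, the Java sketch below writes all subpartitions back to back and keeps an in-memory index of offsets and lengths. MergedPartitionFile is a hypothetical name, and the single-file layout with an in-memory index is an assumption; the thread does not describe the actual file format.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical merged output: one file holds every subpartition, and an
// in-memory index records where each one starts and how long it is.
class MergedPartitionFile {
    private final Path file;
    private final long[] offsets;
    private final int[] lengths;

    private MergedPartitionFile(Path file, long[] offsets, int[] lengths) {
        this.file = file;
        this.offsets = offsets;
        this.lengths = lengths;
    }

    // Writes all subpartitions back to back into a single temporary file.
    static MergedPartitionFile write(byte[][] subpartitions) {
        try {
            Path file = Files.createTempFile("merged-partition", ".data");
            file.toFile().deleteOnExit();
            long[] offsets = new long[subpartitions.length];
            int[] lengths = new int[subpartitions.length];
            try (RandomAccessFile out = new RandomAccessFile(file.toFile(), "rw")) {
                for (int i = 0; i < subpartitions.length; i++) {
                    offsets[i] = out.getFilePointer();
                    lengths[i] = subpartitions[i].length;
                    out.write(subpartitions[i]);
                }
            }
            return new MergedPartitionFile(file, offsets, lengths);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads one subpartition back by seeking to its recorded offset.
    byte[] read(int subpartitionIndex) {
        byte[] data = new byte[lengths[subpartitionIndex]];
        try (RandomAccessFile in = new RandomAccessFile(file.toFile(), "r")) {
            in.seek(offsets[subpartitionIndex]);
            in.readFully(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return data;
    }
}
```

Compared with one-subpartition-one-file, this keeps the number of open files constant per partition at the cost of an index lookup and a seek on every read.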
> > 
> > I know this may be a lot of work; I am just pointing out some ideas and
> > would appreciate any feedback from you!
> > 
> > Best,
> > Zhijiang
> > 