Glad to receive your positive feedbacks Till! 

Actually our motivation is to support batch job well as you mentioned.

For output level, flink already has the Subpartition abstraction(writer), and 
currently there are PipelinedSubpartition(memory output) and 
SpillableSubpartition(one-sp-one-file output) implementations. We can extend 
this abstraction to realize other persistent outputs (e.g. sort-merge-file).

For transport level(shuffle service), the current SubpartitionView 
abstraction(reader) seems as the brige linked with the output level, then the 
view can understand and read the different output formats. The current 
NetworkEnvironment seems take the role of internal shuffle service in 
TaskManager and the transport server is realized by netty inside. This 
component can also be started in other external containers like NodeManager of 
yarn to take the role of external shuffle service. Further we can abstract to 
extend the shuffle service for transporting outputs by http or rdma instead of 
current netty.  This abstraction should provide the way for output registration 
in order to read the results correctly, similar with current SubpartitionView.

The above is still a rough idea. Next I plan to create a feature jira to cover 
the related changes if possible. It would be better if getting help from 
related committers to review the detail designs together.

Best,
Zhijiang


------------------------------------------------------------------
发件人:Till Rohrmann <trohrm...@apache.org>
发送时间:2018年8月29日(星期三) 17:36
收件人:dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) 
<wangzhijiang...@aliyun.com>
主 题:Re: [DISCUSS] Proposal of external shuffle service

Thanks for starting this design discussion Zhijiang!

I really like the idea to introduce a ShuffleService abstraction which allows 
to have different implementations depending on the actual use case. Especially 
for batch jobs I can clearly see the benefits of persisting the results 
somewhere else.

Do you already know which interfaces we need to extend and where to introduce 
new abstractions?

Cheers,
Till
On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999) 
<wangzhijiang...@aliyun.com.invalid> wrote:
Hi all!

 The shuffle service is responsible for transporting upstream produced data to 
the downstream side. In flink, the NettyServer is used for network transport 
service and this component is started in the TaskManager process. That means 
the TaskManager can support internal shuffle service which exists some concerns:
 1. If a task finishes, the ResultPartition of this task still retains 
registered in TaskManager, because the output buffers have to be transported by 
internal shuffle service in TaskManager. That means the TaskManager can not be 
released by ResourceManager until ResultPartition released. It may waste 
container resources and can not support well for dynamic resource scenarios.
 2. If we want to expand another shuffle service implementation, the current 
mechanism is not easy to handle, because the output level (result partition) 
and transport level (shuffle service) are not divided clearly and loss of 
abstraction to be extended.

 For above considerations, we propose the external shuffle service which can be 
deployed on any other external contaienrs, e.g. NodeManager container in yarn. 
Then the TaskManager can be released ASAP ifneeded when all the internal tasks 
finished. The persistent output files of these finished tasks can be served to 
transport by external shuffle service in the same machine.

 Further we can abstract both of the output level and transport level to 
support different implementations. e.g. We realized merging the data of all the 
subpartitions into limited persistent local files for disk improvements in some 
scenarios instead of one-subpartiton-one-file.

 I know it may be a big work for doing this, and I just point out some ideas, 
and wish getting any feedbacks from you!

 Best,
 Zhijiang

Reply via email to