Very clear. Thanks!
> On Jan 28, 2019, at 10:29 PM, zhijiang <wangzhijiang...@aliyun.com> wrote:
>
> Hi Qi,
>
> Thanks for your interest in this proposal. In Blink we implemented the
> YarnShuffleService, which is mainly used for batch jobs in production and
> was used for some benchmarks before. This YarnShuffleService is not built
> on the currently proposed ShuffleManager interface, and there is also no
> ShuffleMaster component on the JM side. You can regard it as a simple,
> special-purpose implementation; it can further be refactored to fit the
> proposed shuffle manager architecture.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From:qi luo <luoqi...@gmail.com>
> Send Time: Jan 28, 2019 (Mon) 20:55
> To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> Cc:Till Rohrmann <trohrm...@apache.org>; Andrey Zagrebin
> <and...@da-platform.com>
> Subject:Re: [DISCUSS] Proposal of external shuffle service
>
> Hi Zhijiang,
>
> I see there’s a YarnShuffleService in the newly released Blink branch. Is
> there any relationship between that YarnShuffleService and your external
> shuffle service?
>
> Regards,
> Qi
>
> > On Jan 28, 2019, at 8:07 PM, zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > wrote:
> >
> > Hi Till,
> >
> > Very glad to receive your feedback; it is actually very helpful.
> >
> > The proposed ShuffleMaster in the JM would be involved in many existing
> > processes, such as task deployment, task failover and TM release, so it
> > might interact with the corresponding Scheduler, FailoverStrategy and
> > SlotPool components. In the first version we try to focus on the
> > deployment process, which is described in detail in the FLIP. Concerning
> > the other improvements based on the proposed architecture, we only
> > mentioned the basic ideas and have not given the whole detailed process.
> > But I think it is reasonable and natural to solve these issues on that
> > basis, and we will provide more details for the future steps.
> >
> > I totally agree with your thoughts on handling TM release. Currently,
> > once a task is finished, the corresponding slot is regarded as free, no
> > matter whether the produced partition is consumed or not. Actually we
> > could consider that both the task and its partitions occupy resources in
> > the slot, so the slot can only be regarded as free once the partitions in
> > it are consumed and released. The TM release logic is thereby improved as
> > well. I think your suggestions below already give the detailed, specific
> > process for this improvement.
> >
> > I am in favor of posting this discussion again in a separate thread.
> > Thanks for the advice!
> >
> > Best,
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From:Till Rohrmann <trohrm...@apache.org>
> > Send Time: Jan 28, 2019 (Mon) 19:14
> > To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> > Cc:Andrey Zagrebin <and...@da-platform.com>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> >
> > Thanks for creating the FLIP-31 for the external shuffle service Zhijiang.
> > It looks good to me.
> >
> > One thing which is not fully clear to me yet is how the lifecycle
> > management of the partitions integrates with the slot management. At the
> > moment, conceptually we consider the partition data to be owned by the TM,
> > if I understood it correctly. This means the ShuffleMaster is asked whether
> > a TM can be freed. However, the JobMaster only thinks in terms of slots and
> > not TMs. Thus, the logic would be that the JM asks the ShuffleMaster
> > whether it can return a certain slot. Atm the freeing of slots is done by
> > the `SlotPool`, and thus this would couple the `SlotPool` and the
> > `ShuffleMaster`. Maybe we need to introduce some mechanism to signal when a
> > slot still has some occupied resources. In the shared slot case, one could
> > think of allocating a dummy slot in the shared slot which we only release
> > after the partition data has been consumed.
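> >
> > To sketch that signalling mechanism in Java (SlotOccupancyCheck and its
> > method are hypothetical names; AllocationID is the existing Flink type):
> >
> > interface SlotOccupancyCheck {
> >     // true while partition data produced in this slot has not been
> >     // fully consumed; the SlotPool would skip freeing such slots
> >     boolean hasOccupiedResources(AllocationID allocationId);
> > }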
> >
> > In order to give this design document a little bit more visibility, I would
> > suggest posting it again on the dev mailing list in a separate thread under
> > the title "[DISCUSS] Flip-31: Pluggable Shuffle Manager" or something like
> > this.
> >
> > Cheers,
> > Till
> > On Mon, Jan 21, 2019 at 7:05 AM zhijiang
> > <wangzhijiang...@aliyun.com.invalid> wrote:
> > Hi all,
> >
> > FYI, I created the FLIP-31 under [1] for this proposal and created some
> > subtasks under the umbrella jira [2].
> > Feel free to raise any concerns in the previous google doc or the
> > specific jiras.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager
> > [2] https://issues.apache.org/jira/browse/FLINK-10653
> >
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > Send Time: Jan 15, 2019 (Tue) 17:55
> > To:Andrey Zagrebin <and...@da-platform.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> >
> > Hi all,
> >
> > After continuous offline discussion with Andrey, we have reached an
> > agreement on this proposal and co-authored the latest google doc under [1].
> >
> > We plan to create the FLIP and sub-tasks by the end of this week, and we
> > hope the first MVP can be covered in Flink 1.8.
> >
> > Any feedback and suggestions are welcome! :)
> >
> > [1]
> > https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing
> >
> > Best,
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > Send Time: Dec 25, 2018 (Tue) 15:33
> > To:Andrey Zagrebin <and...@da-platform.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> >
> > Hi Andrey,
> >
> > Thanks for the quick response on the UnknownShuffleDeploymentDescriptor
> > issue.
> >
> > It is reasonable to consider this special case on both the ShuffleMaster
> > and ShuffleService sides.
> > On the upstream ShuffleService side, the created ResultPartitionWriter
> > decides whether to notify the ShuffleMaster that the partition is
> > consumable when it outputs the first buffer or finishes.
> > On the ShuffleMaster side, we might define a method in the ShuffleMaster
> > interface for handling this notification message from the upstream side,
> > and then internally decide whether to update the partition info for the
> > downstream sides or not.
> > On the downstream ShuffleService side, we might define a method in the
> > ShuffleService interface to handle the update-partition-info message from
> > the ShuffleMaster; it can then find the corresponding created InputGate to
> > update. A sketch of these two methods follows below.
> > The communication between ShuffleService and ShuffleMaster can make use of
> > the TMGateway & JMGateway in the current implementation. Certainly it can
> > also rely on other channels for different ShuffleManager implementations.
> > I will update the google doc to make this process clear if you agree. :)
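> >
> > To make that flow concrete, a minimal Java sketch of the two methods (the
> > method names are hypothetical, not a final API; ResultPartitionID,
> > ExecutionAttemptID and PartitionInfo are existing Flink types):
> >
> > interface ShuffleMaster {
> >     // called (e.g. via the JMGateway) when the upstream writer outputs
> >     // its first buffer or finishes; the master then decides whether to
> >     // push the partition info to the downstream sides
> >     void notifyPartitionConsumable(ResultPartitionID partitionId);
> > }
> >
> > interface ShuffleService {
> >     // called (e.g. via the TMGateway) when the master pushes an update;
> >     // the service locates the corresponding InputGate and updates it
> >     void updatePartitionInfo(ExecutionAttemptID consumer, PartitionInfo info);
> > }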
> >
> > Best,
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From:Andrey Zagrebin <and...@da-platform.com>
> > Send Time: Dec 25, 2018 (Tue) 02:32
> > To:zhijiang <wangzhijiang...@aliyun.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> >
> > Hi Zhijiang,
> >
> > Thanks for considering my thoughts and concerns. Those are just suggestions
> > for your design document.
> >
> > My initial understanding about 2.1 was that the shuffle service is also
> > treated as unknown in the case of an UnknownShuffleDeploymentDescriptor,
> > which is not quite true.
> > Thinking about it more, it might actually be up to the shuffle service to
> > decide how to react to the events of producer or consumer deployment.
> > Maybe the ShuffleMaster could have two register/deregister methods for
> > input and output (now partition) and/or also a task state update method to
> > communicate the status of the ShuffleService running in the TM.
> > Internally the shuffle implementation could decide how to communicate
> > between the ShuffleMaster and the ShuffleService. If the shuffle is
> > channel-based, it can behave in a similar way as now.
> > I agree it probably needs more discussion, and the refactoring could be
> > planned step by step if it is too involved a change.
> >
> > Best,
> > Andrey
> >
> > On Mon, Dec 24, 2018 at 11:31 AM zhijiang <wangzhijiang...@aliyun.com>
> > wrote:
> > Hi Andrey,
> >
> > Thanks for the further research on this topic and for providing very
> > helpful summaries.
> >
> > As we discussed before, I really like the idea of dividing the
> > functionality into two separate components on the JM and TM sides.
> >
> > 1. On the JM side, the ShuffleMaster component created from the
> > ShuffleManager can manage and handle partition-related issues properly.
> >
> > 1.1 The introduction of PartitionShuffleDescriptor and
> > ShuffleDeploymentDescriptor is suitable for covering all the necessary
> > information related to the partition during the deployment process, as
> > well as other future extensions. The form of these new descriptors is also
> > consistent with the existing ResultPartitionDeploymentDescriptor and
> > InputGateDeploymentDescriptor.
> >
> > 2. On the TM side, the ShuffleService component created from the
> > ShuffleManager is a TM-level service, which can be used for creating the
> > ResultPartitionWriter and InputGate during task deployment.
> >
> >
> > 2.1 Concerning the update of the UnknownShuffleDeploymentDescriptor, I
> > think it may raise the question of whether the ShuffleService should
> > provide a separate method for updating it or not. In other words, because
> > the InputGate is created by the ShuffleService, should all possible
> > operations on the InputGate, such as update or release, be handled via the
> > ShuffleService? I think operating on the InputGate directly is justified
> > if the update or release is general for all ShuffleService
> > implementations. But the InputGate interface should then provide explicit
> > methods for releasing itself and updating input channels to make the whole
> > process work, as sketched below.
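> >
> > A minimal sketch of those explicit methods (hypothetical signatures; the
> > real InputGate interface may end up different):
> >
> > interface InputGate extends AutoCloseable {
> >     // update a channel that was deployed as "unknown" once the
> >     // producer's location becomes known
> >     void updateInputChannel(InputChannelDeploymentDescriptor descriptor);
> >
> >     // close() from AutoCloseable releases the gate and its channels
> > }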
> >
> > 2.2 In addition, some implementation details can be further confirmed in
> > separate JIRAs, such as whether we need task-info-related parameters when
> > creating the writer, and how to extract the necessary components from the
> > current NetworkEnvironment to wrap them in a specific ShuffleService
> > implementation, etc.
> >
> > 3. For the points mentioned in the future extensions, I agree with your
> > analysis. We can focus on them separately, step by step, with different
> > priorities. The above ShuffleMaster provides a basic precondition for
> > decoupling the life cycles of partition state and task state. Then we can
> > further extend the methods in ShuffleMaster to know whether a partition is
> > still available, for speeding up failover, and whether a partition has
> > been consumed by the downstream side, to decide when to release the TM or
> > clean up the partition, etc. It is also a good idea to further refactor
> > the interfaces on the writer and reader sides to handle raw records in a
> > fine-grained way instead of Buffer, though this would involve more changes
> > in the current RecordWriter/StreamInputProcessor.
> >
> > I think we can further confirm the above 2.1 issue; then I will adjust the
> > google doc based on our conclusions, covering not only the first step but
> > also all the future extensions, described and listed by priority.
> > BTW, do you think it is necessary that we further co-author a FLIP for this
> > feature? It actually involves many changes on both the TM and JM sides. :)
> >
> > Best,
> > Zhijiang
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Andrey Zagrebin <and...@data-artisans.com>
> > Send Time: Dec 20, 2018 (Thu) 01:20
> > To:zhijiang <wangzhijiang...@aliyun.com>
> > Cc:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] Proposal of external shuffle service
> >
> > Hi Zhijiang,
> >
> > Thanks for the detailed answers! I am glad we are on the same page.
> >
> > I spent some time thinking more about our concerns and decided to make
> > more suggestions for the discussion.
> > At the end, I also gathered some points related to possible extensions of
> > the shuffle API, to verify that the final pluggable design can support
> > them later with fewer changes.
> >
> >
> > It might make sense for a shuffle implementation to have a component
> > running on both the JM and TM sides.
> > The JM has a global view of what is happening and can interact with the
> > shuffling system independently of whether tasks are running or not. The
> > component services could internally communicate with each other outside of
> > the existing JM/TM APIs, depending on the shuffle implementation.
> > This could help later with global partition life cycle management and
> > cleanup.
> > Moreover, if we decide to use some ShuffleDeploymentDescriptor instead of
> > ResultPartitionLocation or factories to instantiate Readers and Writers,
> > they can be created in the Task Executor.
> > The JM is probably not interested in this concern. The
> > ShuffleDeploymentDescriptor can be specific to the shuffle implementation,
> > like the factories, and contain the shuffle-specific config for the task
> > side.
> > 1. Configuration:
> > interface ShuffleManager {
> >     ShuffleMaster createMaster(Configuration flinkConfig);
> >     ShuffleService createService(Configuration flinkConfig);
> > }
> > ShuffleManager is a factory for ShuffleMaster (JM side) and ShuffleService
> > (TM side).
> > The Flink config could also contain shuffle-specific configuration, like
> > port etc.
> > The class which implements ShuffleManager is set in the Flink cluster
> > config; the default is what we have now (can be the first step).
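> > A minimal sketch of how the JM/TM processes could load it (the config key
> > and the NettyShuffleManager default are hypothetical):
> > Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
> > String className = flinkConfig.getString(
> >         "taskmanager.shuffle-manager.class",   // hypothetical key
> >         NettyShuffleManager.class.getName());  // hypothetical default
> > ShuffleManager manager =
> >         (ShuffleManager) Class.forName(className).newInstance();
> > ShuffleMaster master = manager.createMaster(flinkConfig);    // on the JM
> > ShuffleService service = manager.createService(flinkConfig); // on the TM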
> > 2. Job master side
> > class PartitionShuffleDescriptor {
> >     // JobID, ExecutionAttemptID, ResultPartitionType,
> >     // ResultPartitionLocation, TaskManagerLocation, etc.
> >     // later possibly a ShuffleType/Descriptor to choose from the
> >     // available shuffle implementations
> > }
> > PartitionShuffleDescriptor contains all the abstract information which the
> > JM can provide from the job/execution graph.
> > ResultPartitionType and ResultPartitionLocation are derived from the graph
> > and the execution mode, so I think they are rather general parameters for
> > any shuffle service and do not belong to a particular shuffle
> > implementation.
> > interface ShuffleMaster extends AutoCloseable {
> >     ShuffleDeploymentDescriptor registerPartition(PartitionShuffleDescriptor psd);
> >     void deregisterPartition(PartitionShuffleDescriptor psd);
> > }
> > The JM process creates the ShuffleMaster from the per-cluster configured
> > ShuffleManager and is responsible for its life cycle.
> > The ShuffleMaster is a global manager for partitions.
> > The JM creates a PartitionShuffleDescriptor and uses the ShuffleMaster to
> > register the partition, e.g. when the producer is deployed.
> > The ShuffleMaster transforms the abstract PartitionShuffleDescriptor into
> > an implementation-specific ShuffleDeploymentDescriptor.
> > The ShuffleDeploymentDescriptor is put into the
> > ResultPartitionDeploymentDescriptor and the InputGateDeploymentDescriptor.
> > It can contain partition-specific config for the ShuffleService on the TM
> > side to serve record readers and writers.
> > If it is channel-based, this further breaks down into channel configs.
> > A special UnknownShuffleDeploymentDescriptor could be used for eager
> > deployment when the task input is not yet known.
> > Later, we could add an option to release a partition globally by
> > deregistering it with the ShuffleMaster, e.g. to clean it up.
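> > A small sketch of the registration flow above, assuming the interfaces
> > from this mail (the PartitionShuffleDescriptor constructor shape is
> > assumed; JobID, ExecutionAttemptID etc. are existing Flink types):
> > ShuffleDeploymentDescriptor register(
> >         ShuffleMaster shuffleMaster,
> >         JobID jobId,
> >         ExecutionAttemptID attemptId,
> >         ResultPartitionLocation location) {
> >     PartitionShuffleDescriptor psd = new PartitionShuffleDescriptor(
> >             jobId, attemptId, ResultPartitionType.BLOCKING, location);
> >     // the returned SDD is then embedded into the
> >     // ResultPartitionDeploymentDescriptor / InputGateDeploymentDescriptor
> >     return shuffleMaster.registerPartition(psd);
> > }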
> > 3. Task executor side
> > interface ShuffleService extends AutoCloseable {
> >     ResultPartitionWriter createResultPartitionWriter(
> >             ResultPartitionDeploymentDescriptor descriptor);
> >     InputGate createInputGate(InputGateDeploymentDescriptor descriptor);
> > }
> > The TM process creates the ShuffleService from the per-cluster configured
> > ShuffleManager and is responsible for its life cycle.
> > The ShuffleService could substitute the NetworkEnvironment in
> > TaskManagerServices.
> > 4. Later extensions
> > 4.1 Per job/job edge config
> > To keep jobs cluster-independent, we could introduce abstract predefined
> > ShuffleType’s or descriptors for the job developer to set per job or per
> > job edge. The types are cluster-independent.
> > The cluster config could map each supported ShuffleType to a provided
> > ShuffleManager implementation class, or fall back to a default for some
> > types.
> > Instead of one ShuffleMaster/ShuffleService, the JM/TM could keep a
> > registry of ShuffleMaster/ShuffleService’s per ShuffleType.
> > 4.2 Delay TM shutdown until all local partitions have been consumed
> > The JM could keep the state of the partition life cycle separately (e.g.
> > in the job state, HA). The task executor is to shut down (e.g. after a
> > timeout in yarn) if all its tasks are done and all local partitions are
> > consumed. If there are no local partitions then it can shut down
> > immediately. Whether the JM should check that all partitions produced by a
> > TM are consumed is a feature of the ShuffleManager. This could be done by
> > calling some ShuffleManager.getFeatures() interface method, as sketched
> > below.
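> > A tiny sketch of what getFeatures() could look like (purely illustrative;
> > the enum and its constants are hypothetical):
> > enum ShuffleFeature {
> >     PARTITIONS_SURVIVE_TASK_EXECUTOR_SHUTDOWN, // enables 4.2
> >     PARTITIONS_SURVIVE_TASK_FAILURE            // enables 4.3
> > }
> > // an additional method on the ShuffleManager interface:
> > java.util.Set<ShuffleFeature> getFeatures();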
> > 4.3 Speed up failover
> > If a partition is already computed, the JM could reuse it, as mentioned in
> > the fine-grained shuffle system design. Whether the partition is still
> > available after a task or task executor crash is also a feature exposed
> > via ShuffleManager.getFeatures().
> > 4.4 Partition garbage collection
> > When the consumer task is done, the partition is deregistered and cleaned
> > up with the ShuffleMaster.
> > In the case of external storage, partitions risk lingering after
> > job/cluster failures. A partition TTL is one option, as mentioned in the
> > fine-grained shuffle system design. The TTL timer could be started when
> > there is no partition access activity for a certain period of time, but
> > there is always a risk of losing a partition too early, since the user
> > could try to recover a failed job any time later. So a more sophisticated
> > approach might be needed, like manually triggered cleanup
> > (ShuffleMaster.cleanup(PartitionsInUse)) which drops all currently unused
> > partitions.
> > 4.5 Shuffle Readers/Writers operating per record/byte[]/buffer
> > As discussed, ResultPartitionWriter/InputGate operate on buffers with
> > serialised record data. Certain shuffle services might benefit from
> > operating per serialised record or even per java object (e.g. a local
> > channel could hand them over directly, or hand over copies from
> > TypeSerializer.copy()). The record key could be treated as its meta info,
> > in addition to the bytes or the user java object.
> > The ShuffleService could be refactored later to return
> > RecordReader/RecordWriter. They could extend
> > AbstractSerialisingRecordReader/Writer or
> > AbstractBufferingRecordReader/Writer to inherit the current behaviour and
> > share code. This requires refactoring StreamInputProcessor and
> > RecordWriter to extract the interfaces (a rough sketch follows below).
> > It might also be useful for ResultPartitionWriter/InputGate or
> > RecordReader/RecordWriter to extend AutoCloseable in case the internal
> > implementation needs a per-task life cycle for them.
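> >
> > A rough sketch of such record-level interfaces (hypothetical names and
> > signatures, only to illustrate the direction; java.io.IOException assumed):
> >
> > interface RecordWriter<T> extends AutoCloseable {
> >     void emit(T record) throws IOException;
> >     void flush() throws IOException;
> > }
> >
> > interface RecordReader<T> extends AutoCloseable {
> >     // returns null when the input is exhausted
> >     T next() throws IOException;
> > }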
> >
> > I hope it can help with the design. Feel free to give feedback.
> >
> > Best,
> > Andrey
> >
> > On 10 Dec 2018, at 08:41, zhijiang <wangzhijiang...@aliyun.com> wrote:
> > Hi Andrey,
> >
> > Thanks for providing such detailed concerns and insights for this
> > proposal. We exchanged our views on three main issues in the google doc
> > last week, and it seems more appropriate to continue the discussion
> > here. :)
> >
> > 1. Configuration level for shuffle (cluster/job/operator)
> > - how do we share shuffle manager resources among different job tasks
> > within one task executor process? It could be some static objects shared by
> > all shuffle manager objects of some type, but that might not be a scalable
> > approach. An example could be multiplexed netty connections (as I
> > understand, the current netty stack can become just a custom shuffle
> > service).
> > The creation of a ShuffleManager instance at the task level is just like
> > the process of creating a StateBackend in StateBackendLoader. The
> > ShuffleService and ShuffleManager are two independent components, and the
> > interaction between them is only a registration mechanism. In detail, if a
> > ShuffleManager instance wants to rely on the ShuffleService to transport
> > data, it can register the related info with the ShuffleService during
> > creation of the ResultPartitionWriter. So the ShuffleManager instance does
> > not need to contain any objects like netty-related stacks. The Flink
> > runtime can provide one unified netty-based ShuffleService which can be
> > started both in the internal TaskManager and in external containers. The
> > internal ShuffleService not only takes the role of transporting data
> > directly for some ShuffleManager instances, but also takes the role of an
> > RPC server for communicating with the external ShuffleService, e.g.
> > registering a result partition with the external service; otherwise the
> > external service would need an additional RPC service to contact the
> > TaskManager. The implicit consequence is that the internal shuffle becomes
> > a basic service started in the TaskManager, like the IOManager and
> > MemoryManager components, even though it is unused for some types of jobs.
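> >
> > A small sketch of that registration mechanism (the interface name and
> > method are hypothetical; only the shape of the interaction matters):
> >
> > interface InternalShuffleService {
> >     // called by a ShuffleManager instance while creating the
> >     // ResultPartitionWriter, so that the netty transport can later
> >     // serve the partition's buffers to remote or external consumers
> >     void registerResultPartition(ResultPartitionID partitionId,
> >                                  ResultPartitionWriter writer);
> > }
> >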
> > - In case of having it per job, we might need to provide a compatibility
> > check between the shuffle service and the cluster mode (e.g. a yarn ext
> > shuffle service for a standalone mode cluster) if it is an issue.
> > - Having it per job feels like the same complexity as having it per
> > operator, at first glance; it just changes its granularity and where
> > objects reside.
> > - What is the problem with using the per-job cluster mode? Then shuffle
> > manager per cluster and per job is the same, which might simplify other
> > issues at the beginning. Streaming and batch jobs with different shuffle
> > requirements could be started in different per-job clusters.
> >
> > I totally agree with the above concerns about per-job configuration. As
> > you mentioned, it is an option to run different types of jobs in different
> > clusters. But in some special scenarios, like a hybrid cluster running
> > online and offline jobs at different times, it is better to support
> > job-level configuration for flexibility. Certainly it may not be a strong
> > requirement for most cases, so we can agree on making the cluster level
> > the easiest first step and adjust the level if needed in the future.
> >
> > 2. ShuffleManager interface
> >
> > I think you mentioned three sub-issues in this part:
> >
> > 2.1 Introduction of additional ResultPartitionWriterFactory &&
> > InputGateReaderFactory
> >
> > I am not against the introduction of these two factories. The original
> > motivation for the pluggable ShuffleManager interface is to create the
> > different writer and reader sides. If the ShuffleManager interface is used
> > for creating factories, and the factories are then used for creating the
> > writer and reader, I still think the essence is the same and only the form
> > is different. That is, the ShuffleManager concept is seen on the
> > JobManager side, and the task only sees the corresponding factories from
> > the ShuffleManager. In other words, we add another factory layer to
> > distinguish between the JobManager and the task. Introducing the
> > corresponding factories makes the form look a bit better, so I am willing
> > to take this way for the implementation.
> >
> > 2.2 Whether to retain getResultPartitionLocation method in ShuffleManager
> > interface
> >
> > If I understand correctly, you mean to put this location as an argument in
> > the InputGateReaderFactory constructor? If so, I think it makes sense and
> > we can avoid having this explicit method in the interface (a small sketch
> > follows below). But we also need to adjust the existing related processes,
> > like updatePartitionInfo for the downstream side. In this case, the
> > partition location is unknown when deploying the downstream tasks; based
> > on the upstream's consumable notification, the location update is
> > triggered by the JobManager towards the downstream side.
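> >
> > A small sketch of carrying the location inside the factory (hypothetical
> > class; only the constructor shape is meant for illustration):
> >
> > class InputGateReaderFactory implements Serializable {
> >     // may still be "unknown" at deployment time for eager scheduling
> >     private final ResultPartitionLocation location;
> >
> >     InputGateReaderFactory(ResultPartitionLocation location) {
> >         this.location = location;
> >     }
> >
> >     // an unknown location is filled in later, when the JobManager
> >     // triggers updatePartitionInfo after the producer becomes consumable
> >     ResultPartitionLocation getLocation() {
> >         return location;
> >     }
> > }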
> >
> > 2.3 ShuffleService interface
> >
> > My initial thought was not to make it an interface, because the internal
> > and external shuffle cases can reuse the same unified netty-based shuffle
> > service if we wrap the related components into the current shuffle service
> > well. But if we want to further extend to other implementations of the
> > shuffle service, like an http-based shuffle service, then we can define an
> > interface for it, the way the current RpcService interface avoids being
> > tied to the akka implementation only. So it also makes sense to me to keep
> > this interface.
> > As for the ShuffleServiceRegistry class, I agree with you on having this
> > TaskManager-level service for managing and sharing across all the internal
> > tasks.
> >
> > In summary, I think we have no essential conflicts on the above issues;
> > they mostly concern implementation aspects. I agree with the above points;
> > for 2.2 in particular, please double-check whether I understood you
> > correctly.
> > Looking forward to your further feedback, so that I can adjust the doc
> > based on it. Any other person's feedback is also welcome!
> >
> >
> > Best,
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From: Andrey Zagrebin <and...@data-artisans.com>
> > Send Time: Dec 10, 2018 (Mon) 05:18
> > To: dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> > Cc: Nico Kruber <n...@data-artisans.com>; Piotr Nowojski
> > <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>; Till Rohrmann
> > <trohrm...@apache.org>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> >
> > Hi Zhijiang,
> >
> >
> > Thanks for sharing the document, Zhijiang.
> > I decided to compile my thoughts here, so as not to overload the document
> > comments any more :)
> > I think I still have a question about job-level configuration for the
> > shuffle service. You mentioned that we can keep several shuffle manager
> > objects in one task executor for different jobs. This is valid. My
> > concerns are:
> > - how do we share shuffle manager resources among different job tasks
> > within one task executor process? It could be some static objects shared by
> > all shuffle manager objects of some type, but that might not be a scalable
> > approach. An example could be multiplexed netty connections (as I
> > understand, the current netty stack can become just a custom shuffle
> > service).
> > - In case of having it per job, we might need to provide a compatibility
> > check between the shuffle service and the cluster mode (e.g. a yarn ext
> > shuffle service for a standalone mode cluster) if it is an issue.
> > - Having it per job feels like the same complexity as having it per
> > operator, at first glance; it just changes its granularity and where
> > objects reside.
> > - What is the problem with using the per-job cluster mode? Then shuffle
> > manager per cluster and per job is the same, which might simplify other
> > issues at the beginning. Streaming and batch jobs with different shuffle
> > requirements could be started in different per-job clusters.
> > As for the ShuffleManager interface, I think I see your point with the
> > ResultPartitionLocation. I agree that a partition needs some addressing of
> > the underlying connection or resources in general. It can be thought of as
> > an argument of the ShuffleManager factory methods.
> > My point is that the task code might not need to be coupled to the shuffle
> > interface. This way we could keep the task code more independent of the
> > record transfer layer. We can always change later how the shuffle/network
> > service is organised internally without any consequences for the general
> > task code. If the task code calls just factories provided by the JM, it
> > might not even matter for the task in future whether it is configured per
> > cluster, job or operator. Internally, the factory can hold a location of a
> > concrete type if needed.
> > Code example could be:
> > Job Manager side:
> > interface ShuffleManager {
> >     ResultPartitionWriterFactory createResultPartitionWriterFactory(
> >             /* job/task/topology descriptors */);
> >     // similar for input gate factory
> > }
> > class ShuffleManagerImpl implements ShuffleManager {
> >     // general config, services etc.
> >     ResultPartitionWriterFactory createResultPartitionWriterFactory(
> >             /* job/task/topology descriptors */) {
> >         return new ResultPartitionWriterFactoryImpl(
> >                 /* location, job, oper id, other specific config etc. */);
> >     }
> >     // similar for input gate factory
> > }
> > ...
> > // somewhere in higher-level code, put the ResultPartitionWriterFactory
> > // into the descriptor
> > The task executor side receives the factory inside the descriptor and
> > calls factory.create(ShuffleServiceRegistry). Example of the factory:
> > class ResultPartitionWriterFactoryImpl implements ResultPartitionWriterFactory {
> >     // all fields are lightweight and serialisable, received from the JM:
> >     // location, shuffle service id, other specific config etc.
> >
> >     ResultPartitionWriter create(ShuffleServiceRegistry registry
> >             /* , maybe more generic args */) {
> >         // get or create the task-local specific ShuffleServiceImpl
> >         // by id in the registry
> >         // the ShuffleServiceImpl object can be shared between jobs
> >         // register with the ShuffleServiceImpl by location, id, config etc.
> >     }
> > }
> > interface ShuffleService extends AutoCloseable {
> >     String getId();
> > }
> > The ShuffleServiceImpl manages resources and decides internally whether to
> > do it per task executor, task, job or operator. It can contain the network
> > stack, e.g. netty connections etc. In the case of an external service, it
> > can hold the partition manager, transport client etc. It is not enforced
> > by this contract to have it per job, or even to have it at all. The
> > ShuffleServiceImpl also does not need to depend on all TaskManagerServices;
> > it only creates the relevant ones inside, e.g. network.
> > class ShuffleServiceRegistry {
> >     <T extends ShuffleService> T getShuffleService(String id);
> >     void registerShuffleService(ShuffleService service, String id);
> >     void deregisterShuffleService(String id); // remove and close it
> >     void close(); // close all
> > }
> > The ShuffleServiceRegistry is just a generic container of all available
> > ShuffleService’s. It could be part of TaskManagerServices instead of the
> > NetworkEnvironment, which could go into a specific ShuffleServiceImpl.
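> >
> > A sketch of the get-or-create logic inside factory.create(), assuming the
> > interfaces above (NettyShuffleServiceImpl and its methods are
> > hypothetical):
> >
> > ResultPartitionWriter create(ShuffleServiceRegistry registry) {
> >     NettyShuffleServiceImpl service =
> >             registry.getShuffleService(shuffleServiceId);
> >     if (service == null) {
> >         service = new NettyShuffleServiceImpl(specificConfig);
> >         registry.registerShuffleService(service, shuffleServiceId);
> >     }
> >     // register this partition's location/config with the shared service
> >     return service.createWriter(location, specificConfig);
> > }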
> >
> > I might still be missing some details; I would appreciate any feedback.
> >
> > Best,
> > Andrey
> >
> > On 28 Nov 2018, at 08:59, zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > wrote:
> > Hi all,
> >
> > I adjusted the umbrella jira [1] and the corresponding google doc [2] to
> > narrow down the scope to introducing the pluggable shuffle manager
> > architecture as the first step.
> > Further feedback and suggestions are welcome; I will then create specific
> > subtasks to move it forward.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-10653
> >
> > [2]
> > https://docs.google.com/document/d/1ssTu8QE8RnF31zal4JHM1VaVENow-PweUtXSRr68nGg/edit?usp=sharing
> > ------------------------------------------------------------------
> > From: zhijiang <wangzhijiang...@aliyun.com.INVALID>
> > Send Time: Nov 1, 2018 (Thu) 17:19
> > To: dev <dev@flink.apache.org>; Jin Sun <isun...@gmail.com>
> > Cc: Nico Kruber <n...@data-artisans.com>; Piotr Nowojski
> > <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> >
> > Thanks for the quick response, Till!
> >
> > Thanks, Sunjin, for the good feedback; we will follow up on the comments
> > then! :)
> > ------------------------------------------------------------------
> > From: Jin Sun <isun...@gmail.com>
> > Send Time: Nov 1, 2018 (Thu) 06:42
> > To: dev <dev@flink.apache.org>
> > Cc: Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>; Nico Kruber
> > <n...@data-artisans.com>; Piotr Nowojski <pi...@data-artisans.com>; Stephan
> > Ewen <se...@apache.org>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> >
> > Thanks Zhijiang for the proposal. I like the idea of an external shuffle
> > service and have left some comments on the document.
> >
> > On Oct 31, 2018, at 2:26 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > Thanks for the update Zhijiang! The community is currently quite busy with
> > the next Flink release. I hope that we can finish the release in two weeks.
> > After that people will become more responsive again.
> >
> > Cheers,
> > Till
> >
> > On Wed, Oct 31, 2018 at 7:49 AM zhijiang <wangzhijiang...@aliyun.com> wrote:
> >
> > I already created the umbrella jira [1] for this improvement and attached
> > the design doc [2] to it.
> >
> > Welcome for further discussion about the details.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-10653
> > [2]
> > https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing
> >
> >
> > Best,
> > Zhijiang
> >
> > ------------------------------------------------------------------
> > From: Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com.INVALID>
> > Send Time: Sep 11, 2018 (Tue) 15:21
> > To: dev <dev@flink.apache.org>
> > Cc: dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> >
> > Many thanks, Till!
> >
> >
> > I will create a JIRA for this feature with a design document attached to
> > it.
> > I will let you know once it is ready! :)
> >
> > Best,
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From: Till Rohrmann <trohrm...@apache.org>
> > Send Time: Sep 7, 2018 (Fri) 22:01
> > To: Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
> > Cc: dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] Proposal of external shuffle service
> >
> > The rough plan sounds good Zhijiang. I think we should continue with what
> > you've proposed: open a JIRA issue and create a design document which
> > outlines the required changes a little bit more in detail. Once this is
> > done, we should link the design document in the JIRA issue and post it here
> > for further discussion.
> >
> > Cheers,
> > Till
> >
> > On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <
> > wangzhijiang...@aliyun.com> wrote:
> >
> > Glad to receive your positive feedback, Till!
> >
> > Actually our motivation is to support batch jobs well, as you mentioned.
> >
> > At the output level, Flink already has the Subpartition abstraction
> > (writer), and currently there are PipelinedSubpartition (in-memory output)
> > and SpillableSubpartition (one-subpartition-one-file output)
> > implementations. We can extend this abstraction to realize other
> > persistent outputs (e.g. sort-merge files).
> >
> > At the transport level (shuffle service), the current SubpartitionView
> > abstraction (reader) serves as the bridge to the output level, so that the
> > view can understand and read the different output formats. The current
> > NetworkEnvironment takes the role of the internal shuffle service in the
> > TaskManager, with the transport server realized by netty inside. This
> > component could also be started in other external containers, like the
> > NodeManager of yarn, to take the role of an external shuffle service.
> > Further, we could abstract the shuffle service to transport outputs via
> > http or rdma instead of the current netty. This abstraction should provide
> > a way to register outputs so that the results can be read correctly,
> > similar to the current SubpartitionView.
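> >
> > To sketch that registration abstraction (the interface name and method
> > are hypothetical; ResultPartitionID and ResultSubpartitionView are
> > existing Flink types):
> >
> > interface ShuffleTransport extends AutoCloseable {
> >     // the output level registers a produced subpartition so that the
> >     // transport (netty, http, rdma, ...) can serve it to consumers
> >     void registerSubpartition(ResultPartitionID partitionId,
> >                               int subpartitionIndex,
> >                               ResultSubpartitionView view);
> > }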
> >
> > The above is still a rough idea. Next, I plan to create a feature jira to
> > cover the related changes if possible. It would be great to get help from
> > the related committers to review the detailed designs together.
> >
> > Best,
> > Zhijiang
> >
> > ------------------------------------------------------------------
> > 发件人:Till Rohrmann <trohrm...@apache.org>
> > 发送时间:2018年8月29日(星期三) 17:36
> > 收件人:dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > wangzhijiang...@aliyun.com>
> > 主 题:Re: [DISCUSS] Proposal of external shuffle service
> >
> > Thanks for starting this design discussion Zhijiang!
> >
> > I really like the idea of introducing a ShuffleService abstraction which
> > allows different implementations depending on the actual use case.
> > Especially for batch jobs, I can clearly see the benefits of persisting
> > the results somewhere else.
> >
> > Do you already know which interfaces we need to extend and where to
> > introduce new abstractions?
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999)
> > <wangzhijiang...@aliyun.com.invalid> wrote:
> > Hi all!
> >
> >
> > The shuffle service is responsible for transporting upstream produced data
> > to the downstream side. In Flink, the NettyServer is used for the network
> > transport service, and this component is started in the TaskManager
> > process. That means the TaskManager only supports an internal shuffle
> > service, which raises some concerns:
> > 1. If a task finishes, its ResultPartition remains registered in the
> > TaskManager, because the output buffers have to be transported by the
> > internal shuffle service in the TaskManager. That means the TaskManager
> > cannot be released by the ResourceManager until the ResultPartition is
> > released. This may waste container resources and does not support dynamic
> > resource scenarios well.
> > 2. If we want to add another shuffle service implementation, the current
> > mechanism cannot easily handle it, because the output level (result
> > partition) and the transport level (shuffle service) are not clearly
> > divided, and there is no abstraction to extend.
> >
> > Given the above considerations, we propose an external shuffle service
> > which can be deployed in any other external container, e.g. a NodeManager
> > container in yarn. Then the TaskManager can be released ASAP, if needed,
> > once all its internal tasks are finished. The persistent output files of
> > these finished tasks can then be served for transport by the external
> > shuffle service on the same machine.
> >
> > Further, we can abstract both the output level and the transport level to
> > support different implementations. E.g., we implemented merging the data
> > of all the subpartitions into a limited number of persistent local files,
> > instead of one-subpartition-one-file, to improve disk usage in some
> > scenarios.
> >
> > I know this may be a big piece of work; I am just pointing out some ideas
> > and would appreciate any feedback from you!
> >
> > Best,
> > Zhijiang
> >