azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r256595763
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
##########
@@ -52,30 +52,38 @@
     /** The ID of the partition the input channel is going to consume. */
     private final ResultPartitionID consumedPartitionId;
-    /** The location of the partition the input channel is going to consume. */
-    private final ResultPartitionLocation consumedPartitionLocation;
+    /** The location type of the partition the input channel is going to consume. */
+    private final LocationType locationType;
+
+    /** The connection to use to request the remote partition. */
+    private final Optional<ConnectionID> connectionId;

Review comment:
OK, I think I see the problem now, thanks for the explanation. I will put my thoughts in reverse order :)

2. During the design, I thought `ShuffleDeploymentDescriptor` (SDD) was supposed to contain shuffle-specific info generated centrally by `ShuffleMaster` and eventually used by `ShuffleService` in the producer and consumer Tasks to set up readers/writers. An example could be a partition identifier or a connection inside an external shuffle system. The existing connection id/location is also such an example for the existing netty stack, but it might not be relevant for other shuffle systems. For example, suppose the partition is stored remotely (not in the producer), the batch job is restored, and the partition is already finished. Then we do not even need to deploy the producer; we just connect the consumer to the existing 'done' external partition. In that case the existing connection id makes no sense; the consumer needs some kind of internal shuffle id of the partition. That is why I thought of the following flow (PSD = `PartitionShuffleDescriptor`, ICDD = `InputChannelDeploymentDescriptor`):

PSD(ProducerResourceId,ProducerConnection,...) -> `ShuffleMaster` -> SDD(Internal) -> ICDD(SDD) -> Task -> ICDD,ConsumerResourceId -> `ShuffleService` -> InputGate -> read records.
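The flow above can be sketched roughly as follows. This is a minimal, self-contained illustration under assumptions, not Flink code: `NettySdd`, `ExternalSdd`, `NettyShuffleMaster`, `ExternalShuffleMaster` and `createReader` are hypothetical names, the records only loosely mirror PSD/SDD/ICDD, and "readers" are plain strings. It assumes the consumer-side `ShuffleService` interprets an opaque SDD: for the netty case it derives local vs. remote itself, and for an external shuffle system it needs only an internal shuffle id.

```java
/**
 * Hypothetical sketch of PSD -> ShuffleMaster -> SDD -> ICDD(SDD) -> Task -> ShuffleService -> reader.
 * All types are simplified stand-ins, not Flink's real classes.
 */
final class ShuffleFlowSketch {

    /** JobMaster-side info about a produced partition (hypothetical fields). */
    record PartitionShuffleDescriptor(String partitionId, String producerResourceId, String producerConnection) {}

    /** Opaque, shuffle-specific info generated centrally by a ShuffleMaster. */
    interface ShuffleDeploymentDescriptor {}

    /** Netty-stack SDD: the consumer connects to the producer. */
    record NettySdd(String producerResourceId, String producerConnection) implements ShuffleDeploymentDescriptor {}

    /** External-shuffle SDD: the consumer only needs an internal shuffle id. */
    record ExternalSdd(String internalShuffleId) implements ShuffleDeploymentDescriptor {}

    interface ShuffleMaster {
        ShuffleDeploymentDescriptor register(PartitionShuffleDescriptor psd);
    }

    /** Netty-style master: forwards the producer location into the SDD. */
    static final class NettyShuffleMaster implements ShuffleMaster {
        @Override
        public ShuffleDeploymentDescriptor register(PartitionShuffleDescriptor psd) {
            return new NettySdd(psd.producerResourceId(), psd.producerConnection());
        }
    }

    /** External master: assigns its own id; the producer location is irrelevant. */
    static final class ExternalShuffleMaster implements ShuffleMaster {
        @Override
        public ShuffleDeploymentDescriptor register(PartitionShuffleDescriptor psd) {
            return new ExternalSdd("ext-" + psd.partitionId());
        }
    }

    /** The ICDD just carries the SDD through to the consumer Task. */
    record InputChannelDeploymentDescriptor(String consumedPartitionId, ShuffleDeploymentDescriptor sdd) {}

    /** Consumer-side ShuffleService: interprets the SDD and picks a (string) reader. */
    static String createReader(InputChannelDeploymentDescriptor icdd, String consumerResourceId) {
        ShuffleDeploymentDescriptor sdd = icdd.sdd();
        if (sdd instanceof NettySdd netty) {
            // The location type is derived here from producer vs. consumer resource id.
            return netty.producerResourceId().equals(consumerResourceId)
                    ? "local:" + icdd.consumedPartitionId()
                    : "remote:" + netty.producerConnection();
        }
        if (sdd instanceof ExternalSdd ext) {
            return "external:" + ext.internalShuffleId();
        }
        throw new IllegalArgumentException("Unsupported SDD: " + sdd);
    }
}
```

With the external master, the consumer never sees the producer's connection, which matches the restored batch job case where the producer does not need to be deployed at all.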
I think `ShuffleService` itself can even decide what to do with ProducerResourceId/ConsumerResourceId and compute the LocationType internally in the case of the existing netty stack. For other shuffle services, LocationType might not be relevant (e.g. an external partition); then maybe ICDD = SDD = PartitionInfo and we could keep only one of them, not sure. Based on the above arguments, I thought of `UnknownShuffleDeploymentDescriptor` as a replacement for `LocationType.Unknown`/`ConnectionId=null`. It is just a singleton stub to signal that the SDD will be updated later with the `sendUpdatePartitionInfoRpcCall` in the case of lazy scheduling. True, it is not generated by `ShuffleMaster`; what could be an alternative to this approach?

1. In the case of eager deployment (lazyScheduling=false), we can currently deploy the consumer as soon as the slot is assigned to the producer, before the producer's deployment has started, whereas we planned to generate the SDD during producer deployment. If we agree on 2., it seems that the consumer needs the SDD to consume, so the SDD has to be known at that point. Thinking more about the `ShuffleMaster` interface: depending on its nature, it might be an asynchronous API, e.g. registering with and talking to an external system. This means that ideally its partition register method should return a `CompletableFuture<SDD>`. Then the producer execution life cycle should be:

created -> scheduled -> slot assigned -> register partition (get and cache SDD) -> deploying (generate TDD (`TaskDeploymentDescriptor`) with the previously acquired SDD)

with everything happening on the main thread of the Job Master. In eager scheduling, the consumer has to be deployed not after the producer slot is assigned, but after the partition is registered. In lazy scheduling, we have the `sendUpdatePartitionInfoRpcCall` to send the SDD later. I would suggest we do the partition registration and SDD caching in `allocateAndAssignSlotForExecution`, right after slot assignment (needs a rebase on the latest master):

```
return FutureUtils.handleAsyncIfNotDone(..tryAssignResource..)
    .thenComposeAsync(
        slot -> {..ShuffleMaster.register(PSD), cache SDDs..},
        mainThreadExecutor);
```

Just maybe with the steps refactored into separate functions :)
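The proposed ordering (slot assigned -> register partition -> cache SDD -> deploy) and the singleton stub for lazy scheduling could look roughly like this with plain `CompletableFuture` composition. This is a hedged sketch under assumptions: `registerPartition`, `UnknownSdd`, `KnownSdd`, `State`, `ProducerExecution`, `assignSlotAndRegister`, `deploy`, `sddForConsumer` and `inMemoryMaster` are hypothetical names, `mainThread` is any `Executor` standing in for the Job Master main-thread executor, and Flink's `FutureUtils` and real `Execution` class are not used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch of async partition registration; not Flink's real classes. */
final class AsyncRegistrationSketch {

    interface ShuffleDeploymentDescriptor {}

    /** Singleton stub: "SDD not known yet, an update RPC will follow" (lazy scheduling). */
    enum UnknownSdd implements ShuffleDeploymentDescriptor { INSTANCE }

    /** An SDD produced by the ShuffleMaster. */
    record KnownSdd(String info) implements ShuffleDeploymentDescriptor {}

    /** Assumed async API: registration may involve talking to an external system. */
    interface ShuffleMaster {
        CompletableFuture<ShuffleDeploymentDescriptor> registerPartition(String partitionId);
    }

    /** Trivial master that completes immediately, for illustration only. */
    static ShuffleMaster inMemoryMaster() {
        return partitionId -> CompletableFuture.completedFuture(new KnownSdd("shuffle-" + partitionId));
    }

    enum State { CREATED, SCHEDULED, SLOT_ASSIGNED, PARTITION_REGISTERED, DEPLOYING }

    /** Simplified producer execution; in the proposal, transitions run on the Job Master main thread. */
    static final class ProducerExecution {
        final String partitionId;
        volatile State state = State.CREATED;
        volatile ShuffleDeploymentDescriptor cachedSdd; // null until the partition is registered

        ProducerExecution(String partitionId) {
            this.partitionId = partitionId;
        }

        /** scheduled -> slot assigned -> register partition (get and cache SDD). */
        CompletableFuture<ShuffleDeploymentDescriptor> assignSlotAndRegister(
                CompletableFuture<String> slotFuture, ShuffleMaster master, Executor mainThread) {
            state = State.SCHEDULED;
            return slotFuture
                    .thenComposeAsync(slot -> {
                        state = State.SLOT_ASSIGNED;
                        return master.registerPartition(partitionId);
                    }, mainThread)
                    .thenApplyAsync(sdd -> {
                        cachedSdd = sdd; // cached for the TDD generated at deployment
                        state = State.PARTITION_REGISTERED;
                        return sdd;
                    }, mainThread);
        }

        /** deploying: build the TDD (here just a string) from the cached SDD. */
        String deploy() {
            if (state != State.PARTITION_REGISTERED) {
                throw new IllegalStateException("Partition not registered yet, state=" + state);
            }
            state = State.DEPLOYING;
            return "TDD[" + partitionId + ", " + cachedSdd + "]";
        }

        /** What a consumer receives if deployed now: the stub until the partition is registered. */
        ShuffleDeploymentDescriptor sddForConsumer() {
            ShuffleDeploymentDescriptor sdd = cachedSdd;
            return sdd != null ? sdd : UnknownSdd.INSTANCE;
        }
    }
}
```

In eager scheduling, consumer deployment would be chained onto the future returned by `assignSlotAndRegister` rather than onto slot assignment; in lazy scheduling, a consumer deployed earlier would receive `UnknownSdd.INSTANCE` and get the real SDD later through the update RPC.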