azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r256595763
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ##########
 @@ -52,30 +52,38 @@
        /** The ID of the partition the input channel is going to consume. */
        private final ResultPartitionID consumedPartitionId;
 
-       /** The location of the partition the input channel is going to 
consume. */
-       private final ResultPartitionLocation consumedPartitionLocation;
+       /** The location type of the partition the input channel is going to 
consume. */
+       private final LocationType locationType;
+
+       /** The connection to use to request the remote partition. */
+       private final Optional<ConnectionID> connectionId;
 
 Review comment:
   ok, I think I see the problem now, thanks for explanation. I will put my 
thoughts in other order :)
   
   2. 
   
   During the design, I thought `ShuffleDeploymentDescriptor` was supposed to 
contain shuffle specific info generated by `ShuffleMaster` as a central point 
and used eventually by `ShuffleService` in producer and consumer Task to setup 
readers/writers. 
   
   The example could be some partition identification or connection inside 
external shuffle system. The existing connection id/location is also an example 
of it for the existing netty stack, but might be not relevant for other shuffle 
systems.
   
   For example, let's say the partition is stored remotely (not in producer), 
the batch job is restored and some the partition is finished, we do not even 
need to deploy the producer, just connect the consumer to the existing 'done' 
external partition, then the existing connection id does not make sense, the 
consumer needs some kind of internal shuffle id of the partition.
   
   That is why I thought: PSD(ProducerResourceId,ProducerConnection,...) -> 
`ShuffleMaster` -> SDD(Internal) -> ICDD(SDD) -> Task -> 
ICDD,ConsumerResourceId -> `ShuffleService` -> InputGate -> read records. 
   
   I think even `ShuffleService` itself can decide what to do with 
ProducerResourceId/ConsumerResourceId and calculate internally LocationType in 
case of existing netty. For other shuffle services, LocationType might be not 
relevant (like external partition), then maybe ICDD=SDD=PartitionInfo and we 
could leave only one of them, not sure.
   
   I thought of `UnknownShuffleDeploymentDescriptor` as a replacement of 
`LocationType.Unknown\ConnectionId=null` based on the above arguments. It is 
just a singleton stub to signal that SDD will be updated later with the 
`sendUpdatePartitionInfoRpcCall` in case of lazy scheduling. True, it is not 
generated by `ShuffleMaster`, what could be an alternative for this approach?
   
   1. 
   
   In case of eager deployment (lazyScheduling=false), currently, we can 
already deploy the consumer when the slot is assigned to the producer but its 
deployment has not started yet and we planned to generate the SDD during 
producer deployment. If we agree on 2., it seems that we need SDD for consumer 
to consume and it has to be known.
   
   Thinking more about `ShuffleMaster` interface, depending on its nature, it 
might be an asynchronous API like registering and talking to an external 
system. This means that ideally its partition register method should return a 
`CompletableFuture<SDD>`. 
   
   Then the producer execution life cycle should be: created -> scheduled -> 
slot assigned -> register partition (get and cache SDD) -> deploying (generate 
TDD with previously acquired SDD). Everything happening on the main thread of 
Job Master. The consumer has to be deployed not after producer slot is assigned 
but after partition is registered in eager scheduling. In lazy scheduling, we 
have the `sendUpdatePartitionInfoRpcCall` to send SDD later.
   
    I would suggest we do the partitions registering and SDD caching in 
`allocateAndAssignSlotForExecution`, right after slot assignment: 
   ```
   return logicalSlotFuture
     .thenApply(..tryAssignResource..)
     .thenCompose(..ShuffleMaster.register(PSD), cache SDDs..);
   ```
   Just maybe with refactoring the steps into different functions :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to