Thesharing commented on a change in pull request #14868:
URL: https://github.com/apache/flink/pull/14868#discussion_r581003377



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -90,21 +89,12 @@ void resetForNewExecution() {
         hasDataProduced = false;
     }
 
-    int addConsumerGroup() {
-        int pos = consumers.size();
-
-        // NOTE: currently we support only one consumer per result!!!
-        if (pos != 0) {
-            throw new RuntimeException(
-                    "Currently, each intermediate result can only have one consumer.");
-        }
-
-        consumers.add(new ArrayList<ExecutionEdge>());
-        return pos;
+    public void setConsumers(ConsumerVertexGroup consumers) {
+        producer.getExecutionGraph().getEdgeManager().addPartitionConsumers(partitionId, consumers);

Review comment:
       Sorry for being careless. Done.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
##########
@@ -90,21 +89,12 @@ void resetForNewExecution() {
         hasDataProduced = false;
     }
 
-    int addConsumerGroup() {
-        int pos = consumers.size();
-
-        // NOTE: currently we support only one consumer per result!!!
-        if (pos != 0) {
-            throw new RuntimeException(
-                    "Currently, each intermediate result can only have one consumer.");
-        }
-
-        consumers.add(new ArrayList<ExecutionEdge>());
-        return pos;
+    public void setConsumers(ConsumerVertexGroup consumers) {
+        producer.getExecutionGraph().getEdgeManager().addPartitionConsumers(partitionId, consumers);
     }
 
-    void addConsumer(ExecutionEdge edge, int consumerNumber) {
-        consumers.get(consumerNumber).add(edge);
+    EdgeManager getEdgeManager() {

Review comment:
       Yes, it should be private. Thanks for pointing this out.





