zhijiangW commented on a change in pull request #7549: [FLINK-11403][network] Remove ResultPartitionConsumableNotifier from ResultPartition
URL: https://github.com/apache/flink/pull/7549#discussion_r267201040
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -19,432 +19,105 @@
 package org.apache.flink.runtime.io.network.partition;

 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;

 /**
- * A result partition for data produced by a single task.
- *
- * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
- * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
- * or more {@link ResultSubpartition} instances, which further partition the data depending on the
- * number of consuming tasks and the data {@link DistributionPattern}.
- *
- * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
- * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
- *
- * <h2>Life-cycle</h2>
- *
- * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
- * <ol>
- * <li><strong>Produce</strong>: </li>
- * <li><strong>Consume</strong>: </li>
- * <li><strong>Release</strong>: </li>
- * </ol>
- *
- * <h2>Lazy deployment and updates of consuming tasks</h2>
- *
- * <p>Before a consuming task can request the result, it has to be deployed. The time of deployment
- * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined
- * results, receivers are deployed as soon as the first buffer is added to the result partition.
- * With blocking results on the other hand, receivers are deployed after the partition is finished.
- *
- * <h2>Buffer management</h2>
- *
- * <h2>State management</h2>
+ * A wrapper of result partition writer for handling notification of the consumable
+ * partition which is added a {@link BufferConsumer} or finished.
 */
-public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
-
-	private final String owningTaskName;
+public class ResultPartition implements ResultPartitionWriter {

 	private final TaskActions taskActions;

 	private final JobID jobId;

-	private final ResultPartitionID partitionId;
-
-	/** Type of this partition. Defines the concrete subpartition implementation to use. */
 	private final ResultPartitionType partitionType;

-	/** The subpartitions of this partition. At least one. */
-	private final ResultSubpartition[] subpartitions;
-
-	private final ResultPartitionManager partitionManager;
+	private final ResultPartitionWriter partitionWriter;
##########
Review comment:
   In the final form, `ResultPartition` should not implement `ResultPartitionWriter`, and it could live in the `TaskManager` package instead of the current network package. The field in `ResultPartition` should then be a plain `ResultPartitionWriter`.

   I understand your concern, but after trying it out, keeping the special `NetworkResultPartition` inside `ResultPartition` temporarily brings other troubles. The main ones involve `Environment` and `RecordWriter`:

   - `RecordWriter` -> `ResultPartition` -> `NetworkResultPartition`: there are many other `ResultPartitionWriter` implementations used in tests. In `RecordWriterTest` we could not construct a proper `RecordWriter` from a `ResultPartition` that wraps the specific `NetworkResultPartition`.
   - `Environment` should keep `ResultPartition` instead of `ResultPartitionWriter`, because it is used to construct the `RecordWriter` later. If `ResultPartition` wraps the specific `NetworkResultPartition`, other `Environment` implementations such as `StreamMockEnvironment` and `MockEnvironment` could not return a proper `ResultPartition`, because they maintain other `ResultPartitionWriter` implementations.

   So I think the current way is easier to handle, because in the end we only need to refactor `ResultPartition` and `Task`.

   Another option is to suspend this PR for now: you could first review the other PRs involved in the `ResultPartitionWriter` interface refactoring. Once we settle on the final form of the interface, we could come back and make `ResultPartition` maintain a `ResultPartitionWriter`. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
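The new javadoc in the diff above describes `ResultPartition` as "a wrapper of result partition writer for handling notification of the consumable partition which is added a BufferConsumer or finished." That wrapper role can be sketched in isolation. Everything below is a hypothetical, simplified model: `PartitionWriter`, `NotifyingPartition`, and `CollectingWriter` are stand-ins invented for illustration, not Flink's actual interfaces, and the real notification path goes through `TaskActions` rather than a bare `Runnable`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the ResultPartitionWriter interface. */
interface PartitionWriter {
	void addBufferConsumer(String bufferConsumer, int subpartition);
	void finish();
}

/** Inner writer: just records what it receives, standing in for the network partition. */
class CollectingWriter implements PartitionWriter {
	final List<String> received = new ArrayList<>();
	boolean finished;

	@Override
	public void addBufferConsumer(String bufferConsumer, int subpartition) {
		received.add(subpartition + ":" + bufferConsumer);
	}

	@Override
	public void finish() {
		finished = true;
	}
}

/**
 * Wrapper in the spirit of the refactored ResultPartition: delegates all writes
 * to the inner writer and fires the "consumable" notification exactly once,
 * either on the first added buffer (pipelined case) or on finish (blocking case).
 */
class NotifyingPartition implements PartitionWriter {
	private final PartitionWriter delegate;
	private final Runnable consumableNotifier;
	private boolean notified;

	NotifyingPartition(PartitionWriter delegate, Runnable consumableNotifier) {
		this.delegate = delegate;
		this.consumableNotifier = consumableNotifier;
	}

	@Override
	public void addBufferConsumer(String bufferConsumer, int subpartition) {
		delegate.addBufferConsumer(bufferConsumer, subpartition);
		notifyOnce();
	}

	@Override
	public void finish() {
		delegate.finish();
		notifyOnce();
	}

	private void notifyOnce() {
		if (!notified) {
			notified = true;
			consumableNotifier.run();
		}
	}
}

public class WrapperSketch {
	public static void main(String[] args) {
		CollectingWriter inner = new CollectingWriter();
		int[] notifications = new int[1];
		NotifyingPartition partition = new NotifyingPartition(inner, () -> notifications[0]++);

		partition.addBufferConsumer("buf-0", 0);
		partition.addBufferConsumer("buf-1", 1);
		partition.finish();

		// prints: received=[0:buf-0, 1:buf-1] notifications=1
		System.out.println("received=" + inner.received + " notifications=" + notifications[0]);
	}
}
```

Because the notification concern lives entirely in the wrapper, test code (the `RecordWriterTest` concern above) can hand any `PartitionWriter` implementation to the wrapper without depending on the concrete network-side class, which is the point of keeping the field typed as the interface.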