[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099873#comment-16099873
 ] 

ASF GitHub Bot commented on FLINK-7213:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129277176
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
    @@ -867,81 +845,60 @@ public String toString() {
     
                AsyncCheckpointRunnable(
                                StreamTask<?, ?> owner,
    -                           List<StreamStateHandle> 
nonPartitionedStateHandles,
    -                           List<OperatorSnapshotResult> 
snapshotInProgressList,
    +                           Map<OperatorID, StreamStateHandle> 
nonPartitionedStateHandles,
    +                           Map<OperatorID, OperatorSnapshotResult> 
operatorSnapshotsInProgress,
                                CheckpointMetaData checkpointMetaData,
                                CheckpointMetrics checkpointMetrics,
                                long asyncStartNanos) {
     
                        this.owner = Preconditions.checkNotNull(owner);
    -                   this.snapshotInProgressList = 
Preconditions.checkNotNull(snapshotInProgressList);
    +                   this.operatorSnapshotsInProgress = 
Preconditions.checkNotNull(operatorSnapshotsInProgress);
                        this.checkpointMetaData = 
Preconditions.checkNotNull(checkpointMetaData);
                        this.checkpointMetrics = 
Preconditions.checkNotNull(checkpointMetrics);
                        this.nonPartitionedStateHandles = 
nonPartitionedStateHandles;
                        this.asyncStartNanos = asyncStartNanos;
    -
    -                   if (!snapshotInProgressList.isEmpty()) {
    -                           // TODO Currently only the head operator of a 
chain can have keyed state, so simply access it directly.
    -                           int headIndex = snapshotInProgressList.size() - 
1;
    -                           OperatorSnapshotResult snapshotInProgress = 
snapshotInProgressList.get(headIndex);
    -                           if (null != snapshotInProgress) {
    -                                   this.futureKeyedBackendStateHandles = 
snapshotInProgress.getKeyedStateManagedFuture();
    -                                   this.futureKeyedStreamStateHandles = 
snapshotInProgress.getKeyedStateRawFuture();
    -                           }
    -                   }
                }
     
                @Override
                public void run() {
                        FileSystemSafetyNet.initializeSafetyNetForThread();
                        try {
    -                           // Keyed state handle future, currently only 
one (the head) operator can have this
    -                           KeyedStateHandle keyedStateHandleBackend = 
FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
    -                           KeyedStateHandle keyedStateHandleStream = 
FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
    -
    -                           List<OperatorStateHandle> operatorStatesBackend 
= new ArrayList<>(snapshotInProgressList.size());
    -                           List<OperatorStateHandle> operatorStatesStream 
= new ArrayList<>(snapshotInProgressList.size());
    -
    -                           for (OperatorSnapshotResult snapshotInProgress 
: snapshotInProgressList) {
    -                                   if (null != snapshotInProgress) {
    -                                           operatorStatesBackend.add(
    -                                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
    -                                           operatorStatesStream.add(
    -                                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
    -                                   } else {
    -                                           operatorStatesBackend.add(null);
    -                                           operatorStatesStream.add(null);
    -                                   }
    -                           }
    +                           boolean hasState = false;
    +                           final TaskStateSnapshot 
taskOperatorSubtaskStates =
    +                                   new 
TaskStateSnapshot(operatorSnapshotsInProgress.size());
     
    -                           final long asyncEndNanos = System.nanoTime();
    -                           final long asyncDurationMillis = (asyncEndNanos 
- asyncStartNanos) / 1_000_000;
    +                           for (Map.Entry<OperatorID, 
OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) {
     
    -                           
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
    +                                   OperatorID operatorID = entry.getKey();
    +                                   OperatorSnapshotResult 
snapshotInProgress = entry.getValue();
     
    -                           ChainedStateHandle<StreamStateHandle> 
chainedNonPartitionedOperatorsState =
    -                                           new 
ChainedStateHandle<>(nonPartitionedStateHandles);
    +                                   OperatorSubtaskState 
operatorSubtaskState = new OperatorSubtaskState(
    +                                           
nonPartitionedStateHandles.get(operatorID),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture())
    +                                   );
     
    -                           ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateBackend =
    -                                           new 
ChainedStateHandle<>(operatorStatesBackend);
    +                                   hasState |= 
operatorSubtaskState.hasState();
    +                                   
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, 
operatorSubtaskState);
    +                           }
     
    -                           ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateStream =
    -                                           new 
ChainedStateHandle<>(operatorStatesStream);
    +                           final long asyncEndNanos = System.nanoTime();
    +                           final long asyncDurationMillis = (asyncEndNanos 
- asyncStartNanos) / 1_000_000;
     
    -                           SubtaskState subtaskState = 
createSubtaskStateFromSnapshotStateHandles(
    -                                           
chainedNonPartitionedOperatorsState,
    -                                           chainedOperatorStateBackend,
    -                                           chainedOperatorStateStream,
    -                                           keyedStateHandleBackend,
    -                                           keyedStateHandleStream);
    +                           
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
     
                                if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                                                
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
     
    +                                   // we signal a stateless task by 
reporting null, so that there are no attempts to assign empty state
    +                                   // to stateless tasks on restore. This 
enables simple job modifications that only concern
    +                                   // stateless without the need to assign 
them uids to match their (always empty) states.
    --- End diff --
    
    The last comment line is missing a word; it should read "stateless **tasks**".


> Introduce state management by OperatorID in TaskManager
> -------------------------------------------------------
>
>                 Key: FLINK-7213
>                 URL: https://issues.apache.org/jira/browse/FLINK-7213
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> FLINK-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced on the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to