[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099873#comment-16099873 ]
ASF GitHub Bot commented on FLINK-7213: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129277176 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -867,81 +845,60 @@ public String toString() { AsyncCheckpointRunnable( StreamTask<?, ?> owner, - List<StreamStateHandle> nonPartitionedStateHandles, - List<OperatorSnapshotResult> snapshotInProgressList, + Map<OperatorID, StreamStateHandle> nonPartitionedStateHandles, + Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, long asyncStartNanos) { this.owner = Preconditions.checkNotNull(owner); - this.snapshotInProgressList = Preconditions.checkNotNull(snapshotInProgressList); + this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.nonPartitionedStateHandles = nonPartitionedStateHandles; this.asyncStartNanos = asyncStartNanos; - - if (!snapshotInProgressList.isEmpty()) { - // TODO Currently only the head operator of a chain can have keyed state, so simply access it directly. 
- int headIndex = snapshotInProgressList.size() - 1; - OperatorSnapshotResult snapshotInProgress = snapshotInProgressList.get(headIndex); - if (null != snapshotInProgress) { - this.futureKeyedBackendStateHandles = snapshotInProgress.getKeyedStateManagedFuture(); - this.futureKeyedStreamStateHandles = snapshotInProgress.getKeyedStateRawFuture(); - } - } } @Override public void run() { FileSystemSafetyNet.initializeSafetyNetForThread(); try { - // Keyed state handle future, currently only one (the head) operator can have this - KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); - KeyedStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); - - List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size()); - List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size()); - - for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) { - if (null != snapshotInProgress) { - operatorStatesBackend.add( - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture())); - operatorStatesStream.add( - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture())); - } else { - operatorStatesBackend.add(null); - operatorStatesStream.add(null); - } - } + boolean hasState = false; + final TaskStateSnapshot taskOperatorSubtaskStates = + new TaskStateSnapshot(operatorSnapshotsInProgress.size()); - final long asyncEndNanos = System.nanoTime(); - final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000; + for (Map.Entry<OperatorID, OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) { - checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); + OperatorID operatorID = entry.getKey(); + OperatorSnapshotResult snapshotInProgress = entry.getValue(); - ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState = - 
new ChainedStateHandle<>(nonPartitionedStateHandles); + OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( + nonPartitionedStateHandles.get(operatorID), + FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()), + FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()), + FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()), + FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture()) + ); - ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend = - new ChainedStateHandle<>(operatorStatesBackend); + hasState |= operatorSubtaskState.hasState(); + taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState); + } - ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream = - new ChainedStateHandle<>(operatorStatesStream); + final long asyncEndNanos = System.nanoTime(); + final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000; - SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles( - chainedNonPartitionedOperatorsState, - chainedOperatorStateBackend, - chainedOperatorStateStream, - keyedStateHandleBackend, - keyedStateHandleStream); + checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + // we signal a stateless task by reporting null, so that there are no attempts to assign empty state + // to stateless tasks on restore. This enables simple job modifications that only concern + // stateless without the need to assign them uids to match their (always empty) states. 
--- End diff -- stateless **tasks** (i.e. the comment should read "job modifications that only concern stateless tasks") > Introduce state management by OperatorID in TaskManager > ------------------------------------------------------- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.4.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > > FLINK-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced on the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)