[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544257#comment-16544257 ]
ASF GitHub Bot commented on FLINK-9489: --------------------------------------- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202518473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java --- @@ -63,54 +72,46 @@ public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe } @SuppressWarnings("unchecked") - public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { + public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { this( snapshot.getName(), OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)), - (TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER), - (TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); + (TypeSerializer<K>) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)), + (TypeSerializer<V>) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))); Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType()); } /** * Creates a deep copy of the itself. */ - public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() { - return new RegisteredBroadcastBackendStateMetaInfo<>(this); + @Nonnull + public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() { + return new RegisteredBroadcastStateBackendMetaInfo<>(this); } @Nonnull @Override public StateMetaInfoSnapshot snapshot() { - Map<String, String> optionsMap = Collections.singletonMap( - StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(), - assignmentMode.toString()); - Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2); - Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2); - String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(); - String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); - serializerMap.put(keySerializerKey, keySerializer.duplicate()); - serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration()); - serializerMap.put(valueSerializerKey, valueSerializer.duplicate()); - serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration()); - - return new StateMetaInfoSnapshot( - name, - StateMetaInfoSnapshot.BackendStateType.BROADCAST, - optionsMap, - serializerConfigSnapshotsMap, - serializerMap); + if (precomputedSnapshot == null) { + precomputedSnapshot = precomputeSnapshot(); + } + return precomputedSnapshot; --- End diff -- What if the serializers are not all immutable? Should we need a `immutable` field for it? Only when it is true we return the `precomputeSnapshot`. > Checkpoint timers as part of managed keyed state instead of raw keyed state > --------------------------------------------------------------------------- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state an might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)