Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858603609


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java:
##########
@@ -73,6 +74,11 @@ public Optional<byte[]> asBytesIfInMemory() {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(filePath.toUri().toString());

Review Comment:
   Can we move the `physicalStateHandleId` into a private field? It would be `null` by default and initialized on first access. The same applies to `ByteStreamStateHandle`.
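A minimal sketch of the lazy initialization suggested above. It assumes a simplified stand-in for `PhysicalStateHandleID` (`PhysicalIdSketch`, a plain string wrapper rather than Flink's `StringBasedID`) and a hypothetical `LazyFileStateHandle` that holds a `String` path instead of Flink's `Path`. As in the diff, the ID is derived from the file URI. Whether the cached field should also be `transient` in the real, serializable handle is left open.

```java
import java.nio.file.Paths;
import java.util.Objects;

/** Simplified stand-in for PhysicalStateHandleID (assumption: it just wraps a string key). */
final class PhysicalIdSketch {
    private final String key;

    PhysicalIdSketch(String key) {
        this.key = Objects.requireNonNull(key);
    }

    String getKeyString() {
        return key;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PhysicalIdSketch && key.equals(((PhysicalIdSketch) o).key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }
}

/** Hypothetical handle that builds its physical ID once, on first access. */
final class LazyFileStateHandle {
    private final String filePath;

    // null until getStreamStateHandleID() is first called
    private PhysicalIdSketch physicalId;

    LazyFileStateHandle(String filePath) {
        this.filePath = filePath;
    }

    PhysicalIdSketch getStreamStateHandleID() {
        // Racy single-check idiom: read the field once into a local. Concurrent callers may
        // each build an equal ID; since the ID's only field is final, any thread that sees
        // the reference also sees a fully initialized key.
        PhysicalIdSketch id = physicalId;
        if (id == null) {
            id = new PhysicalIdSketch(Paths.get(filePath).toUri().toString());
            physicalId = id;
        }
        return id;
    }
}
```

Two handles pointing at the same file produce equal IDs, while handles for different files do not, which is the physical-comparison property described in the `PhysicalStateHandleID` javadoc.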



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.StringBasedID;
+
+import java.util.UUID;
+
+/**
+ * Unique ID that allows for physical comparison between state handles.
+ *
+ * <p>Different state objects (e.g. different files) representing the same piece of data must have
+ * different IDs (e.g. file names). This is different from {@link
+ * org.apache.flink.runtime.state.KeyedStateHandle#getStateHandleId} which returns the same ID.
+ *
+ * @see StateHandleID
+ */
+public class PhysicalStateHandleID extends StringBasedID {
+
+    private static final long serialVersionUID = 1L;
+
+    public PhysicalStateHandleID(String keyString) {
+        super(keyString);
+    }
+
+    public static PhysicalStateHandleID randomStateHandleId() {

Review Comment:
   Is this method ever used?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+class ChangelogTruncateHelper {

Review Comment:
   Can we add some docs for this class and its methods? To be fair, introducing this class gives us a simpler public API. However, it doesn't tell us when and why these methods are called. Since these methods are only called from `ChangelogKeyedStateBackend`, maybe moving them back into `ChangelogKeyedStateBackend` would make the logic clearer for others to follow.



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   Checkpoint-complete notifications are not always guaranteed to arrive. Is it always safe to call this method here, given that the corresponding `stopTracking` call may have been missed?
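To make the concern concrete, here is a hypothetical reference-counting registry (`RefCountingRegistrySketch`, with handles reduced to `String` IDs and discarding reduced to a `Consumer<String>`); it is an illustration, not the PR's `ChangelogRegistryImpl`. The update in `notUsed` is atomic per key, but once the last backend releases a handle it is discarded unless `stopTracking` already ran. Assuming `stopTracking` is driven by the checkpoint-complete notification, a lost notification would let a handle referenced by a completed checkpoint be discarded, and nothing in the sketch prevents that.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical registry: a handle is discarded once no tracked backend uses it. */
final class RefCountingRegistrySketch {
    private final Map<String, Set<UUID>> entries = new ConcurrentHashMap<>();
    private final Executor executor;
    private final Consumer<String> discard; // stands in for discarding the state handle

    RefCountingRegistrySketch(Executor executor, Consumer<String> discard) {
        this.executor = executor;
        this.discard = discard;
    }

    void startTracking(String handleId, Set<UUID> backendIds) {
        entries.put(handleId, new CopyOnWriteArraySet<>(backendIds));
    }

    /** Ownership moved elsewhere: this registry will never discard the handle. */
    void stopTracking(String handleId) {
        entries.remove(handleId);
    }

    /** Returns true if this call released the last reference and scheduled a discard. */
    boolean notUsed(String handleId, UUID backendId) {
        boolean[] lastReference = {false};
        // computeIfPresent is atomic per key on ConcurrentHashMap. A concurrent stopTracking
        // is ordered against it: if stopTracking wins, the entry is gone and nothing is
        // discarded; if it loses, the discard has already been decided.
        entries.computeIfPresent(
                handleId,
                (id, backends) -> {
                    backends.remove(backendId);
                    if (backends.isEmpty()) {
                        lastReference[0] = true;
                        return null; // returning null removes the mapping
                    }
                    return backends;
                });
        if (lastReference[0]) {
            executor.execute(() -> discard.accept(handleId));
        }
        return lastReference[0];
    }
}
```

The sketch only makes the ordering explicit; it does not answer whether a lost notification can be tolerated.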



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java:
##########
@@ -73,12 +80,19 @@
      */
     void reset(SequenceNumber from, SequenceNumber to);
 
+    /**
+     * Truncate the tail of log and close it. No new appends will be possible. Any appended but not
+     * persisted records will be lost.
+     *
+     * @param from {@link SequenceNumber} from which to truncate the changelog, inclusive
+     */
+    default void truncateAndClose(SequenceNumber from) {

Review Comment:
   `StateChangelogWriter` is an internal interface; do we still need to introduce a default method here?
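A sketch of the alternative, assuming all implementations live in the code base: declaring `truncateAndClose` abstract means an implementation that forgets it fails to compile instead of silently inheriting a default. `ChangelogWriterSketch` and `InMemoryWriterSketch` are hypothetical, with sequence numbers simplified to list indices.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplified writer interface. */
interface ChangelogWriterSketch extends AutoCloseable {
    void append(String change);

    /** Abstract on purpose: every implementation must decide how to truncate and close. */
    void truncateAndClose(int from);

    @Override
    void close(); // narrowed: no checked exception
}

/** Hypothetical implementation backed by an in-memory list. */
final class InMemoryWriterSketch implements ChangelogWriterSketch {
    private final List<String> log = new ArrayList<>();
    private boolean closed;

    @Override
    public void append(String change) {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        log.add(change);
    }

    @Override
    public void truncateAndClose(int from) {
        // Drop the tail starting at index `from` (inclusive), then reject further appends.
        if (from < log.size()) {
            log.subList(from, log.size()).clear();
        }
        close();
    }

    @Override
    public void close() {
        closed = true;
    }

    int size() {
        return log.size();
    }
}
```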



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {

Review Comment:
   Actually, `ChangelogRegistry` is only used for changelogs that have not yet been confirmed by a checkpoint. However, the current name and API don't convey this.
   How about renaming this class to `UnconfirmedChangelogRegistry`?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {
+        PhysicalStateHandleID key = handle.getStreamStateHandleID();
+        LOG.debug("backend {} not using state, key: {}, state: {}", backendId, key, handle);
+        Set<UUID> backends = entries.get(key);
+        if (backends == null) {
+            LOG.warn("backend {} was not using state, key: {}, state: {}", backendId, key, handle);

Review Comment:
   What's the difference between this log statement and the debug statement above?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+    /** Start tracking the state uploaded for the given backends. */
+    void startTracking(StreamStateHandle handle, Set<UUID> backendIDs);
+
+    /** Stop tracking the state, so that it's not tracked (some other component is doing that). */
+    void stopTracking(StreamStateHandle handle);
+
+    /**
+     * Mark the state as unused by the given backend, e.g. if it was pre-emptively uploaded and
+     * materialized. Once no backend is using the state, it is discarded (unless it was {@link
+     * #stopTracking(StreamStateHandle) unregistered} earlier).
+     */
+    void notUsed(StreamStateHandle handle, UUID backendId);
+
+    ChangelogRegistry NO_OP =
+            new ChangelogRegistry() {
+                @Override
+                public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {}
+
+                @Override
+                public void stopTracking(StreamStateHandle handle) {}
+
+                @Override
+                public void notUsed(StreamStateHandle handle, UUID backendId) {}
+            };
+
+    static ChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads) {
+        return defaultChangelogRegistry(Executors.newFixedThreadPool(numAsyncDiscardThreads));
+    }
+
+    static ChangelogRegistry defaultChangelogRegistry(Executor executor) {

Review Comment:
   Should add `@VisibleForTesting`.
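A sketch of where the annotation would go. Flink ships `org.apache.flink.annotation.VisibleForTesting`; to keep the example compilable on its own, it declares a local stand-in of the same name. `RegistrySketch` is a hypothetical substitute for `ChangelogRegistry`, whose `Executor` overload is treated as the test-only entry point.

```java
import java.lang.annotation.Documented;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/** Local stand-in for org.apache.flink.annotation.VisibleForTesting. */
@Documented
@interface VisibleForTesting {}

/** Hypothetical registry that only remembers the executor used for async discards. */
final class RegistrySketch {
    final Executor executor;

    private RegistrySketch(Executor executor) {
        this.executor = executor;
    }

    /** Production entry point: creates its own pool of discard threads. */
    static RegistrySketch create(int numAsyncDiscardThreads) {
        return create(Executors.newFixedThreadPool(numAsyncDiscardThreads));
    }

    /** Test entry point: lets a test inject, e.g., a direct executor. */
    @VisibleForTesting
    static RegistrySketch create(Executor executor) {
        return new RegistrySketch(executor);
    }
}
```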



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##########
@@ -270,7 +286,25 @@ public void truncate(SequenceNumber to) {
         checkArgument(to.compareTo(activeSequenceNumber) <= 0);
         lowestSequenceNumber = to;
         notUploaded.headMap(lowestSequenceNumber, false).clear();
-        uploaded.headMap(lowestSequenceNumber, false).clear();
+
+        Map<SequenceNumber, UploadResult> toDiscard = uploaded.headMap(to);
+        LOG.trace("Uploaded state to discard: {}", toDiscard);
+        for (UploadResult result : toDiscard.values()) {
+            changelogRegistry.notUsed(result.streamStateHandle, logId);
+        }
+        toDiscard.clear();

Review Comment:
   We can introduce a method to be reused in `truncate` and `truncateAndClose`.
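One possible shape for the shared helper, as a hypothetical sketch: sequence numbers are `long`s, an upload result is just a handle name, and `releaseHandle` stands in for `changelogRegistry.notUsed(handle, logId)`. It relies on `headMap`/`tailMap` returning views, so clearing the view removes the entries from the backing map, as `toDiscard.clear()` does in the diff. Clearing all not-yet-uploaded records on close is an assumption based on the `truncateAndClose` javadoc.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical simplified writer showing one helper shared by both truncation paths. */
final class WriterTruncateSketch {
    final NavigableMap<Long, String> notUploaded = new TreeMap<>();
    final NavigableMap<Long, String> uploaded = new TreeMap<>();
    private final Consumer<String> releaseHandle;
    private boolean closed;

    WriterTruncateSketch(Consumer<String> releaseHandle) {
        this.releaseHandle = releaseHandle;
    }

    /** Drops every record strictly below {@code to}. */
    void truncate(long to) {
        notUploaded.headMap(to, false).clear();
        discardUploaded(uploaded.headMap(to, false));
    }

    /** Drops every uploaded record from {@code from} (inclusive) and closes the writer. */
    void truncateAndClose(long from) {
        notUploaded.clear(); // assumption: unpersisted records are lost on close
        discardUploaded(uploaded.tailMap(from, true));
        closed = true;
    }

    /** Shared by both methods: release each handle in the view, then drop the view's entries. */
    private void discardUploaded(NavigableMap<Long, String> toDiscard) {
        for (String handle : toDiscard.values()) {
            releaseHandle.accept(handle);
        }
        toDiscard.clear(); // the view is backed by `uploaded`, so this removes the entries there
    }

    boolean isClosed() {
        return closed;
    }
}
```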



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
