StefanRRichter commented on code in PR #25263:
URL: https://github.com/apache/flink/pull/25263#discussion_r1736431329


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java:
##########
@@ -119,14 +122,18 @@ public static void registerKvStateInformation(
      *
      * <p>Creates the column family for the state. Sets TTL compaction filter 
if {@code
      * ttlCompactFiltersManager} is not {@code null}.
+     *
+     * @param importFilesMetaData if not empty, we import the files specified 
in the metadata to the
+     *     column family.
      */
     public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
             RegisteredStateMetaInfoBase metaInfoBase,
             RocksDB db,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nullable Long writeBufferManagerCapacity,
-            ICloseableRegistry cancelStreamRegistryForRestore) {
+            ICloseableRegistry cancelStreamRegistryForRestore,
+            List<ExportImportFilesMetaData> importFilesMetaData) {

Review Comment:
   nit: please change the order of parameters (first the list, then 
ICloseableRegistry) so that it's consistent with the order of 
`createColumnFamily`.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -653,10 +650,10 @@ private void initBaseDBFromColumnFamilyImports(
                 keyGroupRange,
                 operatorIdentifier);
         rocksHandle.openDB();
-        for (Map.Entry<RegisteredStateMetaInfoBase.Key, 
List<ExportImportFilesMetaData>> entry :
+        for (Map.Entry<RegisteredStateMetaInfoBase, 
List<ExportImportFilesMetaData>> entry :

Review Comment:
   Discussed offline with @rkhachatryan and we are ok with proceeding with this 
PR first and then we'll just re-apply all required code for the bugfixes.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -653,10 +650,10 @@ private void initBaseDBFromColumnFamilyImports(
                 keyGroupRange,
                 operatorIdentifier);
         rocksHandle.openDB();
-        for (Map.Entry<RegisteredStateMetaInfoBase.Key, 
List<ExportImportFilesMetaData>> entry :
+        for (Map.Entry<RegisteredStateMetaInfoBase, 
List<ExportImportFilesMetaData>> entry :

Review Comment:
   @rkhachatryan this line looks like it's undoing a change that you introduced 
when porting ClosableRegistry changes. In the long term, we still want that 
version because the `RegisteredStateMetaInfoBase.Key` comes from a fix that I 
committed. So I wonder what's the best order and strategy? Should we stop doing 
this uncommenting and rather create a branch that first applies the remaining 
bugfixes and THEN uncomments, or do we start by uncommenting even though it 
potentially undoes changes like this that we need to redo again. I almost think 
the first approach is better, wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to