pnowojski commented on code in PR #27602:
URL: https://github.com/apache/flink/pull/27602#discussion_r2813416926


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -158,19 +173,23 @@ public void initializeState(StateInitializationContext context) throws Exception
                 new ValueStateDescriptor<>("current-value", serializer);
         this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
 
-        if (context.isRestored()) {
-            // Detect ordered state backend before consolidation
-            detectOrderedStateBackend();
-
-            // Consolidate all buffered records to MIN_VALUE for each key.
-            // This ensures they are compacted on the first watermark after 
restore.
-            getKeyedStateBackend()
-                    .applyToAllKeys(
-                            VoidNamespace.INSTANCE,
-                            VoidNamespaceSerializer.INSTANCE,
-                            bufferDescriptor,
-                            (key, state) -> consolidateBufferToMinValue());
-        }
+        this.consolidatedDescriptor = new ValueStateDescriptor<>("consolidated", Boolean.class);
+        this.consolidated = context.getKeyedStateStore().getState(consolidatedDescriptor);
+
+        this.restoredFromCheckpoint = context.isRestored();
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        // Clear consolidation flags before checkpoint - they should not be persisted
+        getKeyedStateBackend()
+                .applyToAllKeys(
+                        VoidNamespace.INSTANCE,
+                        VoidNamespaceSerializer.INSTANCE,
+                        consolidatedDescriptor,
+                        (key, state) -> state.clear());
+
+        super.snapshotState(context);

Review Comment:
   Why do we need this? 🤔 Clearing the flags iterates over every key on each snapshot, so with frequent checkpointing this will become a performance bottleneck.
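A minimal plain-Java model of the cost being questioned, not Flink code: keyed state is replaced by a `HashMap`, and all class and method names (`ConsolidationFlagSketch`, `ClearOnSnapshot`, `EpochStamped`, `needsConsolidation`, `markConsolidated`) are hypothetical. `ClearOnSnapshot` mirrors the diff: every snapshot scans and clears each key's flag, so snapshot work grows with the number of keys times the checkpoint frequency. `EpochStamped` is one possible alternative, assuming each restore can be assigned an epoch value that no persisted flag already carries: storing that epoch instead of a `Boolean` makes pre-restore flags stale by comparison, so nothing has to be cleared before a checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-key "consolidated" flags; a HashMap stands in for keyed state. */
public class ConsolidationFlagSketch {

    /** Mirrors the diff: one Boolean flag per key, cleared for every key on each snapshot. */
    static final class ClearOnSnapshot {
        final Map<String, Boolean> consolidated = new HashMap<>();
        long entriesTouchedBySnapshots = 0;

        boolean needsConsolidation(String key) {
            return !consolidated.getOrDefault(key, false);
        }

        void markConsolidated(String key) {
            consolidated.put(key, true);
        }

        void snapshot() {
            // Full pass over all keys, comparable to applyToAllKeys(..., state -> state.clear()).
            entriesTouchedBySnapshots += consolidated.size();
            consolidated.clear();
        }
    }

    /** Hypothetical alternative: store the restore epoch in which the key was consolidated. */
    static final class EpochStamped {
        final Map<String, Long> consolidatedInEpoch = new HashMap<>();
        long currentEpoch = 0L;
        long entriesTouchedBySnapshots = 0; // stays 0: snapshots never scan the flags

        // Assumption: every restore supplies an epoch value not present in any persisted flag.
        void restore(long newEpoch) {
            currentEpoch = newEpoch;
        }

        boolean needsConsolidation(String key) {
            Long epoch = consolidatedInEpoch.get(key);
            // A flag written before the latest restore carries an older epoch and is ignored.
            return epoch == null || epoch.longValue() != currentEpoch;
        }

        void markConsolidated(String key) {
            consolidatedInEpoch.put(key, currentEpoch);
        }

        void snapshot() {
            // No per-key work: flags can be persisted as they are.
        }
    }

    public static void main(String[] args) {
        final int keys = 10_000;
        final int checkpoints = 5;

        ClearOnSnapshot clearing = new ClearOnSnapshot();
        EpochStamped stamped = new EpochStamped();
        stamped.restore(1L);

        for (int cp = 0; cp < checkpoints; cp++) {
            // Touch every key once between checkpoints, consolidating where needed.
            for (int k = 0; k < keys; k++) {
                String key = "key-" + k;
                if (clearing.needsConsolidation(key)) {
                    clearing.markConsolidated(key);
                }
                if (stamped.needsConsolidation(key)) {
                    stamped.markConsolidated(key);
                }
            }
            clearing.snapshot();
            stamped.snapshot();
        }

        System.out.println("clear-on-snapshot entries scanned: " + clearing.entriesTouchedBySnapshots);
        System.out.println("epoch-stamped entries scanned: " + stamped.entriesTouchedBySnapshots);

        // After a new restore, previously stamped keys need consolidation again.
        stamped.restore(2L);
        System.out.println("key-0 needs consolidation after restore: " + stamped.needsConsolidation("key-0"));
    }
}
```

Running `main` with 10,000 keys and 5 checkpoints reports 50,000 flag entries scanned for `ClearOnSnapshot` and 0 for `EpochStamped`, and `key-0` reports that it needs consolidation again after `restore(2L)`. The trade-off is that the epoch source must stay unique across restores, which this model simply assumes.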



-- 
This is an automated message from the Apache Git Service.