hejufang commented on code in PR #25837:
URL: https://github.com/apache/flink/pull/25837#discussion_r2009127739


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java:
##########
@@ -33,133 +37,223 @@
  * @param <UK> Type of the user entry key of state
  * @param <UV> Type of the user entry value of state
  */
-class LatencyTrackingMapState<K, N, UK, UV>
-        extends AbstractLatencyTrackState<
+class MetricsTrackingMapState<K, N, UK, UV>
+        extends AbstractMetricsTrackState<
                 K,
                 N,
                 Map<UK, UV>,
                 InternalMapState<K, N, UK, UV>,
-                LatencyTrackingMapState.MapStateLatencyMetrics>
+                MetricsTrackingMapState.MapStateMetrics>
         implements InternalMapState<K, N, UK, UV> {
-    LatencyTrackingMapState(
+
+    private TypeSerializer<UK> userKeySerializer;
+    private TypeSerializer<UV> userValueSerializer;
+
+    MetricsTrackingMapState(
             String stateName,
             InternalMapState<K, N, UK, UV> original,
-            LatencyTrackingStateConfig latencyTrackingStateConfig) {
+            KeyedStateBackend<K> keyedStateBackend,
+            LatencyTrackingStateConfig latencyTrackingStateConfig,
+            SizeTrackingStateConfig sizeTrackingStateConfig) {
         super(
                 original,
-                new MapStateLatencyMetrics(
-                        stateName,
-                        latencyTrackingStateConfig.getMetricGroup(),
-                        latencyTrackingStateConfig.getSampleInterval(),
-                        latencyTrackingStateConfig.getHistorySize(),
-                        latencyTrackingStateConfig.isStateNameAsVariable()));
+                keyedStateBackend,
+                latencyTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                latencyTrackingStateConfig.getMetricGroup(),
+                                latencyTrackingStateConfig.getSampleInterval(),
+                                latencyTrackingStateConfig.getHistorySize(),
+                                
latencyTrackingStateConfig.isStateNameAsVariable())
+                        : null,
+                sizeTrackingStateConfig.isEnabled()
+                        ? new MapStateMetrics(
+                                stateName,
+                                sizeTrackingStateConfig.getMetricGroup(),
+                                sizeTrackingStateConfig.getSampleInterval(),
+                                sizeTrackingStateConfig.getHistorySize(),
+                                
sizeTrackingStateConfig.isStateNameAsVariable())
+                        : null);
+        if (valueSerializer != null) {
+            MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, 
UV>) valueSerializer;
+            userKeySerializer = castedMapSerializer.getKeySerializer();
+            userValueSerializer = castedMapSerializer.getValueSerializer();
+        }
     }
 
     @Override
     public UV get(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnGet()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnGet()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, 
sizeOfUserValue(original.get(key)));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnGet()) {
             return trackLatencyWithException(
-                    () -> original.get(key), 
MapStateLatencyMetrics.MAP_STATE_GET_LATENCY);
+                    () -> original.get(key), 
MapStateMetrics.MAP_STATE_GET_LATENCY);
         } else {
             return original.get(key);
         }
     }
 
     @Override
     public void put(UK key, UV value) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPut()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPut()) {
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, 
sizeOfKeyAndUserKey(key));
+            sizeTrackingStateMetric.updateMetrics(
+                    MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, 
sizeOfUserValue(value));
+        }
+        if (latencyTrackingStateMetric != null && 
latencyTrackingStateMetric.trackLatencyOnPut()) {
             trackLatencyWithException(
-                    () -> original.put(key, value), 
MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY);
+                    () -> original.put(key, value), 
MapStateMetrics.MAP_STATE_PUT_LATENCY);
         } else {
             original.put(key, value);
         }
     }
 
     @Override
     public void putAll(Map<UK, UV> map) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnPutAll()) {
+        if (sizeTrackingStateMetric != null && 
sizeTrackingStateMetric.trackLatencyOnPutAll()) {
+            for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_KEY_SIZE,
+                        sizeOfKeyAndUserKey(entry.getKey()));
+                sizeTrackingStateMetric.updateMetrics(
+                        MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE,
+                        sizeOfUserValue(entry.getValue()));
+            }
+        }
+
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnPutAll()) {
             trackLatencyWithException(
-                    () -> original.putAll(map), 
MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY);
+                    () -> original.putAll(map), 
MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY);
         } else {
             original.putAll(map);
         }
     }
 
     @Override
     public void remove(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnRemove()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnRemove()) {
             trackLatencyWithException(
-                    () -> original.remove(key), 
MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY);
+                    () -> original.remove(key), 
MapStateMetrics.MAP_STATE_REMOVE_LATENCY);
         } else {
             original.remove(key);
         }
     }
 
     @Override
     public boolean contains(UK key) throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnContains()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnContains()) {
             return trackLatencyWithException(
-                    () -> original.contains(key),
-                    MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY);
+                    () -> original.contains(key), 
MapStateMetrics.MAP_STATE_CONTAINS_LATENCY);
         } else {
             return original.contains(key);
         }
     }
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.entries()),
-                    MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.entries());
         }
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
+        if (latencyTrackingStateMetric != null
+                && latencyTrackingStateMetric.trackLatencyOnKeysInit()) {
             return trackLatencyWithException(
                     () -> new IterableWrapper<>(original.keys()),
-                    MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY);
+                    MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY);
         } else {
             return new IterableWrapper<>(original.keys());
         }
     }
 
     @Override
     public Iterable<UV> values() throws Exception {

Review Comment:
   If we calculated the size of every value returned by this method, I believe 
the overhead could be quite significant. Therefore, I added the sampling logic 
to the Iterator.next method instead, so that every traversal performed through 
the Iterator is still included in the statistics. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to