hejufang commented on code in PR #25837: URL: https://github.com/apache/flink/pull/25837#discussion_r2009127739
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapState.java: ########## @@ -33,133 +37,223 @@ * @param <UK> Type of the user entry key of state * @param <UV> Type of the user entry value of state */ -class LatencyTrackingMapState<K, N, UK, UV> - extends AbstractLatencyTrackState< +class MetricsTrackingMapState<K, N, UK, UV> + extends AbstractMetricsTrackState< K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>, - LatencyTrackingMapState.MapStateLatencyMetrics> + MetricsTrackingMapState.MapStateMetrics> implements InternalMapState<K, N, UK, UV> { - LatencyTrackingMapState( + + private TypeSerializer<UK> userKeySerializer; + private TypeSerializer<UV> userValueSerializer; + + MetricsTrackingMapState( String stateName, InternalMapState<K, N, UK, UV> original, - LatencyTrackingStateConfig latencyTrackingStateConfig) { + KeyedStateBackend<K> keyedStateBackend, + LatencyTrackingStateConfig latencyTrackingStateConfig, + SizeTrackingStateConfig sizeTrackingStateConfig) { super( original, - new MapStateLatencyMetrics( - stateName, - latencyTrackingStateConfig.getMetricGroup(), - latencyTrackingStateConfig.getSampleInterval(), - latencyTrackingStateConfig.getHistorySize(), - latencyTrackingStateConfig.isStateNameAsVariable())); + keyedStateBackend, + latencyTrackingStateConfig.isEnabled() + ? new MapStateMetrics( + stateName, + latencyTrackingStateConfig.getMetricGroup(), + latencyTrackingStateConfig.getSampleInterval(), + latencyTrackingStateConfig.getHistorySize(), + latencyTrackingStateConfig.isStateNameAsVariable()) + : null, + sizeTrackingStateConfig.isEnabled() + ? new MapStateMetrics( + stateName, + sizeTrackingStateConfig.getMetricGroup(), + sizeTrackingStateConfig.getSampleInterval(), + sizeTrackingStateConfig.getHistorySize(), + sizeTrackingStateConfig.isStateNameAsVariable()) + : null); + if (valueSerializer != null) { + MapSerializer<UK, UV> castedMapSerializer = (MapSerializer<UK, UV>) valueSerializer; + userKeySerializer = castedMapSerializer.getKeySerializer(); + userValueSerializer = castedMapSerializer.getValueSerializer(); + } } @Override public UV get(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnGet()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnGet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_GET_VALUE_SIZE, sizeOfUserValue(original.get(key))); + } + if (latencyTrackingStateMetric != null && latencyTrackingStateMetric.trackLatencyOnGet()) { return trackLatencyWithException( - () -> original.get(key), MapStateLatencyMetrics.MAP_STATE_GET_LATENCY); + () -> original.get(key), MapStateMetrics.MAP_STATE_GET_LATENCY); } else { return original.get(key); } } @Override public void put(UK key, UV value) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPut()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPut()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, sizeOfKeyAndUserKey(key)); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, sizeOfUserValue(value)); + } + if (latencyTrackingStateMetric != null && latencyTrackingStateMetric.trackLatencyOnPut()) { trackLatencyWithException( - () -> original.put(key, value), MapStateLatencyMetrics.MAP_STATE_PUT_LATENCY); + () -> original.put(key, value), MapStateMetrics.MAP_STATE_PUT_LATENCY); } else { original.put(key, value); } } @Override public void putAll(Map<UK, UV> map) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnPutAll()) { + if (sizeTrackingStateMetric != null && sizeTrackingStateMetric.trackLatencyOnPutAll()) { + for (Map.Entry<UK, UV> entry : map.entrySet()) { + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_KEY_SIZE, + sizeOfKeyAndUserKey(entry.getKey())); + sizeTrackingStateMetric.updateMetrics( + MapStateMetrics.MAP_STATE_PUT_VALUE_SIZE, + sizeOfUserValue(entry.getValue())); + } + } + + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnPutAll()) { trackLatencyWithException( - () -> original.putAll(map), MapStateLatencyMetrics.MAP_STATE_PUT_ALL_LATENCY); + () -> original.putAll(map), MapStateMetrics.MAP_STATE_PUT_ALL_LATENCY); } else { original.putAll(map); } } @Override public void remove(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnRemove()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnRemove()) { trackLatencyWithException( - () -> original.remove(key), MapStateLatencyMetrics.MAP_STATE_REMOVE_LATENCY); + () -> original.remove(key), MapStateMetrics.MAP_STATE_REMOVE_LATENCY); } else { original.remove(key); } } @Override public boolean contains(UK key) throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnContains()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnContains()) { return trackLatencyWithException( - () -> original.contains(key), - MapStateLatencyMetrics.MAP_STATE_CONTAINS_LATENCY); + () -> original.contains(key), MapStateMetrics.MAP_STATE_CONTAINS_LATENCY); } else { return original.contains(key); } } @Override public Iterable<Map.Entry<UK, UV>> entries() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnEntriesInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.entries()), - MapStateLatencyMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); + MapStateMetrics.MAP_STATE_ENTRIES_INIT_LATENCY); } else { return new IterableWrapper<>(original.entries()); } } @Override public Iterable<UK> keys() throws Exception { - if (latencyTrackingStateMetric.trackLatencyOnKeysInit()) { + if (latencyTrackingStateMetric != null + && latencyTrackingStateMetric.trackLatencyOnKeysInit()) { return trackLatencyWithException( () -> new IterableWrapper<>(original.keys()), - MapStateLatencyMetrics.MAP_STATE_KEYS_INIT_LATENCY); + MapStateMetrics.MAP_STATE_KEYS_INIT_LATENCY); } else { return new IterableWrapper<>(original.keys()); } } @Override public Iterable<UV> values() throws Exception { Review Comment: If calculate all the values returned by this method, I believe the overhead might be quite significant. Therefore, I added sampling logic in the Iterator.next method, ensuring that all data traversal operations using the Iterator will be included in the statistics. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org