rkhachatryan commented on code in PR #23391: URL: https://github.com/apache/flink/pull/23391#discussion_r1321980461
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java: ########## @@ -227,11 +257,23 @@ void reportFailedCheckpoint(FailedCheckpointStats failed) { history.replacePendingCheckpointById(failed); dirty = true; + logCheckpointStatistics(failed); } finally { statsReadWriteLock.unlock(); } } + private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) { + try { + StringWriter sw = new StringWriter(); + MAPPER.writeValue(sw, CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true)); + String jsonDump = sw.toString(); + LOG.debug("CheckpointStatistics (for jobID={}, jobName={}) dump = {} ", jobID, jobName, jsonDump); Review Comment: How about explicitly logging checkpoint ID along with job ID? So that we don't need to parse/know json to get stats for a specific checkpoint. ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java: ########## @@ -227,11 +257,23 @@ void reportFailedCheckpoint(FailedCheckpointStats failed) { history.replacePendingCheckpointById(failed); dirty = true; + logCheckpointStatistics(failed); } finally { statsReadWriteLock.unlock(); } } + private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) { + try { + StringWriter sw = new StringWriter(); + MAPPER.writeValue(sw, CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true)); + String jsonDump = sw.toString(); Review Comment: Should we save some resources by guarding this with isDebugEnabled? ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java: ########## @@ -227,11 +257,23 @@ void reportFailedCheckpoint(FailedCheckpointStats failed) { history.replacePendingCheckpointById(failed); dirty = true; + logCheckpointStatistics(failed); } finally { statsReadWriteLock.unlock(); } } + private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) { + try { + StringWriter sw = new StringWriter(); + MAPPER.writeValue(sw, CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true)); + String jsonDump = sw.toString(); + LOG.debug("CheckpointStatistics (for jobID={}, jobName={}) dump = {} ", jobID, jobName, jsonDump); + } catch (Exception ex) { + LOG.error("Fail to log CheckpointStatistics", ex); Review Comment: nit: I'd log it as a warning -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org