leekeiabstraction commented on code in PR #201:
URL: https://github.com/apache/flink-connector-aws/pull/201#discussion_r2139432803


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -244,34 +314,121 @@ private void handleFullyFailedRequest(
 
     @Override
     public void close() {
-        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+        try {
+            kinesisClientProvider.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to close the kinesisClientProvider", e);

Review Comment:
   Is there a specific, or even a generic, connector exception extending 
`RuntimeException` that we can use here instead of a bare `RuntimeException`?
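
A minimal sketch of the idea, assuming no suitable exception exists yet. `SinkCloseException` and `CloseSketch.closeOrThrow` are hypothetical names introduced only for illustration and are not part of the connector; Flink's own `org.apache.flink.util.FlinkRuntimeException` may be a generic candidate, and whether the connector's `KinesisStreamsException` fits is for the author to confirm.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical unchecked exception for close failures (assumption, not an existing connector class).
class SinkCloseException extends RuntimeException {
    SinkCloseException(String message, Throwable cause) {
        super(message, cause);
    }
}

class CloseSketch {
    // Closes the resource and rethrows a checked IOException as the dedicated unchecked type,
    // keeping the original exception as the cause.
    static void closeOrThrow(Closeable resource, String name) {
        try {
            resource.close();
        } catch (IOException e) {
            throw new SinkCloseException("Failed to close the " + name, e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for kinesisClientProvider: a Closeable that always fails.
        Closeable failing = () -> {
            throw new IOException("connection reset");
        };
        try {
            closeOrThrow(failing, "kinesisClientProvider");
        } catch (SinkCloseException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

A dedicated type lets callers and log searches distinguish close failures from unrelated runtime errors while still preserving the underlying `IOException`.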



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -244,34 +314,121 @@ private void handleFullyFailedRequest(
 
     @Override
     public void close() {
-        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+        try {
+            kinesisClientProvider.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to close the kinesisClientProvider", e);
+        }
     }
 
     private void handlePartiallyFailedRequest(
             PutRecordsResponse response,
             List<PutRecordsRequestEntry> requestEntries,
             Consumer<List<PutRecordsRequestEntry>> requestResult) {
-        LOG.warn(
-                "KDS Sink failed to write and will retry {} entries to KDS",
-                response.failedRecordCount());
-        numRecordsOutErrorsCounter.inc(response.failedRecordCount());
+        int failedRecordCount = response.failedRecordCount();
+        LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount);
+        numRecordsOutErrorsCounter.inc(failedRecordCount);
 
         if (failOnError) {
             getFatalExceptionCons()
                     .accept(new KinesisStreamsException.KinesisStreamsFailFastException());
             return;
         }
-        List<PutRecordsRequestEntry> failedRequestEntries =
-                new ArrayList<>(response.failedRecordCount());
+
+        List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(failedRecordCount);
         List<PutRecordsResultEntry> records = response.records();
 
+        // Collect error information and build the list of failed entries
+        Map<String, ErrorSummary> errorSummaries =
+                collectErrorSummaries(records, requestEntries, failedRequestEntries);
+
+        // Log aggregated error information
+        logErrorSummaries(errorSummaries);
+
+        requestResult.accept(failedRequestEntries);
+    }
+
+    /**
+     * Collect error summaries from failed records and build a list of failed request entries.
+     *
+     * @param records The result entries from the Kinesis response
+     * @param requestEntries The original request entries
+     * @param failedRequestEntries List to populate with failed entries (modified as a side effect)
+     * @return A map of error codes to their summaries
+     */
+    private Map<String, ErrorSummary> collectErrorSummaries(
+            List<PutRecordsResultEntry> records,
+            List<PutRecordsRequestEntry> requestEntries,
+            List<PutRecordsRequestEntry> failedRequestEntries) {
+
+        // We capture error info while minimizing logging overhead in the data path,
+        // which is critical for maintaining throughput performance
+        Map<String, ErrorSummary> errorSummaries = new HashMap<>();
+
         for (int i = 0; i < records.size(); i++) {
-            if (records.get(i).errorCode() != null) {
+            PutRecordsResultEntry resultEntry = records.get(i);
+            String errorCode = resultEntry.errorCode();
+
+            if (errorCode != null) {
+                // Track the frequency of each error code to identify patterns
+                ErrorSummary summary =
+                        errorSummaries.computeIfAbsent(
+                                errorCode, code -> new ErrorSummary(resultEntry.errorMessage()));
+                summary.incrementCount();
+
                 failedRequestEntries.add(requestEntries.get(i));
             }
         }
 
-        requestResult.accept(failedRequestEntries);
+        return errorSummaries;
+    }
+
+    /**
+     * Log aggregated error information at WARN level.
+     *
+     * @param errorSummaries Map of error codes to their summaries
+     */
+    private void logErrorSummaries(Map<String, ErrorSummary> errorSummaries) {
+        // We log aggregated error information at WARN level to ensure visibility in production
+        // while avoiding the performance impact of logging each individual failure
+        if (!errorSummaries.isEmpty()) {
+            StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: ");
+            errorSummaries.forEach(
+                    (code, summary) ->
+                            errorSummary.append(
+                                    String.format(
+                                            "[%s: %d records, example: %s] ",
+                                            code,
+                                            summary.getCount(),
+                                            summary.getExampleMessage())));
+
+            // Using a single WARN log with aggregated information provides operational
+            // visibility into errors without flooding logs in high-throughput scenarios
+            LOG.warn("KDS Sink failed to write, " + errorSummary.toString());

Review Comment:
   Let's use the full class name here for searchability and debuggability, e.g. 
`KinesisStreamsSinkWriter failed to write records: ...`



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -244,34 +314,121 @@ private void handleFullyFailedRequest(
 
     @Override
     public void close() {
-        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+        try {
+            kinesisClientProvider.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to close the kinesisClientProvider", e);
+        }
     }
 
     private void handlePartiallyFailedRequest(
             PutRecordsResponse response,
             List<PutRecordsRequestEntry> requestEntries,
             Consumer<List<PutRecordsRequestEntry>> requestResult) {
-        LOG.warn(
-                "KDS Sink failed to write and will retry {} entries to KDS",
-                response.failedRecordCount());
-        numRecordsOutErrorsCounter.inc(response.failedRecordCount());
+        int failedRecordCount = response.failedRecordCount();
+        LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount);
+        numRecordsOutErrorsCounter.inc(failedRecordCount);
 
         if (failOnError) {
             getFatalExceptionCons()
                     .accept(new KinesisStreamsException.KinesisStreamsFailFastException());
             return;
         }
-        List<PutRecordsRequestEntry> failedRequestEntries =
-                new ArrayList<>(response.failedRecordCount());
+
+        List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(failedRecordCount);
         List<PutRecordsResultEntry> records = response.records();
 
+        // Collect error information and build the list of failed entries
+        Map<String, ErrorSummary> errorSummaries =
+                collectErrorSummaries(records, requestEntries, failedRequestEntries);
+
+        // Log aggregated error information
+        logErrorSummaries(errorSummaries);
+
+        requestResult.accept(failedRequestEntries);
+    }
+
+    /**
+     * Collect error summaries from failed records and build a list of failed request entries.
+     *
+     * @param records The result entries from the Kinesis response
+     * @param requestEntries The original request entries
+     * @param failedRequestEntries List to populate with failed entries (modified as a side effect)
+     * @return A map of error codes to their summaries
+     */
+    private Map<String, ErrorSummary> collectErrorSummaries(
+            List<PutRecordsResultEntry> records,
+            List<PutRecordsRequestEntry> requestEntries,
+            List<PutRecordsRequestEntry> failedRequestEntries) {
+
+        // We capture error info while minimizing logging overhead in the data path,
+        // which is critical for maintaining throughput performance
+        Map<String, ErrorSummary> errorSummaries = new HashMap<>();
+
         for (int i = 0; i < records.size(); i++) {
-            if (records.get(i).errorCode() != null) {
+            PutRecordsResultEntry resultEntry = records.get(i);
+            String errorCode = resultEntry.errorCode();
+
+            if (errorCode != null) {
+                // Track the frequency of each error code to identify patterns
+                ErrorSummary summary =
+                        errorSummaries.computeIfAbsent(
+                                errorCode, code -> new ErrorSummary(resultEntry.errorMessage()));
+                summary.incrementCount();
+
                 failedRequestEntries.add(requestEntries.get(i));
             }
         }
 
-        requestResult.accept(failedRequestEntries);
+        return errorSummaries;
+    }
+
+    /**
+     * Log aggregated error information at WARN level.
+     *
+     * @param errorSummaries Map of error codes to their summaries
+     */
+    private void logErrorSummaries(Map<String, ErrorSummary> errorSummaries) {
+        // We log aggregated error information at WARN level to ensure visibility in production
+        // while avoiding the performance impact of logging each individual failure
+        if (!errorSummaries.isEmpty()) {
+            StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: ");
+            errorSummaries.forEach(
+                    (code, summary) ->
+                            errorSummary.append(
+                                    String.format(
+                                            "[%s: %d records, example: %s] ",
+                                            code,
+                                            summary.getCount(),
+                                            summary.getExampleMessage())));

Review Comment:
   This formatting can be implemented as `ErrorSummary.toString()`. Then, 
instead of building the string with a `StringBuilder`, calling `toString()` on 
the map should produce a readable string, leaving less code to maintain.
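
A rough sketch of that suggestion, under stated assumptions: the `ErrorSummary` below is a simplified stand-in for the class in the PR (only the constructor and `incrementCount()` seen in the diff are mirrored), and `ErrorSummaryLoggingSketch.describe` is a hypothetical helper used to show the resulting string. It relies on the standard `java.util.AbstractMap.toString()` format of `{key=value, ...}`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the PR's ErrorSummary (assumption: same constructor and incrementCount()).
class ErrorSummary {
    private final String exampleMessage;
    private int count;

    ErrorSummary(String exampleMessage) {
        this.exampleMessage = exampleMessage;
    }

    void incrementCount() {
        count++;
    }

    // Per-error formatting lives here instead of in the logging method.
    @Override
    public String toString() {
        return String.format("%d records, example: %s", count, exampleMessage);
    }
}

class ErrorSummaryLoggingSketch {
    // Hypothetical helper: the map's own toString() renders every entry as code=summary.
    static String describe(Map<String, ErrorSummary> errorSummaries) {
        return "KinesisStreamsSinkWriter failed to write records: " + errorSummaries;
    }

    public static void main(String[] args) {
        // LinkedHashMap only keeps the demo output order stable; the PR uses a HashMap.
        Map<String, ErrorSummary> errorSummaries = new LinkedHashMap<>();
        ErrorSummary summary =
                errorSummaries.computeIfAbsent(
                        "ProvisionedThroughputExceededException",
                        code -> new ErrorSummary("Rate exceeded"));
        summary.incrementCount();
        summary.incrementCount();
        System.out.println(describe(errorSummaries));
    }
}
```

With a logging framework, the map could be passed directly as a log argument so the string is only built when WARN is enabled.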



