leekeiabstraction commented on code in PR #201: URL: https://github.com/apache/flink-connector-aws/pull/201#discussion_r2139432803
########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java: ########## @@ -244,34 +314,121 @@ private void handleFullyFailedRequest( @Override public void close() { - AWSGeneralUtil.closeResources(httpClient, kinesisClient); + try { + kinesisClientProvider.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close the kinesisClientProvider", e); Review Comment: Is there a specific or even generic connector exception which extends RuntimeException that we can use here? ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java: ########## @@ -244,34 +314,121 @@ private void handleFullyFailedRequest( @Override public void close() { - AWSGeneralUtil.closeResources(httpClient, kinesisClient); + try { + kinesisClientProvider.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close the kinesisClientProvider", e); + } } private void handlePartiallyFailedRequest( PutRecordsResponse response, List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) { - LOG.warn( - "KDS Sink failed to write and will retry {} entries to KDS", - response.failedRecordCount()); - numRecordsOutErrorsCounter.inc(response.failedRecordCount()); + int failedRecordCount = response.failedRecordCount(); + LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount); + numRecordsOutErrorsCounter.inc(failedRecordCount); if (failOnError) { getFatalExceptionCons() .accept(new KinesisStreamsException.KinesisStreamsFailFastException()); return; } - List<PutRecordsRequestEntry> failedRequestEntries = - new ArrayList<>(response.failedRecordCount()); + + List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(failedRecordCount); List<PutRecordsResultEntry> records = response.records(); + // Collect 
error information and build the list of failed entries + Map<String, ErrorSummary> errorSummaries = + collectErrorSummaries(records, requestEntries, failedRequestEntries); + + // Log aggregated error information + logErrorSummaries(errorSummaries); + + requestResult.accept(failedRequestEntries); + } + + /** + * Collect error summaries from failed records and build a list of failed request entries. + * + * @param records The result entries from the Kinesis response + * @param requestEntries The original request entries + * @param failedRequestEntries List to populate with failed entries (modified as a side effect) + * @return A map of error codes to their summaries + */ + private Map<String, ErrorSummary> collectErrorSummaries( + List<PutRecordsResultEntry> records, + List<PutRecordsRequestEntry> requestEntries, + List<PutRecordsRequestEntry> failedRequestEntries) { + + // We capture error info while minimizing logging overhead in the data path, + // which is critical for maintaining throughput performance + Map<String, ErrorSummary> errorSummaries = new HashMap<>(); + for (int i = 0; i < records.size(); i++) { - if (records.get(i).errorCode() != null) { + PutRecordsResultEntry resultEntry = records.get(i); + String errorCode = resultEntry.errorCode(); + + if (errorCode != null) { + // Track the frequency of each error code to identify patterns + ErrorSummary summary = + errorSummaries.computeIfAbsent( + errorCode, code -> new ErrorSummary(resultEntry.errorMessage())); + summary.incrementCount(); + failedRequestEntries.add(requestEntries.get(i)); } } - requestResult.accept(failedRequestEntries); + return errorSummaries; + } + + /** + * Log aggregated error information at WARN level. 
+ * + * @param errorSummaries Map of error codes to their summaries + */ + private void logErrorSummaries(Map<String, ErrorSummary> errorSummaries) { + // We log aggregated error information at WARN level to ensure visibility in production + // while avoiding the performance impact of logging each individual failure + if (!errorSummaries.isEmpty()) { + StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: "); + errorSummaries.forEach( + (code, summary) -> + errorSummary.append( + String.format( + "[%s: %d records, example: %s] ", + code, + summary.getCount(), + summary.getExampleMessage()))); + + // Using a single WARN log with aggregated information provides operational + // visibility into errors without flooding logs in high-throughput scenarios + LOG.warn("KDS Sink failed to write, " + errorSummary.toString()); Review Comment: Let's use the full class name here for searchability/debuggability, e.g. `KinesisStreamsSinkWriter failed to write records: ...` ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java: ########## @@ -244,34 +314,121 @@ private void handleFullyFailedRequest( @Override public void close() { - AWSGeneralUtil.closeResources(httpClient, kinesisClient); + try { + kinesisClientProvider.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close the kinesisClientProvider", e); + } } private void handlePartiallyFailedRequest( PutRecordsResponse response, List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) { - LOG.warn( - "KDS Sink failed to write and will retry {} entries to KDS", - response.failedRecordCount()); - numRecordsOutErrorsCounter.inc(response.failedRecordCount()); + int failedRecordCount = response.failedRecordCount(); + LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount); +
numRecordsOutErrorsCounter.inc(failedRecordCount); if (failOnError) { getFatalExceptionCons() .accept(new KinesisStreamsException.KinesisStreamsFailFastException()); return; } - List<PutRecordsRequestEntry> failedRequestEntries = - new ArrayList<>(response.failedRecordCount()); + + List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(failedRecordCount); List<PutRecordsResultEntry> records = response.records(); + // Collect error information and build the list of failed entries + Map<String, ErrorSummary> errorSummaries = + collectErrorSummaries(records, requestEntries, failedRequestEntries); + + // Log aggregated error information + logErrorSummaries(errorSummaries); + + requestResult.accept(failedRequestEntries); + } + + /** + * Collect error summaries from failed records and build a list of failed request entries. + * + * @param records The result entries from the Kinesis response + * @param requestEntries The original request entries + * @param failedRequestEntries List to populate with failed entries (modified as a side effect) + * @return A map of error codes to their summaries + */ + private Map<String, ErrorSummary> collectErrorSummaries( + List<PutRecordsResultEntry> records, + List<PutRecordsRequestEntry> requestEntries, + List<PutRecordsRequestEntry> failedRequestEntries) { + + // We capture error info while minimizing logging overhead in the data path, + // which is critical for maintaining throughput performance + Map<String, ErrorSummary> errorSummaries = new HashMap<>(); + for (int i = 0; i < records.size(); i++) { - if (records.get(i).errorCode() != null) { + PutRecordsResultEntry resultEntry = records.get(i); + String errorCode = resultEntry.errorCode(); + + if (errorCode != null) { + // Track the frequency of each error code to identify patterns + ErrorSummary summary = + errorSummaries.computeIfAbsent( + errorCode, code -> new ErrorSummary(resultEntry.errorMessage())); + summary.incrementCount(); + 
failedRequestEntries.add(requestEntries.get(i)); } } - requestResult.accept(failedRequestEntries); + return errorSummaries; + } + + /** + * Log aggregated error information at WARN level. + * + * @param errorSummaries Map of error codes to their summaries + */ + private void logErrorSummaries(Map<String, ErrorSummary> errorSummaries) { + // We log aggregated error information at WARN level to ensure visibility in production + // while avoiding the performance impact of logging each individual failure + if (!errorSummaries.isEmpty()) { + StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: "); + errorSummaries.forEach( + (code, summary) -> + errorSummary.append( + String.format( + "[%s: %d records, example: %s] ", + code, + summary.getCount(), + summary.getExampleMessage()))); Review Comment: This per-entry formatting can be implemented in ErrorSummary.toString(). Then, instead of using a StringBuilder to construct the string, calling toString() on the HashMap should produce a readable string. This way, we'd have less code to maintain. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org