dannycranmer commented on code in PR #110:
URL: https://github.com/apache/flink-connector-aws/pull/110#discussion_r1372938391
########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java: ##########
@@ -191,6 +191,12 @@ public enum EFORegistrationType {
     public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.registerstreamconsumer.backoff.expconst";
+    /** The user-provided list of exceptions to recover from. These exceptions are retried indefinitely.
+     * Must be a valid JSON array.

Review Comment:
Passing JSON directly sounds messy; is this done elsewhere in Flink? Can we pass a CSV or model the object structure in the keys instead:
```
flink.shard.consumer.error.recoverable=typeA,typeB
```
Or
```
flink.shard.consumer.error.recoverable[0].exception=typeA
flink.shard.consumer.error.recoverable[1].exception=typeB
```

########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java: ##########
@@ -472,4 +483,18 @@ public Optional<String> getConsumerName() {
     public Optional<String> getStreamConsumerArn(String stream) {
         return Optional.ofNullable(streamConsumerArns.get(stream));
     }
+
+    public RecoverableErrorsConfig parseRecoverableErrorConfig(String json) {
+        try {
+            return RecoverableErrorsConfig.fromJson(json);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);

Review Comment:
Can we add a more friendly message here? Is there a more specific Exception we can wrap it in?

########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java: ##########
@@ -329,6 +336,40 @@ private boolean isInterrupted(final Throwable throwable) {
         return false;
     }
+    private boolean isDefinedAsRecoverable(Throwable cause) {
+        // non-customisable list of exceptions that should be recovered (retried indefinitely).
+        if (cause instanceof ReadTimeoutException || cause instanceof TimeoutException) {
+            // ReadTimeoutException occurs naturally under backpressure scenarios when full batches take longer to
+            // process than standard read timeout (default 30s). Recoverable exceptions are intended to be retried
+            // indefinitely to avoid system degradation under backpressure. The EFO connection (subscription) to Kinesis
+            // is closed, and reacquired once the queue of records has been processed.
+            return true;
+        }
+        return isConfiguredAsRecoverable(cause);
+    }
+
+    /**
+     * @param cause Throwable on which to base our exception search
+     * @return true if the input Throwable is configured as a Recoverable Error by the user
+     */
+    private boolean isConfiguredAsRecoverable(Throwable cause) {
+        if (this.recoverableErrorsConfig == null || this.recoverableErrorsConfig.hasNoConfig()) {
+            return false;
+        }
+        for (ExceptionConfig config : this.recoverableErrorsConfig.getExceptionConfigs()) {
+            String classPath = config.getExceptionClassPath();
+            try {
+                Class<Throwable> aClass = (Class<Throwable>) Class.forName(classPath);
+                Optional<Throwable> throwable = ExceptionUtils.findThrowable(cause, aClass);
+                return throwable.isPresent();
+            } catch (ClassNotFoundException e) {
+                // Class not found
+                return false;
+            }

Review Comment:
Update: Looks like we already are doing this below. Do we really need this?
If we pass in a `Class<?>` instead of a `String`, we can remove the inflation.

########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java: ##########
@@ -0,0 +1,17 @@
+package org.apache.flink.streaming.connectors.kinesis.config;

Review Comment:
Missing copyright header; please check all files.

########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java: ##########
@@ -472,4 +483,18 @@ public Optional<String> getConsumerName() {
     public Optional<String> getStreamConsumerArn(String stream) {
         return Optional.ofNullable(streamConsumerArns.get(stream));
     }
+
+    public RecoverableErrorsConfig parseRecoverableErrorConfig(String json) {
+        try {
+            return RecoverableErrorsConfig.fromJson(json);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Return recoverable errors config. */

Review Comment:
Remove unnecessary comments: https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#comments

########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java: ##########
@@ -0,0 +1,17 @@
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/**
+ * Helper class to hold information/behaviour about Exceptions.
+ * Used for configuring recoverable exceptions.
+ */
+public class ExceptionConfig {
+    private final String exceptionClassPath;

Review Comment:
Can this be a `Class<?>` instead?
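The two `ExceptionConfig` comments (add the ASF header; hold a `Class<?>` rather than a class-path string) and the point about removing the inflation could combine roughly as in the sketch below. It is not the PR's code: the `package` line is left out so the sketch compiles standalone, `matches` and `anyMatch` are hypothetical names, and the cause-chain walk only approximates what Flink's `ExceptionUtils.findThrowable` does in the diff. One difference is deliberate: the loop in `isConfiguredAsRecoverable` above returns inside its first iteration, so only the first configured class is ever checked, whereas `anyMatch` checks every entry.

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Sketch: a recoverable exception type, stored as an already-resolved class. */
final class ExceptionConfig {
    private final Class<? extends Throwable> exceptionClass;

    ExceptionConfig(Class<? extends Throwable> exceptionClass) {
        this.exceptionClass = Objects.requireNonNull(exceptionClass, "exceptionClass");
    }

    Class<? extends Throwable> getExceptionClass() {
        return exceptionClass;
    }

    /** True if {@code cause} or any throwable in its cause chain is an instance of the configured type. */
    boolean matches(Throwable cause) {
        // Identity set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = cause; t != null && seen.add(t); t = t.getCause()) {
            if (exceptionClass.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical helper: checks every configured type, not just the first one. */
    static boolean anyMatch(List<ExceptionConfig> configs, Throwable cause) {
        for (ExceptionConfig config : configs) {
            if (config.matches(cause)) {
                return true;
            }
        }
        return false;
    }
}
```

Because the class is resolved before an `ExceptionConfig` is built, there is no `ClassNotFoundException` left to handle (or swallow) on the subscriber's error path.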
########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java: ##########
@@ -329,6 +336,40 @@ private boolean isInterrupted(final Throwable throwable) {
         return false;
     }
+    private boolean isDefinedAsRecoverable(Throwable cause) {
+        // non-customisable list of exceptions that should be recovered (retried indefinitely).
+        if (cause instanceof ReadTimeoutException || cause instanceof TimeoutException) {
+            // ReadTimeoutException occurs naturally under backpressure scenarios when full batches take longer to
+            // process than standard read timeout (default 30s). Recoverable exceptions are intended to be retried
+            // indefinitely to avoid system degradation under backpressure. The EFO connection (subscription) to Kinesis
+            // is closed, and reacquired once the queue of records has been processed.
+            return true;
+        }
+        return isConfiguredAsRecoverable(cause);
+    }
+
+    /**
+     * @param cause Throwable on which to base our exception search
+     * @return true if the input Throwable is configured as a Recoverable Error by the user
+     */
+    private boolean isConfiguredAsRecoverable(Throwable cause) {
+        if (this.recoverableErrorsConfig == null || this.recoverableErrorsConfig.hasNoConfig()) {
+            return false;
+        }
+        for (ExceptionConfig config : this.recoverableErrorsConfig.getExceptionConfigs()) {
+            String classPath = config.getExceptionClassPath();
+            try {
+                Class<Throwable> aClass = (Class<Throwable>) Class.forName(classPath);
+                Optional<Throwable> throwable = ExceptionUtils.findThrowable(cause, aClass);
+                return throwable.isPresent();
+            } catch (ClassNotFoundException e) {
+                // Class not found
+                return false;
+            }

Review Comment:
Not sure we should swallow this, since it implies incorrect configuration. Instead of parsing it here, I recommend we inflate it during connector initialisation and perform validation up front.
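Combining the CSV key layout suggested on `ConsumerConfigConstants` with the recommendation to inflate and validate up front, a minimal sketch might look like this. `RecoverableExceptionsParser` is a hypothetical name, not part of the connector. It reads `flink.shard.consumer.error.recoverable=typeA,typeB` from a `java.util.Properties`, resolves each entry once, and fails fast with a descriptive `IllegalArgumentException`, which also addresses the request for a friendlier, more specific error than a bare `RuntimeException`. Flink's `org.apache.flink.configuration.IllegalConfigurationException` may be the more idiomatic type inside the connector; the sketch sticks to the JDK so it runs standalone. It also uses the default class loader, whereas inside a Flink job the user-code class loader would likely need to be passed via `Class.forName(name, false, loader)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not part of the connector): read and validate recoverable exceptions once, up front. */
final class RecoverableExceptionsParser {
    // Key name from the PR, without the stray leading space.
    static final String RECOVERABLE_EXCEPTIONS = "flink.shard.consumer.error.recoverable";

    private RecoverableExceptionsParser() {}

    /** Reads a CSV value such as "typeA,typeB" and resolves each entry to a Throwable subclass. */
    static List<Class<? extends Throwable>> parse(Properties props) {
        String value = props.getProperty(RECOVERABLE_EXCEPTIONS, "");
        List<Class<? extends Throwable>> resolved = new ArrayList<>();
        for (String entry : value.split(",")) {
            String name = entry.trim();
            if (name.isEmpty()) {
                continue; // tolerate an empty value and stray commas
            }
            resolved.add(resolve(name));
        }
        return Collections.unmodifiableList(resolved);
    }

    private static Class<? extends Throwable> resolve(String name) {
        Class<?> clazz;
        try {
            clazz = Class.forName(name);
        } catch (ClassNotFoundException e) {
            // Misconfiguration: fail fast with a descriptive message instead of swallowing it at runtime.
            throw new IllegalArgumentException(
                    "Invalid value for '" + RECOVERABLE_EXCEPTIONS + "': class '" + name
                            + "' was not found on the classpath.", e);
        }
        if (!Throwable.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                    "Invalid value for '" + RECOVERABLE_EXCEPTIONS + "': '" + name
                            + "' is not a subclass of java.lang.Throwable.");
        }
        return clazz.asSubclass(Throwable.class);
    }
}
```

The indexed-key layout (`flink.shard.consumer.error.recoverable[0].exception=typeA`) would work the same way, reading `[0]`, `[1]`, and so on until the first missing index; only the CSV form is shown to keep the sketch short.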
########## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java: ##########
@@ -191,6 +191,12 @@ public enum EFORegistrationType {
     public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.registerstreamconsumer.backoff.expconst";
+    /** The user-provided list of exceptions to recover from. These exceptions are retried indefinitely.
+     * Must be a valid JSON array.
+     * Example config: [{"exceptionClass": "java.lang.Exception"}, {"exceptionClass": "java.net.UnknownHostException"}]
+     */
+    public static final String RECOVERABLE_EXCEPTIONS = " flink.shard.consumer.error.recoverable";

Review Comment:
Leading whitespace in the key.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
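On the leading-whitespace comment for `RECOVERABLE_EXCEPTIONS`: users may write this key by hand or load it from a `.properties` file. `java.util.Properties.load` skips leading whitespace on each line, so the stored key never starts with a space, and a lookup using the constant as written in the PR returns `null`. Code that both sets and gets through the constant happens to work, which can hide the problem. A small standalone demonstration (the class name is hypothetical):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustration only: why a key constant with a leading space never matches user-supplied properties. */
final class LeadingWhitespaceKeyDemo {
    // Value as written in the PR (note the space before "flink").
    static final String KEY_AS_IN_PR = " flink.shard.consumer.error.recoverable";
    // Same key without the leading space.
    static final String KEY_TRIMMED = KEY_AS_IN_PR.trim();

    private LeadingWhitespaceKeyDemo() {}

    /** Loads properties the way a .properties file is read; leading whitespace on a line is ignored. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Even if the user copies the key with its leading space, load() stores it trimmed.
        Properties props = load(" flink.shard.consumer.error.recoverable=java.lang.Exception\n");
        System.out.println(props.getProperty(KEY_AS_IN_PR)); // null
        System.out.println(props.getProperty(KEY_TRIMMED));  // java.lang.Exception
    }
}
```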