dannycranmer commented on code in PR #110:
URL: 
https://github.com/apache/flink-connector-aws/pull/110#discussion_r1372938391


##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java:
##########
@@ -191,6 +191,12 @@ public enum EFORegistrationType {
     public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
             "flink.stream.registerstreamconsumer.backoff.expconst";
 
+    /** The user-provided list of exceptions to recover from. These exceptions 
are retried indefinitely.
+     * Must be a valid JSON array.

Review Comment:
   Passing JSON directly sounds messy, is this done elsewhere in Flink? Can we 
pass a csv or model the object structure in the keys instead:
   ```
   flink.shard.consumer.error.recoverable=typeA,typeB
   ```
   
   Or
   ```
   flink.shard.consumer.error.recoverable[0].exception=typeA
   flink.shard.consumer.error.recoverable[1].exception=typeB
   ```



##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java:
##########
@@ -472,4 +483,18 @@ public Optional<String> getConsumerName() {
     public Optional<String> getStreamConsumerArn(String stream) {
         return Optional.ofNullable(streamConsumerArns.get(stream));
     }
+
+    public RecoverableErrorsConfig parseRecoverableErrorConfig(String json) {
+        try {
+            return RecoverableErrorsConfig.fromJson(json);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);

Review Comment:
   Can we add a more friendly message here? Is there a more specific Exception 
we can wrap it in?
   



##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java:
##########
@@ -329,6 +336,40 @@ private boolean isInterrupted(final Throwable throwable) {
         return false;
     }
 
+    private boolean isDefinedAsRecoverable(Throwable cause) {
+        // non-customisable list of exceptions that should be recovered 
(retried indefinitely).
+        if (cause instanceof ReadTimeoutException || cause instanceof 
TimeoutException) {
+            // ReadTimeoutException occurs naturally under backpressure 
scenarios when full batches take longer to
+            // process than standard read timeout (default 30s). Recoverable 
exceptions are intended to be retried
+            // indefinitely to avoid system degradation under backpressure. 
The EFO connection (subscription) to Kinesis
+            // is closed, and reacquired once the queue of records has been 
processed.
+            return true;
+        }
+        return isConfiguredAsRecoverable(cause);
+    }
+
+    /**
+     * @param cause Throwable on which to base our exception search
+     * @return true if the input Throwable is configured as a Recoverable 
Error by the user
+     */
+    private boolean isConfiguredAsRecoverable(Throwable cause) {
+        if (this.recoverableErrorsConfig == null || 
this.recoverableErrorsConfig.hasNoConfig()) {
+            return false;
+        }
+        for (ExceptionConfig config : 
this.recoverableErrorsConfig.getExceptionConfigs()) {
+            String classPath = config.getExceptionClassPath();
+            try {
+                Class<Throwable> aClass = (Class<Throwable>) 
Class.forName(classPath);
+                Optional<Throwable> throwable = 
ExceptionUtils.findThrowable(cause, aClass);
+                return throwable.isPresent();
+            } catch (ClassNotFoundException e) {
+                // Class not found
+                return false;
+            }

Review Comment:
   Update: Looks like we already are doing this below. Do we really need this? 
If we pass in a `Class<?>` instead of `String` we can remove the inflation



##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java:
##########
@@ -0,0 +1,17 @@
+package org.apache.flink.streaming.connectors.kinesis.config;

Review Comment:
   Missing copyright header, please check all files



##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java:
##########
@@ -472,4 +483,18 @@ public Optional<String> getConsumerName() {
     public Optional<String> getStreamConsumerArn(String stream) {
         return Optional.ofNullable(streamConsumerArns.get(stream));
     }
+
+    public RecoverableErrorsConfig parseRecoverableErrorConfig(String json) {
+        try {
+            return RecoverableErrorsConfig.fromJson(json);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Return recoverable errors config. */

Review Comment:
   Remove unnecessary comments 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#comments



##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java:
##########
@@ -0,0 +1,17 @@
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/**
+ * Helper class to hold information/behaviour about Exceptions.
+ * Used for configuring recoverable exceptions.
+ */
+public class ExceptionConfig {
+    private final String exceptionClassPath;

Review Comment:
   Can this be a `Class<?>` instead?



##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java:
##########
@@ -329,6 +336,40 @@ private boolean isInterrupted(final Throwable throwable) {
         return false;
     }
 
+    private boolean isDefinedAsRecoverable(Throwable cause) {
+        // non-customisable list of exceptions that should be recovered 
(retried indefinitely).
+        if (cause instanceof ReadTimeoutException || cause instanceof 
TimeoutException) {
+            // ReadTimeoutException occurs naturally under backpressure 
scenarios when full batches take longer to
+            // process than standard read timeout (default 30s). Recoverable 
exceptions are intended to be retried
+            // indefinitely to avoid system degradation under backpressure. 
The EFO connection (subscription) to Kinesis
+            // is closed, and reacquired once the queue of records has been 
processed.
+            return true;
+        }
+        return isConfiguredAsRecoverable(cause);
+    }
+
+    /**
+     * @param cause Throwable on which to base our exception search
+     * @return true if the input Throwable is configured as a Recoverable 
Error by the user
+     */
+    private boolean isConfiguredAsRecoverable(Throwable cause) {
+        if (this.recoverableErrorsConfig == null || 
this.recoverableErrorsConfig.hasNoConfig()) {
+            return false;
+        }
+        for (ExceptionConfig config : 
this.recoverableErrorsConfig.getExceptionConfigs()) {
+            String classPath = config.getExceptionClassPath();
+            try {
+                Class<Throwable> aClass = (Class<Throwable>) 
Class.forName(classPath);
+                Optional<Throwable> throwable = 
ExceptionUtils.findThrowable(cause, aClass);
+                return throwable.isPresent();
+            } catch (ClassNotFoundException e) {
+                // Class not found
+                return false;
+            }

Review Comment:
   Not sure we should swallow this, since it implies incorrect configuration. 
Instead of parsing it here recommend we inflate in the connector initialisation 
and perform validation up front.



##########
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java:
##########
@@ -191,6 +191,12 @@ public enum EFORegistrationType {
     public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
             "flink.stream.registerstreamconsumer.backoff.expconst";
 
+    /** The user-provided list of exceptions to recover from. These exceptions 
are retried indefinitely.
+     * Must be a valid JSON array.
+     * Example config: [{"exceptionClass": "java.lang.Exception"}, 
{"exceptionClass": "java.net.UnknownHostException"}]
+     */
+    public static final String RECOVERABLE_EXCEPTIONS = " 
flink.shard.consumer.error.recoverable";

Review Comment:
   Leading whitespace in the key



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to