Hi, I’m working on implementing retry for getRecords in FlinkKinesisConsumer. We occasionally get transient socket read timeouts. Instead of bubbling up the exception and forcing the topology to reset to the last checkpoint, we want to retry getRecords. We also want to work with a lower socket read timeout than the 50s default.
Looking at the current KinesisProxy implementation, I’m aiming to remove some baked-in assumptions that get in the way of customizing this:

1) AWSUtil.createKinesisClient is statically wired to use the default ClientConfiguration. The user should instead be able to control all settings that the SDK exposes.
2) Retry in KinesisProxy.getRecords is limited to AmazonServiceException. Perhaps that is OK as the default, but the user should be able to retry on other exceptions if desired.

For 1), a generic option could be to set properties on ClientConfiguration using reflection (the class isn’t serializable, but it follows the Java Bean conventions). Something like BeanUtils would make it straightforward to process user-supplied properties with a specific prefix.

Is there any other place in the Flink codebase where this style of configuration is used, and is there a preferred alternative to BeanUtils?

Thanks
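
P.S. To make the idea in 1) concrete, here is a rough sketch of the reflection approach using only the JDK, without BeanUtils. Everything in it is made up for illustration: the class name PrefixedBeanConfigurer, the prefix "aws.clientconfig.", and DemoClientConfig, which is just a stand-in bean so the example runs without the AWS SDK on the classpath. It only handles int, long, boolean and String setters; enum-typed or overloaded setters would need extra handling, which is part of why a library like BeanUtils is tempting.

```java
import java.lang.reflect.Method;
import java.util.Properties;

/** Sketch: copy prefixed string properties onto a Java Bean via its public setters. */
class PrefixedBeanConfigurer {

    /** Hypothetical stand-in for the SDK's ClientConfiguration (not the real class). */
    public static class DemoClientConfig {
        private int socketTimeout = 50_000;
        private int maxErrorRetry = 3;
        private boolean useGzip = false;

        public int getSocketTimeout() { return socketTimeout; }
        public void setSocketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; }
        public int getMaxErrorRetry() { return maxErrorRetry; }
        public void setMaxErrorRetry(int maxErrorRetry) { this.maxErrorRetry = maxErrorRetry; }
        public boolean isUseGzip() { return useGzip; }
        public void setUseGzip(boolean useGzip) { this.useGzip = useGzip; }
    }

    /** Applies every property starting with the prefix; returns how many were applied. */
    static int apply(Object bean, Properties props, String prefix) {
        int applied = 0;
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(prefix) || key.length() == prefix.length()) {
                continue; // not addressed to this bean
            }
            String name = key.substring(prefix.length());
            String setterName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            Method setter = findSetter(bean.getClass(), setterName);
            if (setter == null) {
                // Fail loudly so a typo in a property name is not silently ignored.
                throw new IllegalArgumentException("No supported setter for property: " + key);
            }
            try {
                setter.invoke(bean, convert(props.getProperty(key), setter.getParameterTypes()[0]));
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Failed to set property: " + key, e);
            }
            applied++;
        }
        return applied;
    }

    /** Finds a public one-argument setter whose parameter type this sketch can convert. */
    private static Method findSetter(Class<?> type, String setterName) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(setterName)
                    && m.getParameterCount() == 1
                    && isSupported(m.getParameterTypes()[0])) {
                return m;
            }
        }
        return null;
    }

    private static boolean isSupported(Class<?> t) {
        return t == int.class || t == Integer.class
                || t == long.class || t == Long.class
                || t == boolean.class || t == Boolean.class
                || t == String.class;
    }

    /** Converts the raw string value to the setter's parameter type. */
    private static Object convert(String value, Class<?> t) {
        if (t == int.class || t == Integer.class) return Integer.valueOf(value.trim());
        if (t == long.class || t == Long.class) return Long.valueOf(value.trim());
        if (t == boolean.class || t == Boolean.class) return Boolean.valueOf(value.trim());
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("aws.clientconfig.socketTimeout", "10000");
        props.setProperty("some.other.key", "ignored");

        DemoClientConfig config = new DemoClientConfig();
        apply(config, props, "aws.clientconfig.");
        System.out.println("socketTimeout=" + config.getSocketTimeout());
    }
}
```

Since ClientConfiguration isn’t serializable, the consumer would presumably keep the raw Properties (which are serializable) and only build the ClientConfiguration on the task side when the client is created.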