Hi,

I’m working on implementing retry for getRecords in FlinkKinesisConsumer.
We occasionally get transient socket read timeouts. Instead of bubbling up
the exception and forcing a topology reset to checkpoint, we want to retry
getRecords. We also want to work with a lower socket read timeout than the
50s default.

Looking at the current KinesisProxy implementation, I’m aiming to remove
some baked in assumptions that get into the way of customizing this:

1) AWSUtil.createKinesisClient - statically wired to use default
ClientConfiguration. The user should be able to control all settings that
the SDK exposes instead.

2) Retry in KinesisProxy.getRecords limited to AmazonServiceException.
Perhaps it is OK as default, but the user should be able to retry on other
exceptions if desired.

For 1) a generic option could be to set properties on ClientConfiguration
using reflection (the class isn’t serializable but follows the Java Bean
conventions). Something like BeanUtils would make it straightforward to
process user supplied properties with a specific prefix. Is there any other
place in the Flink codebase where this style of configuration approach is
used and a preferred alternative to BeanUtils?

Thanks

Reply via email to