PR to provide the hooks: https://github.com/apache/flink/pull/5803
On Mon, Apr 2, 2018 at 6:14 PM, Thomas Weise <t...@apache.org> wrote:

> Hi,
>
> I’m working on implementing retry for getRecords in FlinkKinesisConsumer.
> We occasionally get transient socket read timeouts. Instead of bubbling up
> the exception and forcing a topology reset to checkpoint, we want to retry
> getRecords. We also want to work with a lower socket read timeout than the
> 50s default.
>
> Looking at the current KinesisProxy implementation, I’m aiming to remove
> some baked-in assumptions that get in the way of customizing this:
>
> 1) AWSUtil.createKinesisClient - statically wired to use the default
> ClientConfiguration. The user should instead be able to control all
> settings that the SDK exposes.
>
> 2) Retry in KinesisProxy.getRecords is limited to AmazonServiceException.
> Perhaps that is OK as a default, but the user should be able to retry on
> other exceptions if desired.
>
> For 1) a generic option could be to set properties on ClientConfiguration
> using reflection (the class isn’t serializable but follows the Java Bean
> conventions). Something like BeanUtils would make it straightforward to
> process user-supplied properties with a specific prefix. Is there any other
> place in the Flink codebase where this style of configuration is used, and
> is there a preferred alternative to BeanUtils?
>
> Thanks
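
For point 1) in the quoted message, a minimal sketch of the reflection idea, using only java.beans.Introspector from the JDK rather than BeanUtils. This is an illustration under assumptions, not the code in the PR: the class name PrefixedBeanConfigurer, the prefix "aws.clientconfig." and the DemoClientConfig stand-in (used so the sketch runs without the AWS SDK on the classpath) are all hypothetical. A real version would pass the SDK's ClientConfiguration as the bean and would probably need more type conversions.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.Properties;

/** Hypothetical helper: copies prefixed user properties onto a Java Bean via its setters. */
class PrefixedBeanConfigurer {

    /** Hypothetical stand-in for the SDK's ClientConfiguration (assumption, not the real class). */
    public static class DemoClientConfig {
        private int socketTimeout = 50_000;
        private int maxConnections = 50;

        public int getSocketTimeout() { return socketTimeout; }
        public void setSocketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; }
        public int getMaxConnections() { return maxConnections; }
        public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; }
    }

    /**
     * For every writable bean property "foo", looks up the key prefix + "foo" in props
     * and, if present, converts the value and calls the setter.
     * Returns the number of properties that were set.
     */
    static int apply(Object bean, Properties props, String prefix)
            throws IntrospectionException, ReflectiveOperationException {
        int applied = 0;
        PropertyDescriptor[] descriptors =
                Introspector.getBeanInfo(bean.getClass()).getPropertyDescriptors();
        for (PropertyDescriptor pd : descriptors) {
            Method setter = pd.getWriteMethod();
            String raw = props.getProperty(prefix + pd.getName());
            if (setter == null || raw == null) {
                continue; // read-only property, or the user did not supply a value
            }
            setter.invoke(bean, convert(raw, pd.getPropertyType()));
            applied++;
        }
        return applied;
    }

    /** Converts the string value to the setter's parameter type; only a few types are covered. */
    private static Object convert(String raw, Class<?> type) {
        if (type == String.class) {
            return raw;
        }
        if (type == int.class || type == Integer.class) {
            return Integer.valueOf(raw);
        }
        if (type == long.class || type == Long.class) {
            return Long.valueOf(raw);
        }
        if (type == boolean.class || type == Boolean.class) {
            return Boolean.valueOf(raw);
        }
        throw new IllegalArgumentException("Unsupported property type: " + type.getName());
    }
}
```

With this shape, a user-supplied entry such as aws.clientconfig.socketTimeout=10000 would end up in a setSocketTimeout(10000) call, and keys without the prefix are ignored. Unknown keys under the prefix are silently skipped here; whether they should fail fast instead is a design choice left open.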