Hi Gordon,

This is indeed a discussion we need to have!
The purpose of the previous PRs wasn't to provide solutions to the originally identified issues, but rather to make it possible to solve them through customization. We also communicated what those customizations would be, along with the intent to contribute them subsequently if they are deemed broadly applicable and we find a reasonable contribution path. So far we have implemented the following in custom code:

* Use ListShards for discovery. We also considered limiting discovery to a single subtask and sharing the results between subtasks, which is almost certainly not something I would propose adding to Flink because of the additional deployment dependencies.
* Override emitRecords in the fetcher to provide source watermarking with idle shard handling. Related discussions for the Kafka consumer show that it isn't straightforward to arrive at a solution that will satisfy everyone. I'm still open to contributing those changes as well, but haven't seen a response on that. Nevertheless, it is key to allow users to implement what they need for their use case.
* Retry certain exceptions in getRecords, based on our production learnings. Whether those are applicable to everyone, and whether the Flink implementation should be changed to retry by default, is actually a future discussion I intend to start. In any case, we need to be able to make the changes that we need on our end.
* Configure the AWS HTTP client when the defaults turn out to be unsuitable for the use case. This is a very basic requirement, and it is rather surprising that the Flink Kinesis consumer wasn't written to provide access to the settings that the AWS SDK provides.

I hope the above examples make clear that it is necessary to leave room for users to augment a base implementation. There is no such thing as a perfect connector, and users will always make new discoveries that require improvements or changes.
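To make the kind of hook discussed above concrete, here is a minimal sketch of an overridable retry decision, in the spirit of the getRecords retry item. It is not the KinesisProxy code: `RetryingFetcher`, `TimeoutTolerantFetcher`, `isRecoverable`, and `TransientServiceException` (a stand-in for the SDK's AmazonServiceException) are hypothetical names, and the backoff scheme is an arbitrary choice for illustration.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a retryable service-side error (e.g. throttling). */
class TransientServiceException extends RuntimeException {
    TransientServiceException(String message) {
        super(message);
    }
}

/**
 * Hypothetical base fetcher: retries a call with exponential backoff and
 * delegates the "is this error transient?" decision to an overridable hook.
 */
class RetryingFetcher {
    private final int maxAttempts;
    private final long baseBackoffMillis;

    RetryingFetcher(int maxAttempts, long baseBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
        this.baseBackoffMillis = baseBackoffMillis;
    }

    /** Conservative default: only the stand-in service exception is retried. */
    protected boolean isRecoverable(Exception e) {
        return e instanceof TransientServiceException;
    }

    /** Invokes the call, making at most maxAttempts attempts for recoverable failures. */
    <T> T callWithRetry(Callable<T> call) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts || !isRecoverable(e)) {
                    throw e; // not recoverable, or out of attempts
                }
                // Exponential backoff: base, 2 * base, 4 * base, ...
                Thread.sleep(baseBackoffMillis << (attempt - 1));
            }
        }
    }
}

/** User customization: additionally treat socket read timeouts as transient. */
class TimeoutTolerantFetcher extends RetryingFetcher {
    TimeoutTolerantFetcher(int maxAttempts, long baseBackoffMillis) {
        super(maxAttempts, baseBackoffMillis);
    }

    @Override
    protected boolean isRecoverable(Exception e) {
        return e instanceof SocketTimeoutException || super.isRecoverable(e);
    }
}
```

The base class keeps a conservative default, while a user who sees transient socket read timeouts in production can widen the policy in a subclass instead of forking the code. A separate user-facing handler interface, as Gordon describes for the Elasticsearch connector, would be the alternative design.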
Use-case-specific considerations may require augmenting even the best default behavior: what works for one user may not work for another. If I don't have the hooks that the referenced PRs enable, then the alternative is to fork the code. That will further reduce the likelihood of changes making their way back to Flink.

I think we agree on the ultimate goal of improving the default implementation of the connector. There are more fundamental issues with the Kinesis connector (and other connectors) that I believe require deeper design work and a rewrite, which go beyond what we discuss here.

Finally, I'm also curious how much appetite there is for contributions in the connector area. I see that we have now accumulated 340 open PRs, and review bandwidth seems hard to come by.

Thanks,
Thomas

On Sun, Apr 15, 2018 at 8:56 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Thomas,
>
> Thanks for your PRs!
>
> I understand and fully agree with both points that you raised.
>
> What I'm still a bit torn about are the currently proposed solutions for
> these issues (and other similar connector issues).
>
> This might actually be a good opportunity to bring up some thoughts
> about connector contributions. My arguments would be the following:
>
> The solutions actually break some fundamental designs of the connector
> code.
> For example, in recent PRs for the Kinesis connector we've been proposing
> to relax access to the `KinesisProxy` constructor.
> AFAIK, this fix was triggered by an inefficiency in the
> `KinesisProxy#getShardsOfStream` method which influences shard discovery
> performance.
> First of all, such a change breaks the assumption that the class is
> internal (it is marked as @Internal). It was made private as it handles
> critical paths such as record fetching and shard listing, and is not
> intended to be modified at all.
> Second of all, the fix in the end did not fix the inefficiency in general;
> it only helps advanced users who perhaps have seen the corresponding JIRA
> and would bother to do the same and override the inefficient
> implementations themselves.
> If there were a fix that benefited all users of the connector in
> general, I would definitely be more in favor of that.
> The same goes for https://github.com/apache/flink/pull/5803: I'm not
> sure that allowing overrides of the retry logic is ideal. For example, we
> previously introduced a user-facing RetryHandler API in the Elasticsearch
> connector to allow such customizations.
>
> On one hand, I do understand that solving these connector issues properly
> would perhaps require more thorough design groundwork and could be
> more time-consuming.
> On the other hand, I also understand that we need to find a good balance
> that allows production users of these connectors to quickly iterate on
> issues in the current code and unblock the problems they encounter.
>
> My main concern is that our current approach to fixing these issues, IMO,
> does not actually encourage good fixes to be contributed back to the
> connector code, so the issues would remain as problematic as they are.
>
> What do you think? I may also be missing things in the bigger picture here,
> so feedback would be highly appreciated.
>
> Cheers,
> Gordon
>
> On Tue, Apr 3, 2018 at 1:25 PM, Thomas Weise <t...@apache.org> wrote:
>
> > PR to provide the hooks: https://github.com/apache/flink/pull/5803
> >
> > On Mon, Apr 2, 2018 at 6:14 PM, Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > I’m working on implementing retry for getRecords in FlinkKinesisConsumer.
> > > We occasionally get transient socket read timeouts. Instead of bubbling up
> > > the exception and forcing a topology reset to checkpoint, we want to retry
> > > getRecords.
> > > We also want to work with a lower socket read timeout than the 50s
> > > default.
> > >
> > > Looking at the current KinesisProxy implementation, I’m aiming to remove
> > > some baked-in assumptions that get in the way of customizing this:
> > >
> > > 1) AWSUtil.createKinesisClient is statically wired to use the default
> > > ClientConfiguration. The user should be able to control all settings
> > > that the SDK exposes instead.
> > >
> > > 2) Retry in KinesisProxy.getRecords is limited to AmazonServiceException.
> > > Perhaps that is OK as a default, but the user should be able to retry on
> > > other exceptions if desired.
> > >
> > > For 1), a generic option could be to set properties on ClientConfiguration
> > > using reflection (the class isn’t serializable but follows the Java Bean
> > > conventions). Something like BeanUtils would make it straightforward to
> > > process user-supplied properties with a specific prefix. Is there any other
> > > place in the Flink codebase where this style of configuration is used, and
> > > is there a preferred alternative to BeanUtils?
> > >
> > > Thanks
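The reflection idea for 1) can be sketched with nothing but the JDK's `java.beans.Introspector`, which applies the same Java Bean naming conventions that BeanUtils relies on. To keep the sketch self-contained, it binds to `HttpClientSettings`, a hypothetical stand-in for the SDK's ClientConfiguration (its 50s socket timeout default mirrors the one mentioned in the thread). The `aws.clientconfig.` prefix and the `BeanPropertyBinder` helper are likewise assumptions, not existing Flink configuration keys or classes, and only String, int, long and boolean properties are converted.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.Properties;

/** Hypothetical stand-in for the SDK's ClientConfiguration: a plain Java Bean. */
class HttpClientSettings {
    private int socketTimeout = 50_000; // milliseconds
    private int maxConnections = 50;
    private boolean useTcpKeepAlive = false;

    public int getSocketTimeout() { return socketTimeout; }
    public void setSocketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; }
    public int getMaxConnections() { return maxConnections; }
    public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; }
    public boolean isUseTcpKeepAlive() { return useTcpKeepAlive; }
    public void setUseTcpKeepAlive(boolean useTcpKeepAlive) { this.useTcpKeepAlive = useTcpKeepAlive; }
}

/** Hypothetical helper: applies "<prefix><property>=<value>" entries via bean setters. */
final class BeanPropertyBinder {
    private BeanPropertyBinder() {}

    static void bind(Object bean, Properties props, String prefix)
            throws IntrospectionException, ReflectiveOperationException {
        BeanInfo info = Introspector.getBeanInfo(bean.getClass());
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(prefix)) {
                continue; // entry is not meant for this bean
            }
            String name = key.substring(prefix.length());
            PropertyDescriptor descriptor = findProperty(info, name);
            Method setter = descriptor == null ? null : descriptor.getWriteMethod();
            if (setter == null) {
                // Fail fast on typos or read-only properties instead of ignoring them.
                throw new IllegalArgumentException("Unknown or read-only property: " + key);
            }
            setter.invoke(bean, convert(props.getProperty(key).trim(), descriptor.getPropertyType()));
        }
    }

    private static PropertyDescriptor findProperty(BeanInfo info, String name) {
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getName().equals(name)) {
                return pd;
            }
        }
        return null;
    }

    /** Converts the raw string to the setter's parameter type (small subset only). */
    private static Object convert(String raw, Class<?> type) {
        if (type == String.class) {
            return raw;
        }
        if (type == int.class || type == Integer.class) {
            return Integer.parseInt(raw);
        }
        if (type == long.class || type == Long.class) {
            return Long.parseLong(raw);
        }
        if (type == boolean.class || type == Boolean.class) {
            if (!raw.equalsIgnoreCase("true") && !raw.equalsIgnoreCase("false")) {
                throw new IllegalArgumentException("Not a boolean: " + raw);
            }
            return Boolean.parseBoolean(raw);
        }
        throw new IllegalArgumentException("Unsupported property type: " + type.getName());
    }
}
```

Because `java.util.Properties` is serializable while the bean is not, the user-supplied properties could be shipped with the consumer and applied wherever the client is created. Rejecting unknown keys under the prefix surfaces configuration typos early; whether that strictness is desirable for Flink is a judgment call.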