Hi Thomas,

I see. If exposing access to these internal classes is a must to enable further contributions, then I would agree to do so. I think in the future, we should also keep a closer eye on the parts of the connector code that are highly subject to per-environment modification, and keep flexibility in mind as the base assumption (as you stated very well, there is no "perfect" implementation for a connector, even with the best defaults).
Some comments on the specific issues that you mentioned:

> * use ListShards for discovery (plus also considered to limit the discovery
> to a single subtask and share the results between subtasks, which is almost
> certainly not something I would propose to add to Flink due to additional
> deployment dependencies).

I think this one is most likely a direct improvement to the connector
already (minus the inter-subtask coordination). The shard discovery method
does not use any other information from the `describeStream` call, so the
alternate API should be a drop-in replacement.

> * ability to configure the AWS HTTP client when defaults turn out
> unsuitable for the use case. This is a very basic requirement and it is
> rather surprising that the Flink Kinesis consumer wasn't written to provide
> access to the settings that the AWS SDK provides.

I see you have opened a separate JIRA for this (FLINK-9188). And yes, IMO
this is definitely something very desirable for the future.

> Finally, I'm also curious how much appetite for contributions in the
> connector areas there is? I see that we have now accumulated 340 open PRs,
> and review bandwidth seems hard to come by.

I would personally very much like to see these contributions / improvements
happen. In the past, the community has indeed stalled a bit in keeping pace
with all the contributions, but this is something most of the committers
have in mind and intend to fix soon. I used to look mostly at connector
contributions, and would like to get up to speed with that again shortly.

Cheers,
Gordon

On Tue, Apr 17, 2018 at 1:58 AM, Thomas Weise <t...@apache.org> wrote:

> Hi Gordon,
>
> This is indeed a discussion we need to have!
>
> The purpose of the previous PRs wasn't to provide solutions to the
> originally identified issues, but rather to enable solving those through
> customization.
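(A quick illustration of the ListShards point above: the paginated
discovery loop would look roughly like the following. `ShardDiscovery`,
`ShardLister`, and `ListShardsResult` are made-up stand-ins for the real
AWS SDK and Flink types; the only point of the sketch is the nextToken
pagination pattern that a ListShards-based discovery would follow.)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardDiscovery {

    // Stand-in for the AWS ListShards API: returns one page of shard ids
    // plus a token for the next page (null when there are no more pages).
    interface ShardLister {
        ListShardsResult listShards(String nextToken);
    }

    static class ListShardsResult {
        final List<String> shardIds;
        final String nextToken;

        ListShardsResult(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    // Follow nextToken until the service reports no further pages.
    static List<String> discoverAllShards(ShardLister client) {
        List<String> shards = new ArrayList<>();
        String token = null;
        do {
            ListShardsResult page = client.listShards(token);
            shards.addAll(page.shardIds);
            token = page.nextToken;
        } while (token != null);
        return shards;
    }

    public static void main(String[] args) {
        // Fake two-page listing to exercise the loop.
        ShardLister fake = token -> token == null
                ? new ListShardsResult(Arrays.asList("shard-0", "shard-1"), "page-2")
                : new ListShardsResult(Arrays.asList("shard-2"), null);
        System.out.println(discoverAllShards(fake));
    }
}
```

The key difference from the describeStream-based discovery is that no
per-stream description is needed at all, only the page loop above.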
> What those customizations would be was also communicated, along with the
> intent to contribute them subsequently as well, if they are deemed broadly
> enough applicable and we find a reasonable contribution path.
>
> So far we have implemented the following in custom code:
>
> * use ListShards for discovery (plus also considered to limit the discovery
> to a single subtask and share the results between subtasks, which is almost
> certainly not something I would propose to add to Flink due to additional
> deployment dependencies).
>
> * override emitRecords in the fetcher to provide source watermarking with
> idle shard handling. Related discussions for the Kafka consumer show that
> it isn't straightforward to arrive at a solution that will satisfy
> everyone. I am still open to contributing those changes as well, but had
> not seen a response to that. Nevertheless, it is key to allow users to
> implement what they need for their use case.
>
> * retry certain exceptions in getRecords based on our production learnings.
> Whether or not those are applicable to everyone and the Flink
> implementation should be changed to retry by default is actually a future
> discussion I'm intending to start. But in any case, we need to be able to
> make the changes that we need on our end.
>
> * ability to configure the AWS HTTP client when defaults turn out
> unsuitable for the use case. This is a very basic requirement and it is
> rather surprising that the Flink Kinesis consumer wasn't written to provide
> access to the settings that the AWS SDK provides.
>
> I hope the above examples make clear that it is necessary to leave room for
> users to augment a base implementation. There is no such thing as a perfect
> connector, and there will always be new discoveries by users that require
> improvements or changes. Use-case-specific considerations may require
> augmenting even the best default behavior; what works for one user may not
> work for another.
>
> If I don't have the hooks that the referenced PRs enable, then the
> alternative is to fork the code. That will further reduce the likelihood of
> changes making their way back to Flink.
>
> I think we agree on the ultimate goal of improving the default
> implementation of the connector. There are more fundamental issues with the
> Kinesis connector (and other connectors) that I believe require deeper
> design work and a rewrite, which go beyond what we discuss here.
>
> Finally, I'm also curious how much appetite for contributions in the
> connector areas there is? I see that we have now accumulated 340 open PRs,
> and review bandwidth seems hard to come by.
>
> Thanks,
> Thomas
>
>
> On Sun, Apr 15, 2018 at 8:56 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> > Hi Thomas,
> >
> > Thanks for your PRs!
> >
> > I understand and fully agree with both points that you raised.
> >
> > What I'm still a bit torn about is the current proposed solutions for
> > these issues (and other similar connector issues).
> >
> > This might actually be a good opportunity to bring up some thoughts
> > about connector contributions. My arguments would be the following:
> >
> > The solutions actually break some fundamental designs of the connector
> > code.
> > For example, in recent PRs for the Kinesis connector we've been proposing
> > to relax access to the `KinesisProxy` constructor.
> > AFAIK, this fix was triggered by an inefficiency in the
> > `KinesisProxy#getShardsOfStream` method which influences shard discovery
> > performance.
> > First of all, such a change breaks the fact that the class is an internal
> > class (it is marked as @Internal). It was made private because it handles
> > critical paths such as record fetching and shard listing, and is not
> > intended to be modified at all.
> > Second of all, the fix in the end did not fix the inefficiency at all -
> > only for advanced users who perhaps have seen the corresponding JIRA and
> > would bother to override the inefficient implementations by themselves.
> > If there is a fix that would benefit all users of the connector in
> > general, I would definitely be more in favor of that.
> > The same goes for https://github.com/apache/flink/pull/5803 - I'm not
> > sure that allowing overrides on the retry logic is ideal. For example, we
> > previously introduced a user-facing RetryHandler API in the Elasticsearch
> > connector to allow such customizations.
> >
> > On one hand, I do understand that solving these connector issues properly
> > would perhaps require more thorough design groundwork and could be more
> > time-consuming.
> > On the other hand, I also understand that we need to find a good balance
> > that allows production users of these connectors to quickly iterate on
> > the issues the current code has and unblock encountered problems.
> >
> > My main concern is that our current approach to fixing these issues, IMO,
> > does not actually encourage good fixes to be contributed back to the
> > connector code, and they would therefore remain problematic as they are.
> >
> > What do you think? I may also be missing something in the bigger picture
> > here, so feedback would be highly appreciated.
> >
> > Cheers,
> > Gordon
> >
> > On Tue, Apr 3, 2018 at 1:25 PM, Thomas Weise <t...@apache.org> wrote:
> >
> > > PR to provide the hooks: https://github.com/apache/flink/pull/5803
> > >
> > >
> > > On Mon, Apr 2, 2018 at 6:14 PM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm working on implementing retry for getRecords in
> > > > FlinkKinesisConsumer.
> > > > We occasionally get transient socket read timeouts.
> > > > Instead of bubbling up the exception and forcing a topology reset to
> > > > the last checkpoint, we want to retry getRecords. We also want to
> > > > work with a lower socket read timeout than the 50s default.
> > > >
> > > > Looking at the current KinesisProxy implementation, I'm aiming to
> > > > remove some baked-in assumptions that get in the way of customizing
> > > > this:
> > > >
> > > > 1) AWSUtil.createKinesisClient - statically wired to use the default
> > > > ClientConfiguration. The user should instead be able to control all
> > > > settings that the SDK exposes.
> > > >
> > > > 2) Retry in KinesisProxy.getRecords is limited to
> > > > AmazonServiceException. Perhaps that is OK as a default, but the user
> > > > should be able to retry on other exceptions if desired.
> > > >
> > > > For 1) a generic option could be to set properties on
> > > > ClientConfiguration using reflection (the class isn't serializable
> > > > but follows the Java Bean conventions). Something like BeanUtils
> > > > would make it straightforward to process user-supplied properties
> > > > with a specific prefix. Is there any other place in the Flink
> > > > codebase where this style of configuration approach is used, and is
> > > > there a preferred alternative to BeanUtils?
> > > >
> > > > Thanks
> > > >
> > >
> >
>
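(For readers following along: the BeanUtils-style idea from the last quoted
email above could be sketched without any extra dependency, using plain
reflection over the bean setters. The property prefix `aws.clientconfig.`
and the `FakeClientConfig` bean are made-up stand-ins for illustration; the
real target would be the SDK's ClientConfiguration.)

```java
import java.lang.reflect.Method;
import java.util.Properties;

public class BeanConfigurator {

    // Copy every property that starts with `prefix` onto the bean by
    // calling the matching JavaBean setter, converting the string value to
    // the setter's parameter type. Only String/int/long/boolean are handled
    // in this sketch; a real version would cover more property types.
    public static void applyPrefixedProperties(Object bean, Properties props, String prefix)
            throws Exception {
        for (String name : props.stringPropertyNames()) {
            if (!name.startsWith(prefix)) {
                continue;
            }
            String field = name.substring(prefix.length());
            String setter = "set" + Character.toUpperCase(field.charAt(0)) + field.substring(1);
            String value = props.getProperty(name);
            for (Method m : bean.getClass().getMethods()) {
                if (m.getName().equals(setter) && m.getParameterCount() == 1) {
                    Class<?> type = m.getParameterTypes()[0];
                    if (type == String.class) {
                        m.invoke(bean, value);
                    } else if (type == int.class) {
                        m.invoke(bean, Integer.parseInt(value));
                    } else if (type == long.class) {
                        m.invoke(bean, Long.parseLong(value));
                    } else if (type == boolean.class) {
                        m.invoke(bean, Boolean.parseBoolean(value));
                    }
                }
            }
        }
    }

    // Illustrative bean standing in for the SDK's ClientConfiguration,
    // just to show the setter convention the reflection relies on.
    public static class FakeClientConfig {
        private int socketTimeout;
        private boolean useGzip;

        public void setSocketTimeout(int millis) { this.socketTimeout = millis; }
        public int getSocketTimeout() { return socketTimeout; }

        public void setUseGzip(boolean gzip) { this.useGzip = gzip; }
        public boolean isUseGzip() { return useGzip; }
    }
}
```

The nice property of this approach is that new SDK settings become
configurable automatically as long as they follow the bean conventions,
with no connector code change per setting.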