Hi Thomas,

I see. If exposing access for these internal classes is a must to enable
further contributions, then I would agree to do so.
I think in the future, we should also keep a closer eye on the parts of the
connector code that are highly subject to modification on a
per-environment basis, and keep flexibility in mind as the base assumption
(as you stated very well, there is no "perfect" implementation for a
connector, even with the best default implementations).

Some comments on the specific issues that you mentioned:

> * use ListShards for discovery (plus also considered to limit the discovery
> to a single subtask and share the results between subtasks, which is almost
> certainly not something I would propose to add to Flink due to additional
> deployment dependencies).


I think this one is most likely a direct improvement to the connector
already (minus the inter-subtask coordination).
The shard discovery method does not use any other information from the
`describeStream` call, so the alternate API should be a direct replacement.
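
To illustrate why the swap should be mechanical, here is a minimal sketch of
the token-based pagination loop that a ListShards-style call needs. Note that
`ShardPage`, `ShardLister` and `ShardDiscovery` are hypothetical stand-ins
made up for this example; they are not AWS SDK or Flink types, and a real
implementation would call the SDK client instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one page of a ListShards-style response.
final class ShardPage {
    final List<String> shardIds;
    final String nextToken; // null when there are no more pages

    ShardPage(List<String> shardIds, String nextToken) {
        this.shardIds = shardIds;
        this.nextToken = nextToken;
    }
}

// Hypothetical stand-in for the service call; not an AWS SDK interface.
interface ShardLister {
    ShardPage listShards(String streamName, String nextToken);
}

final class ShardDiscovery {
    // Follows the pagination token until the service reports no further pages.
    static List<String> discoverShards(ShardLister lister, String streamName) {
        List<String> all = new ArrayList<>();
        String token = null;
        do {
            ShardPage page = lister.listShards(streamName, token);
            all.addAll(page.shardIds);
            token = page.nextToken;
        } while (token != null);
        return all;
    }
}
```

The loop only needs the shard list and the continuation token, which is why
nothing else from the stream description has to be carried over.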

> * ability to configure the AWS HTTP client when defaults turn out
> unsuitable for the use case. This is a very basic requirement and it is
> rather surprising that the Flink Kinesis consumer wasn't written to provide
> access to the settings that the AWS SDK provides.


I see you have opened a separate JIRA for this (FLINK-9188). And yes, IMO
this is definitely something very desirable in the future.
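
One possible shape for this, following the reflection idea from your original
message, is to map prefixed user properties onto bean setters. The sketch
below uses plain Java reflection rather than BeanUtils. `DemoClientConfig` is
a hypothetical stand-in for the SDK's `ClientConfiguration`, and the
`aws.clientconfig.` prefix and the `PrefixedBeanConfigurer` helper are
likewise assumptions made up for this example, not existing Flink options.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical stand-in for the SDK's ClientConfiguration: a plain Java bean.
class DemoClientConfig {
    private int socketTimeout = 50_000; // milliseconds, illustrative default
    private int maxConnections = 50;

    public int getSocketTimeout() { return socketTimeout; }
    public void setSocketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; }
    public int getMaxConnections() { return maxConnections; }
    public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; }
}

final class PrefixedBeanConfigurer {
    // Applies every property "<prefix><name>=<value>" by calling set<Name>(value) on the bean.
    static void apply(Object bean, Properties props, String prefix) {
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(prefix) || key.length() == prefix.length()) {
                continue; // not addressed to this bean
            }
            String name = key.substring(prefix.length());
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            Method target = null;
            for (Method m : bean.getClass().getMethods()) {
                if (m.getName().equals(setter) && m.getParameterCount() == 1) {
                    target = m;
                    break;
                }
            }
            if (target == null) {
                throw new IllegalArgumentException("Unknown client property: " + key);
            }
            try {
                target.invoke(bean, convert(props.getProperty(key), target.getParameterTypes()[0]));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not set " + key, e);
            }
        }
    }

    // Converts the string value to the few parameter types this sketch supports.
    private static Object convert(String value, Class<?> type) {
        if (type == int.class || type == Integer.class) return Integer.valueOf(value);
        if (type == long.class || type == Long.class) return Long.valueOf(value);
        if (type == boolean.class || type == Boolean.class) return Boolean.valueOf(value);
        if (type == String.class) return value;
        throw new IllegalArgumentException("Unsupported property type: " + type.getName());
    }
}
```

With this, a user-supplied `aws.clientconfig.socketTimeout=10000` would end up
as `setSocketTimeout(10000)` on the bean, while keys without the prefix are
ignored. Failing fast on unknown names is a design choice of the sketch; a
lenient variant could log and skip instead.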

> Finally, I'm also curious how much appetite for contributions in the
> connector areas there is? I see that we have now accumulated 340 open PRs,
> and review bandwidth seems hard to come by.
>

I would personally very much like to see these contributions / improvements
happening.
In the past, the community has indeed lagged a bit in keeping pace
with all the contributions, but this is something that most of the
committers should have in mind and fix soon.
I have mostly looked at connector contributions in the past, and would like to
get up to speed with that again shortly.

Cheers,
Gordon


On Tue, Apr 17, 2018 at 1:58 AM, Thomas Weise <t...@apache.org> wrote:

> Hi Gordon,
>
> This is indeed a discussion necessary to have!
>
> The purpose of the previous PRs wasn't to provide solutions to the originally
> identified issues, but rather to enable solving those through customization.
> What those customizations would be was also communicated, along with the
> intent to contribute them subsequently as well, if they are deemed broadly
> enough applicable and we find a reasonable contribution path.
>
> So far we have implemented the following in custom code:
>
> * use ListShards for discovery (plus also considered to limit the discovery
> to a single subtask and share the results between subtasks, which is almost
> certainly not something I would propose to add to Flink due to additional
> deployment dependencies).
>
> * override emitRecords in the fetcher to provide source watermarking with
> idle shard handling. Related discussions for the Kafka consumer show that
> it isn't straightforward to arrive at a solution that will satisfy
> everyone. Still open to contribute those changes also, but had not seen a
> response to that. Nevertheless, it is key to allow users to implement what
> they need for their use case.
>
> * retry certain exceptions in getRecords based on our production learnings.
> Whether or not those are applicable to everyone and the Flink
> implementation should be changed to retry by default is actually a future
> discussion I'm intending to start. But in any case, we need to be able to
> make the changes that we need on our end.
>
> * ability to configure the AWS HTTP client when defaults turn out
> unsuitable for the use case. This is a very basic requirement and it is
> rather surprising that the Flink Kinesis consumer wasn't written to provide
> access to the settings that the AWS SDK provides.
>
> I hope the above examples make clear that it is necessary to leave room for
> users to augment a base implementation. There is no such thing as a perfect
> connector and there will always be new discoveries by users that require
> improvements or changes. Use-case-specific considerations may require
> augmenting even the best default behavior; what works for one user may not
> work for another.
>
> If I don't have the hooks that the referenced PRs enable, then the alternative
> is to fork the code. That will further reduce the likelihood of changes
> making their way back to Flink.
>
> I think we agree on the ultimate goal of improving the default
> implementation of the connector. There are more fundamental issues with the
> Kinesis connector (and other connectors) that I believe require deeper
> design work and a rewrite, which go beyond what we discuss here.
>
> Finally, I'm also curious how much appetite for contributions in the
> connector areas there is? I see that we have now accumulated 340 open PRs,
> and review bandwidth seems hard to come by.
>
> Thanks,
> Thomas
>
>
> On Sun, Apr 15, 2018 at 8:56 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> > Hi Thomas,
> >
> > Thanks for your PRs!
> >
> > I understand and fully agree with both points that you pointed out.
> >
> > What I'm still a bit torn about are the currently proposed solutions for
> > these issues (and other similar connector issues).
> >
> > This might actually be a good opportunity to bring up some thoughts
> > about connector contributions. My arguments would be the following:
> >
> > The solutions actually break some fundamental designs of the connector
> > code.
> > For example, in recent PRs for the Kinesis connector we've been proposing
> > to relax access of the `KinesisProxy` constructor.
> > AFAIK, this fix was triggered by an inefficiency in the
> > `KinesisProxy#getShardsOfStream` method which influences shard discovery
> > performance.
> > First of all, such a change breaks the fact that the class is an internal
> > class (it is marked as @Internal). It was made private as it handles
> > critical paths such as record fetching and shard listing, and is not
> > intended to be modified at all.
> > Second of all, the fix in the end did not fix the inefficiency at all,
> > except for advanced users who perhaps have seen the corresponding JIRA and
> > would bother to override the inefficient implementations themselves.
> > If there is a fix that would have benefited all users of the connector in
> > general, I would definitely be more in favor of that.
> > The same goes for https://github.com/apache/flink/pull/5803 - I'm not
> > sure that allowing overrides on the retry logic is ideal. For example, we
> > previously introduced a user-facing RetryHandler API in the Elasticsearch
> > connector to allow such customizations.
> >
> > On one hand, I do understand that solving these connector issues properly
> > would perhaps require more thorough design groundwork and could be
> > more time-consuming.
> > On the other hand, I also understand that we need to find a good balance
> > that allows production users of these connectors to iterate quickly on
> > the issues the current code has and unblock the problems they encounter.
> >
> > My main concern is that our current approach to fixing these issues, IMO,
> > actually does not encourage good fixes to be contributed back to the
> > connector code, and they would therefore remain problematic as they are.
> >
> > What do you think? I may also be missing something in the bigger picture
> > here, so feedback would be highly appreciated.
> >
> > Cheers,
> > Gordon
> >
> > On Tue, Apr 3, 2018 at 1:25 PM, Thomas Weise <t...@apache.org> wrote:
> >
> > > PR to provide the hooks:  https://github.com/apache/flink/pull/5803
> > >
> > >
> > > On Mon, Apr 2, 2018 at 6:14 PM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > I’m working on implementing retry for getRecords in
> > > > FlinkKinesisConsumer. We occasionally get transient socket read
> > > > timeouts. Instead of bubbling up the exception and forcing a topology
> > > > reset to checkpoint, we want to retry getRecords. We also want to work
> > > > with a lower socket read timeout than the 50s default.
> > > >
> > > > Looking at the current KinesisProxy implementation, I’m aiming to
> > > > remove some baked-in assumptions that get in the way of customizing
> > > > this:
> > > >
> > > > 1) AWSUtil.createKinesisClient - statically wired to use default
> > > > ClientConfiguration. The user should be able to control all settings
> > > > that the SDK exposes instead.
> > > >
> > > > 2) Retry in KinesisProxy.getRecords limited to
> > > > AmazonServiceException. Perhaps it is OK as default, but the user
> > > > should be able to retry on other exceptions if desired.
> > > >
> > > > For 1) a generic option could be to set properties on
> > > > ClientConfiguration using reflection (the class isn’t serializable but
> > > > follows the Java Bean conventions). Something like BeanUtils would make
> > > > it straightforward to process user-supplied properties with a specific
> > > > prefix. Is there any other place in the Flink codebase where this style
> > > > of configuration is used, and is there a preferred alternative to
> > > > BeanUtils?
> > > >
> > > > Thanks
> > > >
> > > >
> > >
> >
>
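
For reference, the retry behavior discussed in this thread (retrying
getRecords on transient socket read timeouts instead of failing the job) can
be sketched generically as below. This is only an illustration under stated
assumptions: `RetryingCall`, `callWithRetry` and `isSocketTimeout` are
hypothetical names made up for this example, not part of the Flink Kinesis
connector or the AWS SDK, and the backoff policy is an arbitrary choice.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

final class RetryingCall {
    // Runs the call, retrying up to maxRetries times when isRetryable accepts the exception.
    static <T> T callWithRetry(Callable<T> call, Predicate<Exception> isRetryable,
                               int maxRetries, long baseBackoffMillis) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                if (attempt >= maxRetries || !isRetryable.test(e)) {
                    throw e; // give up: not retryable, or retries exhausted
                }
                attempt++;
                // Exponential backoff; the shift is capped to keep the delay bounded.
                Thread.sleep(baseBackoffMillis * (1L << Math.min(attempt - 1, 10)));
            }
        }
    }

    // Example policy: retry on socket read timeouts, whether thrown directly or as a cause.
    static boolean isSocketTimeout(Exception e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof SocketTimeoutException) {
                return true;
            }
        }
        return false;
    }
}
```

Passing the retry policy in as a predicate keeps the default narrow while
letting a user widen it, which is the kind of flexibility the thread asks for.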
