Generalizing the pattern would be great. I was also wondering whether there are other commonalities between sources that would benefit from a shared framework. Kafka and Kinesis don't look all that different from a consumer's perspective: both are replayable sources (topic -> stream, partition -> shard, offset -> sequence number) with dynamic discovery and state saving. Shouldn't there be more common code?
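To make the overlap concrete, here is a minimal sketch of the pieces such a shared framework could own. It assumes a purely hypothetical design: `Split`, `SplitState` and `newSplits` are illustrative names, not existing Flink or connector classes. A split stands for a Kafka partition or a Kinesis shard. The position type is a Kafka offset (`Long`) or a Kinesis sequence number, modeled here as `BigInteger` because Kinesis returns sequence numbers as large numeric strings.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical sketch (not Flink API): parts of a replayable, partitioned
 * source that Kafka and Kinesis consumers could share.
 */
public class ReplayableSourceSketch {

    /** Unit of parallelism: topic/partition (Kafka) or stream/shard (Kinesis). */
    static final class Split {
        final String stream;
        final String splitId;

        Split(String stream, String splitId) {
            this.stream = stream;
            this.splitId = splitId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Split)) return false;
            Split other = (Split) o;
            return stream.equals(other.stream) && splitId.equals(other.splitId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(stream, splitId);
        }

        @Override
        public String toString() {
            return stream + "/" + splitId;
        }
    }

    /** Checkpointable state: last consumed position per split, whatever its type. */
    static final class SplitState<P> {
        private final Map<Split, P> positions = new HashMap<>();

        void record(Split split, P position) {
            positions.put(split, position);
        }

        /** Immutable copy, e.g. to be written into a checkpoint. */
        Map<Split, P> snapshot() {
            return Collections.unmodifiableMap(new HashMap<>(positions));
        }
    }

    /** Dynamic discovery: the splits reported by the service that are not yet known. */
    static List<Split> newSplits(Set<Split> known, List<Split> discovered) {
        List<Split> result = new ArrayList<>();
        for (Split s : discovered) {
            if (!known.contains(s)) {
                result.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Kafka flavor: partition -> Split, offset -> Long.
        SplitState<Long> kafka = new SplitState<>();
        kafka.record(new Split("orders", "0"), 42L);

        // Kinesis flavor: shard -> Split, sequence number -> BigInteger (illustrative value).
        SplitState<BigInteger> kinesis = new SplitState<>();
        kinesis.record(new Split("orders", "shardId-000000000000"),
                new BigInteger("49590338271490256608559692538361571095921575989136588898"));

        System.out.println(kafka.snapshot());
        System.out.println(kinesis.snapshot());
        System.out.println(newSplits(kafka.snapshot().keySet(),
                List.of(new Split("orders", "0"), new Split("orders", "1"))));
    }
}
```

In this sketch only the actual fetch and the call that lists partitions or shards would be service-specific; position bookkeeping and discovery diffing would not need to know which service they talk to.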
Meanwhile, we need to find a way to address shortcomings in the current Kinesis connector to enable the use case. I would prefer to do that without permanently forking the connector code, so here are some more thoughts:

- Provide a hook to override discovery (so that discovery does not hit Kinesis from every subtask)
- Provide a hook to support custom watermark generation (somewhere around KinesisDataFetcher.emitRecordAndUpdateState)

If we can accomplish these in short order, that would be great. The current implementation makes it hard or impossible to override certain behaviors (protected final methods and the like). If there is agreement, I would like to address those in a quick PR.

Thanks,
Thomas

On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> That last point is very valid. For a while now I've wanted to generalise
> the pattern of our file source to other sources. (This is related to how
> Beam sources are being refactored to use Splittable DoFn.)
>
> I'm very eager for design work to start on this once 1.5 is out the door.
> There are some other folks (cc'ed) who have also talked/thought about this
> before.
>
> Best,
> Aljoscha
>
>
> On 7. Feb 2018, at 01:44, Thomas Weise <t...@apache.org> wrote:
> >
> > In addition to the lack of watermark support, the Kinesis consumer suffers
> > from a discovery-related issue that also needs to be resolved. Shard
> > discovery runs periodically in all subtasks. That's not just inefficient
> > but becomes a real problem when there is a large number of subtasks, due
> > to rate limiting
> > (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
> > The discovery interval should be minimized to cap latency (new shards are
> > not consumed until discovered).
> >
> > How about moving discovery out of the fetcher into a separate singleton
> > source and then broadcasting the result to the parallel fetchers, following
> > the pattern applied to file input?
> >
> > https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
> >
> > This would also ensure that all subtasks consistently see the same shard
> > list.
> >
> > Thoughts?
> >
> > Thanks,
> > Thomas
> >
> >
> > On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise <t...@apache.org> wrote:
> >
> >> Hi,
> >>
> >> The Kinesis consumer currently does not emit watermarks, and this can lead
> >> to problems when a single subtask reads from multiple shards and offsets
> >> are not closely aligned with respect to event time.
> >>
> >> The Kafka consumer has support for periodic and punctuated watermarks,
> >> although there is also the unresolved issue
> >> https://issues.apache.org/jira/browse/FLINK-5479 that would equally apply
> >> to Kinesis.
> >>
> >> I propose adding support for a timestamp assigner and watermark generator
> >> to the Kinesis consumer.
> >>
> >> As for handling of idle shards, is there a preference? Perhaps a
> >> customization point on the assigner that defers the decision to the user
> >> would be appropriate?
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >
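For the watermark hook, a minimal sketch of per-shard watermark tracking inside one subtask may help frame the alignment and idle-shard questions raised above. `PerShardWatermarks`, `registerShard`, `onRecord` and `currentWatermark` are hypothetical names, not part of the Kinesis consumer. The bounded out-of-orderness and the fixed idle timeout are assumptions standing in for the user-defined assigner and idle-shard policy under discussion. Each shard tracks its own maximum event timestamp, and the subtask emits the minimum across non-idle shards, so a shard that lags in event time holds the watermark back instead of having its records become late.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not Kinesis consumer API): per-shard watermarks for a
 * subtask that reads several shards. The subtask watermark is the minimum of
 * the per-shard watermarks over shards that are not idle.
 */
public class PerShardWatermarks {

    private static final class ShardState {
        long maxTimestamp = Long.MIN_VALUE; // no record seen yet
        long lastActivityMillis;            // processing time of registration or last record
    }

    private final Map<String, ShardState> shards = new HashMap<>();
    private final long maxOutOfOrdernessMillis; // assumed bounded-out-of-orderness policy
    private final long idleTimeoutMillis;       // assumed idle-shard policy
    private long lastEmitted = Long.MIN_VALUE;  // watermarks must never go backwards

    public PerShardWatermarks(long maxOutOfOrdernessMillis, long idleTimeoutMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Called when discovery assigns a new shard to this subtask. */
    public void registerShard(String shardId, long nowMillis) {
        if (!shards.containsKey(shardId)) {
            ShardState state = new ShardState();
            state.lastActivityMillis = nowMillis;
            shards.put(shardId, state);
        }
    }

    /** Called for every record read from a shard. */
    public void onRecord(String shardId, long eventTimestamp, long nowMillis) {
        ShardState state = shards.computeIfAbsent(shardId, k -> new ShardState());
        state.maxTimestamp = Math.max(state.maxTimestamp, eventTimestamp);
        state.lastActivityMillis = nowMillis;
    }

    /** Watermark to emit at processing time nowMillis. */
    public long currentWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (ShardState state : shards.values()) {
            if (nowMillis - state.lastActivityMillis > idleTimeoutMillis) {
                continue; // idle shard: does not hold back the watermark
            }
            anyActive = true;
            long shardWatermark = state.maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE // active but no data yet: hold everything back
                    : state.maxTimestamp - maxOutOfOrdernessMillis;
            min = Math.min(min, shardWatermark);
        }
        if (anyActive) {
            lastEmitted = Math.max(lastEmitted, min);
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        PerShardWatermarks wm = new PerShardWatermarks(1_000, 60_000);
        wm.onRecord("shard-0", 10_000, 0);
        wm.onRecord("shard-1", 5_000, 0);
        System.out.println(wm.currentWatermark(0));      // 4000: shard-1 lags behind
        wm.onRecord("shard-0", 20_000, 70_000);
        System.out.println(wm.currentWatermark(70_000)); // 19000: shard-1 idle for 70s
    }
}
```

Excluding idle shards is only one possible policy: a shard that resumes after being excluded may deliver records behind the already emitted watermark, which is exactly the trade-off a user-facing customization point would have to expose. Registering a newly discovered shard before its first record keeps it from being skipped until it either produces data or times out.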