Big +1 on trying to come up with a common framework for partition-based, replayable sources. There is a lot of common code that has to be written to make a connector correct, and Gordon's bullet points are exactly those hard parts -- and it's not just Kinesis and Kafka. It's also true for reading data out of something like S3: if your data is organized as a bunch of parallel, roughly time-ordered files, the code you have to write for all the hard bits mentioned above is essentially the same.

The good news is that the potential outcome of this sort of effort could be that production-quality, correct, parallel connectors become much easier to implement. Ideally, everything other than the code you write to discover partitions and the code to consume data from a single partition could be mostly common.
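To make that split concrete, here is a rough sketch of the two connector-specific pieces. All names and signatures are made up for illustration; this is not an existing or proposed Flink API:

import java.util.List;

// Illustration only: hypothetical interfaces, not an actual Flink API.

// Connector-specific piece 1: find out which partitions / shards / files exist.
interface PartitionDiscoverer<P> {
    List<P> discoverPartitions() throws Exception;
}

// Connector-specific piece 2: read a single partition from a given position.
// P = partition handle, POS = position/offset type, T = record type.
interface PartitionReader<P, POS, T> {
    void open(P partition, POS startPosition) throws Exception;

    List<T> poll() throws Exception;   // next batch of records, possibly empty

    POS currentPosition();             // position to store in checkpoints
}

// Everything else -- assigning partitions to subtasks, checkpointing positions,
// per-partition watermarks, idleness handling -- would live in a shared base
// source that is the same for Kafka, Kinesis, S3 files, etc.

Kafka would plug in topic/partition discovery and a consumer-backed reader, Kinesis would plug in shard discovery and a GetRecords-backed reader, and the S3 case would plug in file listing and file reading -- the hard parts Gordon lists would only have to be implemented once.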
On Thu, Feb 8, 2018 at 2:01 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Thomas,
>
> It’s great that you’ve brought out these issues, which IMO are all very valid. They have also been in my head for a while.
>
> Here’s a list of things, off the top of my head, that I would really like to improve as part of a major Kafka / Kinesis connector rework. Some have JIRAs for them already, or were discussed in some other indirectly related JIRA. It might make sense to open an umbrella ticket and consolidate all of them there.
>
> - Common abstraction for partition-based, replayable sources, which handles 1) position checkpointing, 2) partition discovery / topic subscription (using the file source pattern), 3) custom startup positions, 4) per-partition watermarks, and 5) partition idleness.
> - Configuration for the connectors is not well-defined. Some of it goes through provided properties, some requires setter methods, etc. Moreover, it is actually very confusing for some users that we share the same Properties to carry Flink connector-specific configuration as well as the internally used client configuration [1]. I think Beam’s KafkaIO has a nice API [2] in this respect.
> - Some default behaviors of the current connectors, such as partitioning and flushing on the producer side, and offset committing for the Kafka consumer, do not make sense [3] [4].
> - The deserialization / serialization schema and the partitioner interfaces don’t really play well together. For example, the `getTargetTopic` method should really be part of the partitioner [5].
>
> I think we are now in a good position to try making this happen for 1.6. Once 1.5 is out of the way, I can try opening an umbrella JIRA and collect everything there so we can discuss more there.
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-4280?focusedCommentId=15399648&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648
> [2] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171
> [3] https://issues.apache.org/jira/browse/FLINK-5728
> [4] https://issues.apache.org/jira/browse/FLINK-5704
> [5] https://issues.apache.org/jira/browse/FLINK-6288
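On the configuration point: agreed that a builder-style API in the spirit of Beam's KafkaIO [2] would be a big improvement over overloading a single Properties object. Purely as an illustration -- none of these classes or methods exist today -- the separation could look roughly like this for Kinesis:

import java.time.Duration;
import java.util.Properties;

// Made-up builder, not an existing Flink or Beam API. The point is keeping
// Flink connector settings explicit and separate from the Properties that
// are passed through to the underlying client untouched.
final class KinesisSourceBuilder {

    private final Properties clientConfig = new Properties(); // pass-through client settings
    private String stream;
    private String initialPosition = "LATEST";
    private Duration shardDiscoveryInterval = Duration.ofSeconds(10);

    KinesisSourceBuilder withStream(String stream) {
        this.stream = stream;
        return this;
    }

    KinesisSourceBuilder withInitialPosition(String initialPosition) {
        this.initialPosition = initialPosition;
        return this;
    }

    KinesisSourceBuilder withShardDiscoveryInterval(Duration interval) {
        this.shardDiscoveryInterval = interval;
        return this;
    }

    KinesisSourceBuilder withClientProperty(String key, String value) {
        clientConfig.setProperty(key, value); // explicitly marked as client config
        return this;
    }

    // A real builder would also have a build() method returning the source.
}

Usage would then read like new KinesisSourceBuilder().withStream("events").withInitialPosition("TRIM_HORIZON").withClientProperty("aws.region", "us-west-2"), and there is no longer any ambiguity about which keys Flink interprets and which ones are handed to the client.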
> On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:
>
> Generalizing the pattern would be great. I was also wondering if there aren't other commonalities between sources that would benefit from a shared framework. Kafka and Kinesis don't look all that different from a consumer perspective: replayable source, topic -> stream, partition -> shard, offset -> sequence, dynamic discovery, state saving - shouldn't there be more common code?
>
> Meanwhile, we need to find a way to address shortcomings in the current Kinesis connector to enable the use case. I would prefer to do that without permanently forking the connector code, so here are some more thoughts:
>
> - Provide a hook to override discovery (so that not every subtask hits Kinesis)
> - Provide a hook to support custom watermark generation (somewhere around KinesisDataFetcher.emitRecordAndUpdateState)
>
> If we can accomplish these in short order, it would be great. The current implementation makes it hard/impossible to override certain behaviors (final protected methods and the like). If there is agreement then I would like to address those as a quick PR.
>
> Thanks,
> Thomas
>
>
> On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
>
> That last point is very valid. For a while now I've wanted to generalise the pattern of our file source to other sources. (This is related to how Beam sources are being refactored to use Splittable DoFn.)
>
> I'm very eager for design work to start on this once 1.5 is out the door. There are some other folks (cc'ed) who have also talked/thought about this before.
>
> Best,
> Aljoscha
>
> > On 7. Feb 2018, at 01:44, Thomas Weise <t...@apache.org> wrote:
> >
> > In addition to lack of watermark support, the Kinesis consumer suffers from a discovery-related issue that also needs to be resolved. Shard discovery runs periodically in all subtasks. That's not just inefficient but becomes a real problem when there is a large number of subtasks, due to rate limiting (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html). The discovery interval should be minimized to cap latency (new shards are not consumed until discovered).
> >
> > How about moving discovery out of the fetcher into a separate singleton source and then broadcasting the result to the parallel fetchers, following the pattern applied to file input?
> >
> > https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
> >
> > This would also ensure that all subtasks consistently see the same shard list.
> >
> > Thoughts?
> >
> > Thanks,
> > Thomas
> >
> >
> > On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise <t...@apache.org> wrote:
> >
> >> Hi,
> >>
> >> The Kinesis consumer currently does not emit watermarks, and this can lead to problems when a single subtask reads from multiple shards and offsets are not closely aligned with respect to event time.
> >>
> >> The Kafka consumer has support for periodic and punctuated watermarks, although there is also the unresolved issue https://issues.apache.org/jira/browse/FLINK-5479 that would equally apply to Kinesis.
> >>
> >> I propose adding support for a timestamp assigner and watermark generator to the Kinesis consumer.
> >>
> >> As for handling of idle shards, is there a preference? Perhaps a customization point on the assigner that defers the decision to the user would be appropriate?
> >>
> >> Thanks,
> >> Thomas
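Regarding the singleton-discovery idea further down the thread: a minimal sketch, using the existing DataStream API, of what that wiring could look like. The discovery source and the reader are stubs, shards are modelled as plain strings, and checkpointing is ignored -- the point is only that a single subtask polls the shard listing API while every reader sees the same broadcast shard list:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

// Sketch only: the file-source pattern applied to shard discovery. A real
// implementation would call the shard listing API, run GetRecords loops, and
// checkpoint shard assignments and positions.
public class DiscoveryBroadcastSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism 1: only a single subtask polls for shards, regardless of
        // how many reader subtasks the job runs with.
        DataStream<String> shards = env
            .addSource(new SourceFunction<String>() {
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                    while (running) {
                        // Real code: list shards and emit only newly discovered ids.
                        ctx.collect("shardId-000000000000");
                        Thread.sleep(10_000);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            })
            .setParallelism(1)
            .broadcast(); // every parallel reader sees the same shard list

        // Parallel readers: each subtask picks the broadcast shards it owns
        // (here by hashing the shard id against the subtask index) and consumes them.
        DataStream<String> records = shards
            .flatMap(new RichFlatMapFunction<String, String>() {
                @Override
                public void flatMap(String shardId, Collector<String> out) {
                    int subtask = getRuntimeContext().getIndexOfThisSubtask();
                    int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
                    if (Math.abs(shardId.hashCode()) % parallelism == subtask) {
                        // Real code: start or resume a GetRecords loop for this shard.
                        out.collect("record from " + shardId);
                    }
                }
            });

        records.print();
        env.execute("discovery-broadcast-sketch");
    }
}

The real version would still have to deal with checkpointing assignments and resuming positions, but parallelism-1 discovery plus broadcast already removes the per-subtask discovery calls and guarantees that all readers work from a consistent shard list.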