Based on my draft implementation, the following changes are needed in the Flink connector:
I need to be able to override the following to track the last record timestamp and idle time per shard (a rough sketch of the per-shard tracking is at the end of this mail):

    protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
            int shardStateIndex, SequenceNumber lastSequenceNumber) {
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            updateState(shardStateIndex, lastSequenceNumber);
        }
    }

Any objection to removing final from it?

Also, why is sourceContext.collectWithTimestamp in the synchronized block? My custom class will need to emit watermarks - I assume there is no need to acquire checkpointLock for that? Otherwise I would also need to add emitWatermark() to the base class.

Let me know if anything else should be considered; otherwise I will open a JIRA and PR.

Thanks,
Thomas

On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:

> -->
>
> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Regarding the two hooks you would like to be available:
>>
>> - Provide hook to override discovery (not to hit Kinesis from every subtask)
>>
>> Yes, I think we can easily provide a way, for example setting -1 for SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
>> Though, the user would then have to savepoint and restore in order to pick up new shards after a Kinesis stream reshard (which is in practice the best way to by-pass the Kinesis API rate limitations).
>> +1 to provide that.
>>
>
> I'm considering a customization of KinesisDataFetcher with an override for discoverNewShardsToSubscribe. We still want shards to be discovered, just not by hitting Kinesis from every subtask.
>
>> - Provide hook to support custom watermark generation (somewhere around KinesisDataFetcher.emitRecordAndUpdateState)
>>
>> Per-partition watermark generation on the Kinesis side is slightly more complex than on the Kafka side, due to how Kinesis's dynamic resharding works.
>> I think we additionally need to allow new shards to be consumed only after their parent shard is fully read; otherwise "per-shard time characteristics" can be broken by out-of-order consumption across the boundary between a closed parent shard and its child.
>> These JIRAs [1][2] have a bit more detail on the topic.
>> Otherwise, in general I'm also +1 to providing this in the Kinesis consumer.
>>
>
> Here I'm thinking to customize emitRecordAndUpdateState (the method would need to be made non-final), using getSubscribedShardsState with additional transient state to keep track of the watermark per shard and emit watermarks as appropriate.
>
> That's the idea - haven't written any code for it yet.
>
> Thanks,
> Thomas
>
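P.S. To make the per-shard tracking concrete, here is a minimal sketch of a helper that a custom KinesisDataFetcher subclass could consult from an overridden emitRecordAndUpdateState, before calling sourceContext.emitWatermark(new Watermark(...)). The class name PerShardWatermarkTracker, the idle-timeout handling, and the min-across-shards watermark are my own assumptions for illustration, not existing connector code.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical helper (not part of the connector): remembers the last event-time
    // timestamp seen per shard and derives a watermark as the minimum across shards,
    // skipping shards that have been idle longer than the configured timeout.
    public class PerShardWatermarkTracker {

        private final long idleTimeoutMillis;

        // last event-time timestamp seen for each shard, keyed by shard state index
        private final Map<Integer, Long> lastTimestampPerShard = new HashMap<>();
        // wall-clock time of the last record per shard, used to detect idle shards
        private final Map<Integer, Long> lastActivityPerShard = new HashMap<>();

        public PerShardWatermarkTracker(long idleTimeoutMillis) {
            this.idleTimeoutMillis = idleTimeoutMillis;
        }

        // Records a new event for the given shard and returns the current watermark.
        public long onRecord(int shardStateIndex, long recordTimestamp, long nowMillis) {
            lastTimestampPerShard.merge(shardStateIndex, recordTimestamp, Math::max);
            lastActivityPerShard.put(shardStateIndex, nowMillis);
            return currentWatermark(nowMillis);
        }

        // Minimum timestamp across all non-idle shards, or Long.MIN_VALUE if none are active.
        public long currentWatermark(long nowMillis) {
            long watermark = Long.MAX_VALUE;
            boolean anyActive = false;
            for (Map.Entry<Integer, Long> entry : lastTimestampPerShard.entrySet()) {
                long lastActivity = lastActivityPerShard.getOrDefault(entry.getKey(), 0L);
                if (nowMillis - lastActivity > idleTimeoutMillis) {
                    continue; // idle shard: do not let it hold back the watermark
                }
                anyActive = true;
                watermark = Math.min(watermark, entry.getValue());
            }
            return anyActive ? watermark : Long.MIN_VALUE;
        }
    }

Keeping this tracker outside the checkpointed shard state keeps it transient, which matches the "additional transient state" idea above.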