While we're on this: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

This is a concrete way of separating partition/shard/split discovery from reading them. The nice thing about this is that you can mix and match "discovery components" and "reader components". For example, for Kafka we would have a TopicReader, and I can envision different discovery implementations: one very simple, no-frills, but rock solid; another one that does automatic discovery of new partitions, regex topic matching, and so on.
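To make that a bit more concrete, here is a minimal sketch of what such a separation could look like. All of the names (SplitDiscoverer, SplitReader, StaticKafkaTopicDiscoverer) are made up for illustration; this is neither the Beam SDF API nor an existing Flink interface, just the shape of the idea:

import java.util.List;

/** Finds partitions/shards/splits; knows nothing about how they are read. */
interface SplitDiscoverer<SplitT> {
    /** Returns the currently known set of splits; may be polled periodically. */
    List<SplitT> discoverSplits() throws Exception;
}

/** Reads one split; knows nothing about how splits were discovered. */
interface SplitReader<SplitT, OUT> {
    void open(SplitT split) throws Exception;

    /** Returns the next record, or null if the split is exhausted for now. */
    OUT nextRecord() throws Exception;

    void close() throws Exception;
}

/** A very simple, no-frills discoverer: a fixed list of Kafka topics. */
class StaticKafkaTopicDiscoverer implements SplitDiscoverer<String> {
    private final List<String> topics;

    StaticKafkaTopicDiscoverer(List<String> topics) {
        this.topics = topics;
    }

    @Override
    public List<String> discoverSplits() {
        return topics;
    }
    // A fancier implementation could match a regex against the cluster's
    // topic list, pick up newly created partitions, etc., while the same
    // reader component (e.g. a Kafka TopicReader) is reused unchanged.
}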
> On 22. Feb 2018, at 01:49, Jamie Grier <jgr...@lyft.com> wrote:
>
> I know this is a very simplistic idea but...
>
> In general the issue Eron is describing occurs whenever two (or more)
> parallel partitions are assigned to the same Flink sub-task and there is a
> large time delta between them. This problem exists largely because we are
> not making any decisions about which of these partitions to read and when,
> but rather just treating them all the same. However, this isn't the only
> way to approach the problem.
>
> Think instead of each partition as a "roughly time-sorted" file and of the
> function of the connector as roughly a merge-sort type process. In other
> words, just read the older data first by peeking at each partition and
> deciding what to read next. The output of the connector would be a roughly
> time-ordered stream that way.
>
> However, to really solve the whole problem you'd have to carry this idea
> throughout Flink and be more selective about which data you read and when
> throughout the whole dataflow graph. Similar problem, I think, and just
> something I've been thinking a bit about lately.
>
>
> On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright <eronwri...@gmail.com> wrote:
>
>> It is valuable to consider the behavior of a consumer in both a real-time
>> processing context, which consists mostly of tail reads, and a historical
>> processing context, where there's an abundance of backlogged data. In the
>> historical processing context, system internals (e.g. shard selection
>> logic) have an outsized influence on the order of observation and
>> potentially the progression of the event time clock. In a real-time
>> context, the order of observation is, by definition, mostly determined by
>> the order in which events are produced.
>>
>> My point is, it would be good to explore the efficacy of these
>> improvements in both contexts.
>>
>>
>> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise <t...@apache.org> wrote:
>>
>>> I don't think there is a generic solution to the problem you are
>>> describing; we don't know how long it will take for resharding to take
>>> effect and those changes to become visible to the connector. Depending on
>>> how latency-sensitive the pipeline is, possibly a configurable watermark
>>> hold period could be used to cushion the event time chaos introduced by
>>> resharding.
>>>
>>> This isn't the primary motivation for the connector customization I'm
>>> working on, though. We face issues with restart from older checkpoints
>>> where parent and child shards are consumed in parallel.
>>>
>>> --
>>> sent from mobile
>>>
>>> On Feb 12, 2018 4:36 PM, "Eron Wright" <eronwri...@gmail.com> wrote:
>>>
>>> I'd like to know how you envision dealing with resharding in relation to
>>> the watermark state. Imagine that a given shard S1 has a watermark of T1,
>>> and is then split into two shards S2 and S3. The new shards are assigned
>>> to subtasks according to a hash function. The current watermarks of those
>>> subtasks could be far ahead of T1, and thus the events in S2/S3 will be
>>> considered late.
>>>
>>> The problem of a chaotic event time clock is exacerbated by any source
>>> that uses dynamic partitioning. Would a per-shard watermark generator
>>> really solve the problem that is motivating you?
>>>
>>> Thanks,
>>> Eron
>>>
>>> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Based on my draft implementation, the changes that are needed in the
>>>> Flink connector are as follows:
>>>>
>>>> I need to be able to override the following to track the last record
>>>> timestamp and idle time per shard:
>>>>
>>>> protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
>>>>         int shardStateIndex, SequenceNumber lastSequenceNumber) {
>>>>     synchronized (checkpointLock) {
>>>>         sourceContext.collectWithTimestamp(record, recordTimestamp);
>>>>         updateState(shardStateIndex, lastSequenceNumber);
>>>>     }
>>>> }
>>>>
>>>> Any objection to removing final from it?
>>>>
>>>> Also, why is sourceContext.collectWithTimestamp in the synchronized
>>>> block? My custom class will need to emit watermarks - I assume there is
>>>> no need to acquire checkpointLock for that? Otherwise I would also need
>>>> to add emitWatermark() to the base class.
>>>>
>>>> Let me know if anything else should be considered; I will open a JIRA
>>>> and PR otherwise.
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> -->
>>>>>
>>>>> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Regarding the two hooks you would like to be available:
>>>>>>
>>>>>> - Provide hook to override discovery (not to hit Kinesis from every
>>>>>>   subtask)
>>>>>>
>>>>>> Yes, I think we can easily provide a way, for example setting -1 for
>>>>>> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
>>>>>> Though, the user would then have to savepoint and restore in order to
>>>>>> pick up new shards after a Kinesis stream reshard (which is in
>>>>>> practice the best way to bypass the Kinesis API rate limitations).
>>>>>> +1 to provide that.
>>>>>
>>>>> I'm considering a customization of KinesisDataFetcher with an override
>>>>> for discoverNewShardsToSubscribe. We still want shards to be
>>>>> discovered, just not by hitting Kinesis from every subtask.
>>>>>
>>>>>> - Provide hook to support custom watermark generation (somewhere
>>>>>>   around KinesisDataFetcher.emitRecordAndUpdateState)
>>>>>>
>>>>>> Per-partition watermark generation on the Kinesis side is slightly
>>>>>> more complex than Kafka, due to how Kinesis’s dynamic resharding
>>>>>> works. I think we need to additionally allow new shards to be consumed
>>>>>> only after their parent shard is fully read, otherwise “per-shard time
>>>>>> characteristics” can be broken because of out-of-order consumption
>>>>>> across the boundaries of a closed parent shard and its child.
>>>>>> These JIRAs [1][2] have a bit more detail on the topic.
>>>>>> Otherwise, in general I’m also +1 to providing this in the Kinesis
>>>>>> consumer.
>>>>>
>>>>> Here I'm thinking to customize emitRecordAndUpdateState (the method
>>>>> would need to be made non-final), using getSubscribedShardsState with
>>>>> additional transient state to keep track of the watermark per shard and
>>>>> emit watermarks as appropriate.
>>>>>
>>>>> That's the idea - haven't written any code for it yet.
>>>>>
>>>>> Thanks,
>>>>> Thomas
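For what it's worth, a rough sketch of the per-shard watermark bookkeeping described at the end of the quoted thread could look like the following. This is only an illustration of the idea: ShardWatermarkTracker and its methods are hypothetical and not part of the FlinkKinesisConsumer; a real implementation would sit behind the emitRecordAndUpdateState / getSubscribedShardsState hooks discussed above and would also need the idle-shard and parent/child-shard handling mentioned there:

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative only: tracks the highest record timestamp seen per shard and
 * derives an overall watermark as the minimum across all active shards.
 */
class ShardWatermarkTracker {
    private final Map<String, Long> maxTimestampPerShard = new HashMap<>();
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Record a timestamp observed on a shard; called from the per-record path. */
    void onRecord(String shardId, long recordTimestamp) {
        maxTimestampPerShard.merge(shardId, recordTimestamp, Math::max);
    }

    /** Stop considering a shard (e.g. a parent shard that has been fully read). */
    void removeShard(String shardId) {
        maxTimestampPerShard.remove(shardId);
    }

    /**
     * Returns a new watermark if the minimum across shards advanced, otherwise null.
     * Idle-shard handling (the "idle time per shard" above) is omitted for brevity.
     */
    Long maybeAdvanceWatermark() {
        if (maxTimestampPerShard.isEmpty()) {
            return null;
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerShard.values()) {
            min = Math.min(min, ts);
        }
        if (min > lastEmittedWatermark) {
            lastEmittedWatermark = min;
            return min;
        }
        return null;
    }
}

Taking the minimum across shards keeps one fast shard from dragging the subtask's watermark ahead of slower ones, which mirrors the per-partition watermark behavior already available in the Kafka consumer.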