I know this is a very simplistic idea but...

In general, the issue Eron is describing occurs whenever two (or more)
parallel partitions are assigned to the same Flink sub-task and there is a
large time delta between them.  The problem exists largely because we are
not making any decisions about which of these partitions to read and when,
but rather just treating them all the same.  However, this isn't the only
way to approach the problem.

Think instead of each partition as a "roughly time sorted" file and of the
connector as something like a merge sort: peek at each partition, decide
which one has the oldest data, and read that next.  The output of the
connector would then be a roughly time-ordered stream.
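
A very rough standalone sketch of that merge idea (all names below are made
up for illustration, not actual connector API):

    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    /** One "roughly time sorted" partition that we can peek at. */
    interface PartitionReader {
        long peekNextTimestamp();   // event time of the next available record
        Object pollNext();          // read and return that record
        boolean hasNext();
    }

    /** Always reads from the partition whose pending record is oldest. */
    class TimeOrderedMerge implements Iterator<Object> {
        private final PriorityQueue<PartitionReader> queue = new PriorityQueue<>(
                (a, b) -> Long.compare(a.peekNextTimestamp(), b.peekNextTimestamp()));

        TimeOrderedMerge(List<PartitionReader> partitions) {
            for (PartitionReader p : partitions) {
                if (p.hasNext()) {
                    queue.add(p);
                }
            }
        }

        @Override
        public boolean hasNext() {
            return !queue.isEmpty();
        }

        @Override
        public Object next() {
            PartitionReader oldest = queue.poll();   // partition with the oldest head
            Object record = oldest.pollNext();
            if (oldest.hasNext()) {
                queue.add(oldest);                   // re-insert under its new head timestamp
            }
            return record;
        }
    }

A real implementation would of course need to bound how long it waits on a
partition that currently has nothing to peek at, or a single idle partition
would stall the whole merge.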

However, to really solve the whole problem you'd have to carry this idea
through Flink and be more selective about which data you read, and when,
across the whole data flow graph.  It's a similar problem, I think, and just
something I've been thinking about a bit lately.




On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright <eronwri...@gmail.com> wrote:

> It is valuable to consider the behavior of a consumer in both a real-time
> processing context, which consists mostly of tail reads, and a historical
> processing context, where there's an abundance of backlogged data.   In the
> historical processing context, system internals (e.g. shard selection
> logic) have an outsized influence on the order of observation and
> potentially the progression of the event time clock.  In a real-time
> context, the order of observation is, by definition, mostly determined by
> the order in which events are produced.
>
> My point is, it would be good to explore the efficacy of these improvements
> in both contexts.
>
>
>
>
> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise <t...@apache.org> wrote:
>
> > I don't think there is a generic solution to the problem you are
> > describing; we don't know how long it will take for resharding to take
> > effect and for those changes to become visible to the connector. Depending
> > on how latency-sensitive the pipeline is, a configurable watermark hold
> > period could possibly be used to cushion the event time chaos introduced
> > by resharding.
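> >
> > A minimal sketch of what such a hold could look like (plain Java, not
> > connector API; how a reshard is detected is left open):
> >
> >     /** Freezes watermark advancement for a configurable period after a reshard. */
> >     class ReshardWatermarkHold {
> >         private final long holdMillis;                 // configurable cushion
> >         private long reshardSeenAt = Long.MIN_VALUE;   // wall clock of last observed reshard
> >         private long heldWatermark = Long.MIN_VALUE;
> >
> >         ReshardWatermarkHold(long holdMillis) {
> >             this.holdMillis = holdMillis;
> >         }
> >
> >         void onReshardObserved(long wallClockMillis) {
> >             reshardSeenAt = wallClockMillis;
> >         }
> >
> >         /** Returns the watermark we are willing to emit right now. */
> >         long filter(long candidateWatermark, long wallClockMillis) {
> >             if (reshardSeenAt != Long.MIN_VALUE
> >                     && wallClockMillis - reshardSeenAt < holdMillis) {
> >                 return heldWatermark;                  // don't advance during the hold
> >             }
> >             heldWatermark = Math.max(heldWatermark, candidateWatermark);
> >             return heldWatermark;
> >         }
> >     }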
> >
> > This isn't the primary motivation for the connector customization I'm
> > working on, though. We face issues with restarts from older checkpoints,
> > where parent and child shards are consumed in parallel.
> >
> >
> > --
> > sent from mobile
> >
> >
> > On Feb 12, 2018 4:36 PM, "Eron Wright" <eronwri...@gmail.com> wrote:
> >
> > I'd like to know how you envision dealing with resharding in relation to
> > the watermark state.   Imagine that a given shard S1 has a watermark of
> > T1, and is then split into two shards S2 and S3.   The new shards are
> > assigned to subtasks according to a hash function.  The current watermarks
> > of those subtasks could be far ahead of T1, and thus the events in S2/S3
> > will be considered late.
> >
> > The problem of a chaotic event time clock is exacerbated by any source
> > that uses dynamic partitioning.  Would a per-shard watermark generator
> > really solve the problem that is motivating you?
> >
> > Thanks,
> > Eron
> >
> > On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:
> >
> > > Based on my draft implementation, the changes that are needed in the
> > > Flink connector are as follows:
> > >
> > > I need to be able to override the following to track last record
> > > timestamp and idle time per shard.
> > >
> > >     protected final void emitRecordAndUpdateState(T record,
> > >             long recordTimestamp, int shardStateIndex,
> > >             SequenceNumber lastSequenceNumber) {
> > >         synchronized (checkpointLock) {
> > >             sourceContext.collectWithTimestamp(record, recordTimestamp);
> > >             updateState(shardStateIndex, lastSequenceNumber);
> > >         }
> > >     }
> > >
> > > Any objection to removing final from it?
> > >
> > > Also, why is sourceContext.collectWithTimestamp in the synchronized
> > > block? My custom class will need to emit watermarks - I assume there is
> > > no need to acquire checkpointLock for that? Otherwise I would also need
> > > to add emitWatermark() to the base class.
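> > >
> > > For the bookkeeping part, this is roughly the helper I have in mind
> > > (standalone sketch, names made up; it would be driven from the
> > > emitRecordAndUpdateState override once final is removed):
> > >
> > >     import java.util.Map;
> > >     import java.util.concurrent.ConcurrentHashMap;
> > >
> > >     /** Tracks last record timestamp and idle time per shard, derives a watermark. */
> > >     class PerShardWatermarkState {
> > >         private final long idleTimeoutMillis;  // shards silent longer than this are ignored
> > >         private final Map<Integer, Long> maxTimestampPerShard = new ConcurrentHashMap<>();
> > >         private final Map<Integer, Long> lastActivityPerShard = new ConcurrentHashMap<>();
> > >
> > >         PerShardWatermarkState(long idleTimeoutMillis) {
> > >             this.idleTimeoutMillis = idleTimeoutMillis;
> > >         }
> > >
> > >         /** Called for every record, e.g. from the overridden emitRecordAndUpdateState. */
> > >         void onRecord(int shardStateIndex, long recordTimestamp, long wallClockMillis) {
> > >             maxTimestampPerShard.merge(shardStateIndex, recordTimestamp, Math::max);
> > >             lastActivityPerShard.put(shardStateIndex, wallClockMillis);
> > >         }
> > >
> > >         /** Min of the per-shard timestamps, ignoring shards that have gone idle. */
> > >         long currentWatermark(long wallClockMillis) {
> > >             long watermark = Long.MAX_VALUE;
> > >             boolean anyActive = false;
> > >             for (Map.Entry<Integer, Long> e : maxTimestampPerShard.entrySet()) {
> > >                 long lastSeen = lastActivityPerShard.getOrDefault(e.getKey(), wallClockMillis);
> > >                 if (wallClockMillis - lastSeen > idleTimeoutMillis) {
> > >                     continue;  // idle shard must not hold back the watermark
> > >                 }
> > >                 watermark = Math.min(watermark, e.getValue());
> > >                 anyActive = true;
> > >             }
> > >             return anyActive ? watermark : Long.MIN_VALUE;
> > >         }
> > >     }
> > >
> > > Whether the resulting emitWatermark() call then needs the checkpointLock
> > > is exactly the question above.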
> > >
> > > Let me know if anything else should be considered; otherwise I will
> > > open a JIRA and a PR.
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > > -->
> > > >
> > > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > > > wrote:
> > > >
> > > >> Regarding the two hooks you would like to be available:
> > > >>
> > > >>
> > > >>    - Provide hook to override discovery (not to hit Kinesis from
> > > >>      every subtask)
> > > >>
> > > >> Yes, I think we can easily provide a way, for example setting -1 for
> > > >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> > > >> Though, the user would then have to savepoint and restore in order to
> > > >> pick up new shards after a Kinesis stream reshard (which is in practice
> > > >> the best way to by-pass the Kinesis API rate limitations).
> > > >> +1 to provide that.
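> > > >>
> > > >> For illustration, usage would then look roughly like this (assuming the
> > > >> proposed -1 sentinel; today the property only controls the polling
> > > >> interval):
> > > >>
> > > >>     Properties config = new Properties();
> > > >>     config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
> > > >>     // hypothetical: a negative value disables periodic shard discovery
> > > >>     config.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
> > > >>     FlinkKinesisConsumer<String> consumer =
> > > >>             new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config);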
> > > >>
> > > >
> > > > I'm considering a customization of KinesisDataFetcher with an override
> > > > for discoverNewShardsToSubscribe. We still want shards to be discovered,
> > > > just not by hitting Kinesis from every subtask.
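> > > >
> > > > Roughly this shape (standalone sketch, all names made up; the real thing
> > > > would plug into the fetcher and some shared channel for the shard list):
> > > >
> > > >     import java.util.List;
> > > >     import java.util.function.Supplier;
> > > >
> > > >     /** Hypothetical channel the designated subtask publishes the shard list to. */
> > > >     interface ShardListChannel {
> > > >         void publish(List<String> shardIds);
> > > >         List<String> pollLatest();
> > > >     }
> > > >
> > > >     /** Only one subtask calls the Kinesis API; the others reuse its result. */
> > > >     class CentralizedShardDiscovery {
> > > >         private final int subtaskIndex;
> > > >         private final ShardListChannel channel;
> > > >         private final Supplier<List<String>> listShardsFromKinesis;
> > > >
> > > >         CentralizedShardDiscovery(int subtaskIndex, ShardListChannel channel,
> > > >                                   Supplier<List<String>> listShardsFromKinesis) {
> > > >             this.subtaskIndex = subtaskIndex;
> > > >             this.channel = channel;
> > > >             this.listShardsFromKinesis = listShardsFromKinesis;
> > > >         }
> > > >
> > > >         List<String> discoverNewShardsToSubscribe() {
> > > >             if (subtaskIndex == 0) {
> > > >                 List<String> shards = listShardsFromKinesis.get();  // single API caller
> > > >                 channel.publish(shards);
> > > >                 return shards;
> > > >             }
> > > >             return channel.pollLatest();  // reuse what subtask 0 discovered
> > > >         }
> > > >     }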
> > > >
> > > >
> > > >>
> > > >>
> > > >>    - Provide hook to support custom watermark generation (somewhere
> > > >>    around KinesisDataFetcher.emitRecordAndUpdateState)
> > > >>
> > > >> Per-partition watermark generation on the Kinesis side is slightly more
> > > >> complex than for Kafka, due to how Kinesis’s dynamic resharding works.
> > > >> I think we additionally need to allow a new shard to be consumed only
> > > >> after its parent shard is fully read; otherwise, “per-shard time
> > > >> characteristics” can be broken by out-of-order consumption across the
> > > >> boundary between a closed parent shard and its child.
> > > >> There are these JIRAs [1][2] which have a bit more detail on the topic.
> > > >> Otherwise, in general I’m also +1 to providing this in the Kinesis
> > > >> consumer.
> > > >>
> > > >
> > > > Here I'm thinking to customize emitRecordAndUpdateState (the method
> > > > would need to be made non-final), using getSubscribedShardsState with
> > > > additional transient state to keep track of a watermark per shard and
> > > > emit watermarks as appropriate.
> > > >
> > > > That's the idea - haven't written any code for it yet.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > >
> >
>
