Here's a doc I started that describes some changes we would like to make,
starting with the Kinesis Source. It describes a refactoring of that code
specifically, and hopefully also a pattern and some reusable code we can
apply to the other sources as well. The end goal is best-effort event-time
synchronization across all Flink sources, but we are going to start with the
Kinesis Source.

Please take a look and share your thoughts and opinions about the best
state-sharing mechanism to use -- that section is left blank, and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
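
To make the intent concrete, here is a rough sketch of the kind of reusable
alignment hook the doc has in mind. All names here are illustrative, not the
actual proposal, and the state-sharing mechanism behind currentGlobalMinimum()
is exactly the open question above:

// Illustrative sketch only: an alignment check a source subtask could call
// before emitting its next record. How SharedWatermarkState is backed
// (broadcast, an external store, a coordinator, ...) is the open question.
public class SourceWatermarkAligner {

    /** Hypothetical handle to state shared across all source subtasks. */
    public interface SharedWatermarkState {
        void reportLocalWatermark(long watermark);
        long currentGlobalMinimum();
    }

    private final SharedWatermarkState shared;
    private final long maxAheadMillis; // how far ahead of the slowest subtask we tolerate

    public SourceWatermarkAligner(SharedWatermarkState shared, long maxAheadMillis) {
        this.shared = shared;
        this.maxAheadMillis = maxAheadMillis;
    }

    /** Reports this subtask's event time and asks whether it should keep reading. */
    public boolean mayEmit(long localEventTime) {
        shared.reportLocalWatermark(localEventTime);
        // Best effort: pause (or slow down) this reader if it has run too far ahead.
        return localEventTime - shared.currentGlobalMinimum() <= maxAheadMillis;
    }
}

The source would call mayEmit() in its fetch loop and back off (without
blocking checkpoints) whenever it returns false.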

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:

> But at the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is, of course, not the perfect solution, but it could bring
> us forward quite a bit. The changes required for this should also be
> minimal. This would become obsolete once we have something like shared
> state, but until then I think it would be worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > The reason this selective reading doesn't work well in Flink at the
> > moment is because of checkpointing. For checkpointing, checkpoint
> > barriers travel within the streams. If we selectively read from inputs
> > based on timestamps, this is akin to blocking an input if that input is
> > very far ahead in event time, which can happen when you have a very fast
> > source and a slow source (in event time), maybe because you're in a
> > catch-up phase. In those cases it's better to simply not read the data
> > at the sources, as Thomas said. This is also because with Kafka Streams,
> > each operator is basically its own job: it's reading from Kafka and
> > writing to Kafka, and there is not a complex graph of different
> > operations with network shuffles in between, as you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > >> I think the new source interface would be designed so that it can
> > >> leverage shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from which source task it accepts
> > >> events and from which it doesn't.
> > >> If it blocks an input, all downstream tasks that are connected to the
> > >> operator are affected. This can easily lead to deadlocks.
> > >> Therefore, all operators need to be able to handle events when they
> > >> arrive. If they cannot process them yet because they are too far ahead
> > >> in time, they are put in state.
> > >>
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > > Rather, it is that they selectively choose which input to process the
> > > next message from based on its timestamp, so long as there are buffered
> > > messages waiting to be processed. That is a best-effort alignment
> > > strategy. It seems to work relatively well in practice, at least within
> > > Kafka Streams.
> > >
> > > E.g., at the moment StreamTwoInputProcessor uses a UnionInputGate for
> > > both of its inputs. Instead, it could keep them separate and selectively
> > > consume from whichever one has a buffer available, and if both have
> > > buffers available, from the one whose buffered messages have the lower
> > > timestamp.
> >
> >
>
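
For reference, the selective consumption Elias describes above could look
roughly like the sketch below. The types are made up for illustration; this
is not the actual StreamTwoInputProcessor code:

// Simplified sketch of best-effort alignment across two inputs: if both
// inputs have a record buffered, take the one with the lower timestamp;
// otherwise take whichever one is available.
public final class TimestampAlignedSelector {

    /** Hypothetical view of an input with locally buffered records. */
    public interface BufferedInput {
        boolean hasBufferedRecord();
        long peekTimestamp(); // event time of the next buffered record
    }

    /** Returns 0 or 1 for the input to read next, or -1 if neither has data buffered. */
    public static int selectNextInput(BufferedInput first, BufferedInput second) {
        boolean firstReady = first.hasBufferedRecord();
        boolean secondReady = second.hasBufferedRecord();
        if (firstReady && secondReady) {
            // Prefer the input that is behind in event time.
            return first.peekTimestamp() <= second.peekTimestamp() ? 0 : 1;
        }
        if (firstReady) {
            return 0;
        }
        if (secondReady) {
            return 1;
        }
        return -1; // neither buffered: fall back to polling both, as today
    }
}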
