Here's a doc I started that describes some changes we would like to make, starting with the Kinesis Source. It describes a refactoring of that code specifically, and hopefully also a pattern and some reusable code we can apply to the other sources as well. The end goal is best-effort event-time synchronization across all Flink sources, but we are going to start with the Kinesis Source first.
Please take a look, and please share thoughts and opinions about the best state sharing mechanism to use -- that section is left blank and we're especially looking for input there. https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie

On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is of course not the perfect solution, but it could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would be worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > The reason this selective reading doesn't work well in Flink at the
> > moment is checkpointing. For checkpointing, checkpoint barriers travel
> > within the streams. If we selectively read from inputs based on
> > timestamps, this is akin to blocking an input if that input is very far
> > ahead in event time, which can happen when you have a very fast source
> > and a slow source (in event time), maybe because you're in a catch-up
> > phase. In those cases it's better to simply not read the data at the
> > sources, as Thomas said. This is also because with Kafka Streams each
> > operator is basically its own job: it's reading from Kafka and writing
> > to Kafka, and there is not a complex graph of different operations with
> > network shuffles in between, as you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > >> leverage shared state to achieve time alignment. I don't think this
> > >> would be possible without some kind of shared state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure. That's because a task cannot choose from which source
> > >> task it accepts events and from which it doesn't. If it blocks an
> > >> input, all downstream tasks that are connected to the operator are
> > >> affected. This can easily lead to deadlocks. Therefore, all operators
> > >> need to be able to handle events when they arrive. If they cannot
> > >> process them yet because they are too far ahead in time, they are put
> > >> in state.
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > > Rather, it is that they selectively choose which input to process the
> > > next message from, based on timestamps, so long as there are buffered
> > > messages waiting to be processed. That is a best-effort alignment
> > > strategy. It seems to work relatively well in practice, at least
> > > within Kafka Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> > > both its inputs. Instead, it could keep them separate and selectively
> > > consume from the one that has a buffer available, and if both have
> > > buffers available, from the buffer whose messages have the lower
> > > timestamp.
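
To make the selection idea from the quoted thread concrete, here is a minimal, purely illustrative sketch of the logic Elias describes. The InputBuffer interface below is made up for this example -- it is not Flink's actual StreamTwoInputProcessor or UnionInputGate API. It only demonstrates the best-effort rule: when both inputs have buffered records, consume from the one whose next record carries the lower timestamp; when only one has data, consume from it rather than blocking.

    // Illustrative sketch only; InputBuffer is a hypothetical stand-in for a
    // buffered network input, not Flink's real API.
    public class SelectiveTwoInputReader {

        /** Hypothetical minimal view of a buffered input. */
        interface InputBuffer {
            boolean hasBufferedRecord();
            long peekTimestamp();   // event time of the next buffered record
            String pollRecord();    // consume the next buffered record
        }

        /**
         * Pick which input to read from next. If both inputs have buffered
         * records, prefer the one that is further behind in event time; if
         * only one has data, read from it; otherwise return null and let the
         * caller wait for more data. Neither input is ever blocked.
         */
        static String readNext(InputBuffer first, InputBuffer second) {
            boolean firstReady = first.hasBufferedRecord();
            boolean secondReady = second.hasBufferedRecord();

            if (firstReady && secondReady) {
                return first.peekTimestamp() <= second.peekTimestamp()
                        ? first.pollRecord()
                        : second.pollRecord();
            } else if (firstReady) {
                return first.pollRecord();
            } else if (secondReady) {
                return second.pollRecord();
            }
            return null; // nothing buffered yet
        }
    }

Note that this only reorders reads among already-buffered data; Aljoscha's points about checkpoint barriers, throttling at the sources, and readers needing shared state to be aware of each other still apply to the broader design in the doc.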