The reason this selective reading doesn't work well in Flink in the moment is 
because of checkpointing. For checkpointing, checkpoint barriers travel within 
the streams. If we selectively read from inputs based on timestamps this is 
akin to blocking an input if that input is very far ahead in event time, which 
can happen when you have a very fast source and a slow source (in event time), 
maybe because you're in a catchup phase. In those cases it's better to simply 
not read the data at the sources, as Thomas said. This is also because with 
Kafka Streams, each operator is basically its own job: it's reading from Kafka 
and writing to Kafka and there is not a complex graph of different operations 
with network shuffles in between, as you have with Flink.

This different nature of Flink is also why I think that readers need awareness 
of other readers to do the event-time alignment, and this is where shared state 
comes in.

> On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com> wrote:
> 
>> I think the new source interface would be designed to be able to leverage
>> shared state to achieve time alignment.
>> I don't think this would be possible without some kind of shared state.
>> 
>> The problem of tasks that are far ahead in time cannot be solved with
>> back-pressure.
>> That's because a task cannot choose from which source task it accepts
>> events and from which doesn't.
>> If it blocks an input, all downstream tasks that are connected to the
>> operator are affected. This can easily lead to deadlocks.
>> Therefore, all operators need to be able to handle events when they arrive.
>> If they cannot process them yet because they are too far ahead in time,
>> they are put in state.
>> 
> 
> The idea I was suggesting is not for operators to block an input.  Rather,
> it is that they selectively choose from which input to process the next
> message from based on their timestamp, so long as there are buffered
> messages waiting to be processed.  That is a best-effort alignment
> strategy.  Seems to work relatively well in practice, at least within Kafka
> Streams.
> 
> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> its inputs.  Instead, it could keep them separate and selectively consume
> from the one that had a buffer available, and if both have buffers
> available, from the buffer with the messages with a lower timestamp.

Reply via email to