[ https://issues.apache.org/jira/browse/FLINK-4558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174450#comment-16174450 ]
Stephan Ewen commented on FLINK-4558: ------------------------------------- +1 on that, have been thinking about that for a bit. We are currently preparing the network stack to handle these aligning situations better, based on that we should implement that event time alignment. The devil is probably in the details here, for example when should the backpressure kick in. The amount of drift between inputs that works still well depends a lot in the data rate or "density" of the streams. The logic would need to be a bit smarter than just "block inputs that are more than X ahead of other streams". > Add support for synchronizing streams > ------------------------------------- > > Key: FLINK-4558 > URL: https://issues.apache.org/jira/browse/FLINK-4558 > Project: Flink > Issue Type: Improvement > Components: DataStream API > Affects Versions: 1.1.0 > Reporter: Elias Levy > > As mentioned on the [mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html], > there are use cases that require synchronizing two streams on via their > times and where it is not practical to buffer all messages from one streams > while waiting for the other to synchronize. Flink should add functionality > to enable such use cases. > This could be implemented by modifying TwoInputStreamOperator so that calls > to processElement1 and processElement2 could return a value indicating that > the element can't yet be processed, having the framework then pause > processing for some time, potentially using exponential back off with a hard > maximum, and then allowing the back pressure system to do its work and pause > the stream. > Alternatively, an API could be added to explicitly pause/unpause a stream. > For ease of use either of these mechanism should be used to create a > SynchronizedTwoInputStreamOperator that end users can utilize by passing a > configurable time delta to use as a synchronization threshold. > -- This message was sent by Atlassian JIRA (v6.4.14#64029)