GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/906
[FLINK-1967] Introduce (Event)time in Streaming

## [FLINK-1967] Introduce (Event)time in Streaming

This introduces an additional timestamp field in StreamRecord. When using a SourceFunction and an auto-timestamp interval is set using the ExecutionConfig, the timestamp is automatically set to System.currentTimeMillis() upon element emission. The timestamp can be set manually using an EventTimeSourceFunction.

This also changes the signature of the StreamOperators: they now receive a StreamRecord instead of the unwrapped value, which is necessary for them to access the timestamp. Previously, the StreamTask unwrapped the value from the StreamRecord; this is now moved one layer higher.

This also introduces watermarks to keep track of processing progress. At a configurable interval, the sources emit watermarks that signify that this source will not emit any more records with a lower timestamp. Watermarks are broadcast: stream inputs wait for watermark events on all input channels and forward the watermark to the StreamOperator once the watermark has advanced on all inputs. Operators are responsible for forwarding the watermark once they know that no elements with an earlier timestamp will be emitted (this is mostly relevant for buffering operations such as windows). Right now, all operators simply forward the watermarks they receive.

When using an EventTimeSourceFunction, the system does not automatically emit watermarks; the user must emit them manually using the SourceContext. No watermarks are emitted unless a watermark interval is set with ExecutionConfig.setAutoWatermarkInterval.

This commit also contains fixes for other issues that were discovered while implementing the feature. See the Jira issue numbers and descriptions below.

## [FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing

This changes CoReader (now CoStreamingRecordReader) to reuse UnionGate for the input multiplexing.
This way, it no longer locks onto one specific input side and instead reads events from both input sides. This also enables an event listener for checkpoint barriers, so the TwoInputTask now reacts to them and correctly forwards them. Then, this adds CoStreamCheckpointintITCase to verify that checkpointing works in topologies with TwoInputStreamTasks.

This also adds tests for OneInputStreamTask and TwoInputStreamTask that check whether:
- open()/close() of RichFunctions are correctly called
- Watermarks are correctly forwarded
- Supersteps/checkpoint barriers are correctly forwarded and the blocking of inputs works correctly

## Add proper tests for Stream Operators

These test whether:
- open()/close() on RichFunctions are called
- Timestamps of emitted elements match the timestamp of the input element
- Watermarks are correctly forwarded

## [FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests

Previously, a CheckpointBarrier from a more recent checkpoint would also trigger unblocking while the buffer was still waiting on an older CheckpointBarrier. Now, a CheckpointBarrier from a newer checkpoint unblocks all channels and starts a new wait on the new checkpoint. The tests for OneInputStreamTask and TwoInputStreamTask check whether the buffer behaves correctly when it receives CheckpointBarriers from more recent checkpoints while still waiting on an older CheckpointBarrier.
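The operator signature change under [FLINK-1967] (operators receive the StreamRecord rather than the unwrapped value) is what lets an operator give each output element the timestamp of its input element. The following Java sketch illustrates the idea with two hypothetical classes, TimestampedRecord and TimestampPreservingMap. They are simplified stand-ins written for this illustration, not Flink's actual StreamRecord or StreamOperator API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified record that carries a timestamp next to its value
// (illustration only; not Flink's StreamRecord).
final class TimestampedRecord<T> {
    private final T value;
    private final long timestamp;

    TimestampedRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    T getValue() { return value; }

    long getTimestamp() { return timestamp; }

    // Creates a record with a new value but the same timestamp.
    <R> TimestampedRecord<R> replace(R newValue) {
        return new TimestampedRecord<>(newValue, timestamp);
    }
}

// Hypothetical map operator: because it receives the whole record, it can
// attach the input timestamp to the element it emits.
final class TimestampPreservingMap<IN, OUT> {
    private final Function<IN, OUT> userFunction;
    private final List<TimestampedRecord<OUT>> output = new ArrayList<>();

    TimestampPreservingMap(Function<IN, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    void processElement(TimestampedRecord<IN> record) {
        // The user function only sees the value; the operator keeps the timestamp.
        output.add(record.replace(userFunction.apply(record.getValue())));
    }

    List<TimestampedRecord<OUT>> getOutput() { return output; }
}
```

In this sketch the user function stays timestamp-agnostic; only the operator layer handles the record wrapper.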
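The watermark forwarding rule from [FLINK-1967] (wait for watermarks on all input channels and forward only once the watermark has advanced on all of them) amounts to forwarding the minimum of the per-channel watermarks whenever that minimum increases. A minimal Java sketch, assuming a hypothetical WatermarkAligner class that records forwarded watermarks in a list instead of calling a real operator:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of per-input watermark alignment; names are illustrative
// and not part of Flink's API.
final class WatermarkAligner {
    private final long[] channelWatermarks;
    private long lastForwarded = Long.MIN_VALUE;
    private final List<Long> forwarded = new ArrayList<>();

    WatermarkAligner(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void onWatermark(int channel, long watermark) {
        // Watermarks on a single channel only move forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // Only the minimum over all channels is safe to forward: no input can
        // still deliver an element with a lower timestamp.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > lastForwarded) {
            lastForwarded = min;
            forwarded.add(min); // a real task would notify the operator here
        }
    }

    List<Long> getForwarded() { return forwarded; }
}
```

For example, with two channels, a watermark of 10 on channel 0 forwards nothing until channel 1 also reports one; if channel 1 reports 5, then 5 is forwarded.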
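The corrected BarrierBuffer behaviour from [FLINK-2301] can be modelled as a small state machine: a barrier from a newer checkpoint unblocks all channels and starts a new alignment instead of being counted toward the older one. The Java sketch below uses a hypothetical BarrierAlignmentSketch class. It only tracks which channels are blocked (it does not buffer their data), and ignoring barriers from older checkpoints is an assumption of the sketch, not something stated in the pull request.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of barrier alignment (not Flink's BarrierBuffer).
final class BarrierAlignmentSketch {
    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private long currentCheckpointId = -1;       // checkpoint currently being aligned
    private long lastCompletedCheckpointId = -1; // last fully aligned checkpoint

    BarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    void onBarrier(int channel, long checkpointId) {
        // Assumption of this sketch: stale barriers are ignored.
        if (checkpointId < currentCheckpointId || checkpointId <= lastCompletedCheckpointId) {
            return;
        }
        if (checkpointId > currentCheckpointId) {
            // Newer checkpoint: unblock all channels and start a new wait.
            blockedChannels.clear();
            currentCheckpointId = checkpointId;
        }
        // Block this channel until the barrier has arrived on every channel.
        blockedChannels.add(channel);
        if (blockedChannels.size() == numChannels) {
            lastCompletedCheckpointId = checkpointId;
            blockedChannels.clear(); // alignment done: unblock everything
        }
    }

    boolean isBlocked(int channel) { return blockedChannels.contains(channel); }

    long getLastCompletedCheckpointId() { return lastCompletedCheckpointId; }
}
```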
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink event-time-in-band

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/906.patch

To close this pull request, make a commit to your master/trunk
branch with (at least) the following in the commit message:

    This closes #906

----
commit 89add486b51ce83cd23312ba26b79f2f66e3c12a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2015-06-22T10:26:44Z

    [FLINK-1967] Introduce (Event)time in Streaming
commit d7d7ae1b1e6aa4b69eddcb1bd78af06a8cb0508d
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2015-07-13T16:09:54Z

    WIP on remove-lock-contention

----