GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/906

    [FLINK-1967] Introduce (Event)time in Streaming

    ## [FLINK-1967] Introduce (Event)time in Streaming
        
    This introduces an additional timestamp field in StreamRecord. When a
    SourceFunction is used and an auto-timestamp interval is set in the
    ExecutionConfig, the timestamp is automatically set to
    System.currentTimeMillis() upon element emission. The timestamp can be
    set manually by using an EventTimeSourceFunction.
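
    As a rough illustration of the two ways a record can get its timestamp,
    here is a minimal, self-contained sketch. The Record class and both emit
    helpers are hypothetical stand-ins written for this example; they are not
    the StreamRecord or source classes touched by this pull request.

```java
import java.util.function.LongSupplier;

public class TimestampedRecordSketch {

    /** Hypothetical stand-in for StreamRecord: a value plus a timestamp in milliseconds. */
    static final class Record<T> {
        final T value;
        final long timestamp;

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Automatic case: the clock is read at the moment the element is emitted. */
    static <T> Record<T> emitWithAutoTimestamp(T value, LongSupplier clock) {
        return new Record<>(value, clock.getAsLong());
    }

    /** Manual case: the source supplies the timestamp itself. */
    static <T> Record<T> emitWithTimestamp(T value, long eventTime) {
        return new Record<>(value, eventTime);
    }

    public static void main(String[] args) {
        Record<String> auto = emitWithAutoTimestamp("a", System::currentTimeMillis);
        Record<String> manual = emitWithTimestamp("b", 1_000L);
        System.out.println(auto.value + " @ " + auto.timestamp);
        System.out.println(manual.value + " @ " + manual.timestamp);
    }
}
```

    Passing the clock as a LongSupplier is only a convenience of this sketch so
    that the automatic case can be tested with a fixed clock.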
    
    This also changes the signature of the StreamOperators. They now get
    a StreamRecord instead of the unwrapped value. This is necessary for
    them to access the timestamp. Previously, the StreamTask unwrapped the
    value from the StreamRecord; now this is moved one layer higher.
    
    This also introduces watermarks to keep track of processing. At
    a configurable interval the sources will emit watermarks that signify
    that no records with a lower timestamp will be emitted in the future by
    this source. The watermarks are broadcast. Stream inputs wait for watermark
    events on all input channels and forward the watermark to the
    StreamOperator once the watermark advances on all inputs. Operators are
    responsible for forwarding the watermark once they know that no elements
    with an earlier timestamp will be emitted (this is mostly relevant for
    buffering operations such as windows). Right now, all operators simply
    forward the watermark they receive.
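
    The rule for combining watermarks across input channels can be sketched
    as taking the minimum over all channels and forwarding it only when that
    minimum advances. The WatermarkTracker class below is a hypothetical
    illustration of that idea, not the code in this pull request.

```java
import java.util.Arrays;
import java.util.OptionalLong;

public class WatermarkAlignmentSketch {

    /** Hypothetical helper: remembers the latest watermark per input channel. */
    static final class WatermarkTracker {
        private final long[] channelWatermarks;
        private long lastForwarded = Long.MIN_VALUE;

        WatermarkTracker(int numChannels) {
            channelWatermarks = new long[numChannels];
            // Until a channel has sent a watermark, it holds back the combined watermark.
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        /**
         * Records a watermark received on one channel.
         * Returns the combined watermark if it advanced, otherwise an empty result.
         */
        OptionalLong onWatermark(int channel, long watermark) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
            long min = Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
            if (min > lastForwarded) {
                lastForwarded = min;
                return OptionalLong.of(min);
            }
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 10L)); // channel 1 has not reported yet
        System.out.println(tracker.onWatermark(1, 7L));  // both channels have reported
        System.out.println(tracker.onWatermark(1, 12L)); // minimum is now set by channel 0
    }
}
```

    In this sketch the first call forwards nothing, the second forwards 7 and
    the third forwards 10, because the slower channel always determines the
    combined watermark.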
    
    When using an EventTimeSourceFunction the system does not
    automatically emit timestamps; the user is required to manually emit
    watermarks using the SourceContext.
    
    No watermarks will be emitted unless
    ExecutionConfig.setAutoWatermarkInterval is used to set a watermark
    interval.
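
    To make the manual case more concrete, the following sketch shows a
    source that attaches its own timestamps and emits a watermark after each
    element. The Context interface and its two methods are hypothetical
    stand-ins defined only for this example; they should not be read as the
    actual SourceContext signatures. The sketch also assumes that event
    timestamps arrive in ascending order, which is what makes it safe to use
    the latest timestamp as the watermark.

```java
import java.util.ArrayList;
import java.util.List;

public class ManualWatermarkSourceSketch {

    /** Hypothetical stand-in for a source context; not the real Flink interface. */
    interface Context<T> {
        void collectWithTimestamp(T element, long timestamp);
        void emitWatermark(long watermark);
    }

    /** Emits one element per timestamp, followed by a watermark (assumes ascending input). */
    static void run(long[] eventTimes, Context<String> ctx) {
        for (long t : eventTimes) {
            ctx.collectWithTimestamp("event-" + t, t);
            // Timestamps ascend, so no element with a timestamp below t will follow.
            ctx.emitWatermark(t);
        }
    }

    /** Runs the source against a context that records everything it receives. */
    static List<String> record(long[] eventTimes) {
        List<String> log = new ArrayList<>();
        run(eventTimes, new Context<String>() {
            @Override
            public void collectWithTimestamp(String element, long timestamp) {
                log.add(element + "@" + timestamp);
            }

            @Override
            public void emitWatermark(long watermark) {
                log.add("WM(" + watermark + ")");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(new long[] {1L, 5L, 9L}));
    }
}
```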
    
    This commit also contains fixes for other issues that were discovered
    while implementing the feature. See the Jira issue numbers and
    descriptions below.
    
    ## [FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing
    
    This changes CoReader (now CoStreamingRecordReader) to reuse
    UnionGate for the input multiplexing. This way it will not lock in on
    one specific input side and read events from both input sides.
    
    This also enables an event listener for checkpoint barriers so that the
    TwoInputTask now reacts to those and correctly forwards them.
    
    This also adds CoStreamCheckpointingITCase to verify that checkpointing
    works in topologies with TwoInputStreamTasks.
    
    This also adds tests for OneInputStreamTask and TwoInputStreamTask
    that check whether:
     - open()/close() of RichFunctions are correctly called
     - Watermarks are correctly forwarded
     - Supersteps/checkpoint barriers are correctly forwarded and the
       blocking of inputs works correctly
    
    ## Add proper tests for Stream Operators
    
    These test whether:
     - open()/close() on RichFunctions are called
     - Timestamps of emitted elements match the timestamp of the input
       element
     - Watermarks are correctly forwarded
    
    ## [FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests
    
    Before, a CheckpointBarrier from a more recent Checkpoint would also
    trigger unblocking while waiting on an older CheckpointBarrier. Now,
    a CheckpointBarrier from a newer checkpoint will unblock all channels
    and start a new wait on the new Checkpoint.
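
    The new behaviour can be sketched roughly as follows. BarrierAligner is a
    hypothetical class written for this illustration and is not the
    BarrierBuffer implementation. Ignoring barriers that belong to an older
    or already completed checkpoint is an assumption of the sketch.

```java
import java.util.Arrays;

public class BarrierAlignmentSketch {

    /** Hypothetical helper: blocks channels until all have delivered the current barrier. */
    static final class BarrierAligner {
        private final boolean[] blocked;
        private long currentCheckpoint = -1L;
        private long lastCompleted = -1L;
        private int barriersSeen = 0;

        BarrierAligner(int numChannels) {
            blocked = new boolean[numChannels];
        }

        /** Returns true when the barrier of the current checkpoint has arrived on all channels. */
        boolean onBarrier(int channel, long checkpointId) {
            if (checkpointId < currentCheckpoint || checkpointId <= lastCompleted) {
                return false; // assumption: stale barriers are ignored
            }
            if (checkpointId > currentCheckpoint) {
                // Barrier from a newer checkpoint: unblock everything, start a new wait.
                releaseAll();
                currentCheckpoint = checkpointId;
            }
            if (!blocked[channel]) {
                blocked[channel] = true;
                barriersSeen++;
            }
            if (barriersSeen == blocked.length) {
                lastCompleted = currentCheckpoint;
                releaseAll();
                return true;
            }
            return false;
        }

        boolean isBlocked(int channel) {
            return blocked[channel];
        }

        private void releaseAll() {
            Arrays.fill(blocked, false);
            barriersSeen = 0;
        }
    }

    public static void main(String[] args) {
        BarrierAligner aligner = new BarrierAligner(2);
        aligner.onBarrier(0, 1L);                       // wait on checkpoint 1, channel 0 blocked
        aligner.onBarrier(1, 2L);                       // newer checkpoint: restart the wait
        System.out.println(aligner.isBlocked(0));       // channel 0 was released
        System.out.println(aligner.onBarrier(0, 2L));   // checkpoint 2 is now aligned
    }
}
```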
    
    The tests for OneInputStreamTask and TwoInputStreamTask check whether
    the buffer behaves correctly when receiving CheckpointBarriers from more
    recent checkpoints while still waiting on an older CheckpointBarrier.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink event-time-in-band

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #906
    
----
commit 89add486b51ce83cd23312ba26b79f2f66e3c12a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2015-06-22T10:26:44Z

    [FLINK-1967] Introduce (Event)time in Streaming
    

commit d7d7ae1b1e6aa4b69eddcb1bd78af06a8cb0508d
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2015-07-13T16:09:54Z

    WIP on remove-lock-contention

----

