[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663902#comment-15663902
 ] 

ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r87798796
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
 ---
    @@ -18,28 +18,77 @@
     package org.apache.flink.streaming.runtime.operators.windowing;
     
     import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
     
     /**
      * Stores the value and the timestamp of the record.
    + * 
      * @param <T> The type encapsulated value
      */
     @PublicEvolving
     public class TimestampedValue<T> {
     
    +   /** The actual value held by this record */
        private T value;
    +
    +   /** The timestamp of the record */
        private long timestamp;
     
    +   /** Flag whether the timestamp is actually set */
    +   private boolean hasTimestamp;
    +
    +   /**
    +    * Creates a new TimestampedValue. The record does not have a timestamp.
    +    */
    +   public TimestampedValue(T value) {
    +           this.value = value;
    +   }
    +
    +   /**
    +    * Creates a new TimestampedValue wrapping the given value. The 
timestamp is set to the
    +    * given timestamp.
    +    *
    +    * @param value The value to wrap in this {@link TimestampedValue}
    +    * @param timestamp The timestamp in milliseconds
    +    */
        public TimestampedValue(T value, long timestamp) {
                this.value = value;
                this.timestamp = timestamp;
    +           this.hasTimestamp = true;
        }
     
    +   /**
    +    * @return The value wrapped in this {@link TimestampedValue}.
    +    */
        public T getValue() {
                return value;
        }
     
    +   /**
    +    * @return The timestamp associated with this stream value in 
milliseconds.
    +     */
        public long getTimestamp() {
    --- End diff --
    
    We should throw this exception here:
    ```
    throw new IllegalStateException(
                                        "Record has no timestamp. Is the time 
characteristic set to 'ProcessingTime', or " +
                                                        "did you forget to call 
'DataStream.assignTimestampsAndWatermarks(...)'?");
    ```
    
    and in the `TimeEvictor` check whether the elements have timestamps using 
`hasTimestamp()`.


> Enhance Window Evictor
> ----------------------
>
>                 Key: FLINK-4174
>                 URL: https://issues.apache.org/jira/browse/FLINK-4174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: vishnu viswanath
>            Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to