[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663902#comment-15663902 ]
ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r87798796

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java ---
    @@ -18,28 +18,77 @@
     package org.apache.flink.streaming.runtime.operators.windowing;

     import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

     /**
      * Stores the value and the timestamp of the record.
    + *
      * @param <T> The type encapsulated value
      */
     @PublicEvolving
     public class TimestampedValue<T> {

    +    /** The actual value held by this record */
         private T value;
    +
    +    /** The timestamp of the record */
         private long timestamp;

    +    /** Flag whether the timestamp is actually set */
    +    private boolean hasTimestamp;
    +
    +    /**
    +     * Creates a new TimestampedValue. The record does not have a timestamp.
    +     */
    +    public TimestampedValue(T value) {
    +        this.value = value;
    +    }
    +
    +    /**
    +     * Creates a new TimestampedValue wrapping the given value. The timestamp is set to the
    +     * given timestamp.
    +     *
    +     * @param value The value to wrap in this {@link TimestampedValue}
    +     * @param timestamp The timestamp in milliseconds
    +     */
         public TimestampedValue(T value, long timestamp) {
             this.value = value;
             this.timestamp = timestamp;
    +        this.hasTimestamp = true;
         }

    +    /**
    +     * @return The value wrapped in this {@link TimestampedValue}.
    +     */
         public T getValue() {
             return value;
         }

    +    /**
    +     * @return The timestamp associated with this stream value in milliseconds.
    +     */
         public long getTimestamp() {
    --- End diff --

    We should throw this exception here:
    ```
    throw new IllegalStateException(
        "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
        "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
    ```
    and in the `TimeEvictor` check whether the elements have timestamps using `hasTimestamp()`.


> Enhance Window Evictor
> ----------------------
>
>                 Key: FLINK-4174
>                 URL: https://issues.apache.org/jira/browse/FLINK-4174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: vishnu viswanath
>            Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the
> beginning). To do this Evictor must go through the list of elements and
> remove the elements that have to be evicted instead of the current approach
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement :
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
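
For illustration only, the review suggestion above (throw from `getTimestamp()` when no timestamp is set, and check `hasTimestamp()` before time-based eviction) might look roughly like the sketch below. It is a simplified, self-contained stand-in rather than the Flink classes themselves: the wrapper class `TimestampedValueSketch`, the helper `evictOlderThan`, and its cutoff rule are assumptions made for the example and are not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TimestampedValueSketch {

    /** Simplified stand-in for the TimestampedValue in the diff (not the Flink class). */
    static final class TimestampedValue<T> {
        private final T value;
        private final long timestamp;
        private final boolean hasTimestamp;

        /** Creates a value without a timestamp. */
        TimestampedValue(T value) {
            this.value = value;
            this.timestamp = 0L;
            this.hasTimestamp = false;
        }

        /** Creates a value with the given timestamp in milliseconds. */
        TimestampedValue(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = true;
        }

        T getValue() {
            return value;
        }

        boolean hasTimestamp() {
            return hasTimestamp;
        }

        /** Fails loudly instead of returning a meaningless default. */
        long getTimestamp() {
            if (hasTimestamp) {
                return timestamp;
            }
            throw new IllegalStateException(
                "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or "
                    + "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    /**
     * Hypothetical time-based eviction: removes elements whose timestamp is at or
     * before (max timestamp - windowSizeMs). Elements without a timestamp are
     * skipped, so getTimestamp() is never called on them.
     */
    static <T> int evictOlderThan(List<TimestampedValue<T>> elements, long windowSizeMs) {
        boolean anyTimestamp = false;
        long maxTimestamp = Long.MIN_VALUE;
        for (TimestampedValue<T> element : elements) {
            if (element.hasTimestamp()) {
                anyTimestamp = true;
                maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
            }
        }
        if (!anyTimestamp) {
            return 0; // nothing to compare against, so evict nothing
        }

        long cutoff = maxTimestamp - windowSizeMs;
        int evicted = 0;
        for (Iterator<TimestampedValue<T>> it = elements.iterator(); it.hasNext();) {
            TimestampedValue<T> element = it.next();
            if (element.hasTimestamp() && element.getTimestamp() <= cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        List<TimestampedValue<String>> elements = new ArrayList<>();
        elements.add(new TimestampedValue<>("a", 1000L));
        elements.add(new TimestampedValue<>("b", 5000L));
        elements.add(new TimestampedValue<>("c")); // no timestamp
        int evicted = evictOlderThan(elements, 2000L);
        System.out.println("evicted=" + evicted + ", remaining=" + elements.size());
    }
}
```

Guarding with `hasTimestamp()` keeps the exception as a signal of a real misconfiguration: the evictor never triggers it by accident, while any caller that reads a timestamp unconditionally still gets the descriptive error message.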