[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cody closed FLINK-4215.
-----------------------
    Resolution: Invalid

> timestamp of StreamRecord is lost in WindowOperator
> ---------------------------------------------------
>
>                 Key: FLINK-4215
>                 URL: https://issues.apache.org/jira/browse/FLINK-4215
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: Cody
>
> In a WindowedStream, if the subsequent operator is a WindowOperator (by
> applying a fold function), the timestamp of StreamRecord will be lost. Here's
> my test code:
> ---------------------------------------------
> def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment):
>     DataStream[(Int, Long, String, String)] = {
>   val data = new mutable.MutableList[(Int, Long, String, String)]
>   data.+=((1, 1L, "Hi", "2016-07-06 14:00:00"))
>   data.+=((2, 2L, "Hello", "2016-07-06 14:01:00"))
>   data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00"))
>   env.fromCollection(data)
> }
>
> @Test
> def testTimestampInWindowOperator(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env)
>     .assignTimestampsAndWatermarks(
>       new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] {
>         override def getCurrentWatermark: Watermark = null
>         override def extractTimestamp(element: (Int, Long, String, String),
>             previousElementTimestamp: Long): Long = {
>           DateFormat.getDateTimeInstance.parse(element._4).getTime
>         }
>       })
>     .keyBy(3)
>     .timeWindow(Time.milliseconds(1000))
>     .fold((0, 0L, "", ""),
>       new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] {
>         override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String))
>             : (Int, Long, String, String) = {
>           (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4)
>         }
>       })
>     .addSink(new PrintSinkFunction[(Int, Long, String, String)]())
>   env.execute()
> }

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)