[ https://issues.apache.org/jira/browse/FLINK-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931924#comment-15931924 ]
Aljoscha Krettek commented on FLINK-6074:
-----------------------------------------

I opened FLINK-6118, which tracks the phenomenon you observed in a more general way. I think this issue can be closed because it "works as intended" for punctuated watermarks, which are always data-driven and thus depend a bit on how data arrives at the parallel instances. For real-world data, where elements with roughly the same timestamps go to all parallel instances, this shouldn't be a problem.

> Fix processFunction with watermark not work well in tableAPI
> ------------------------------------------------------------
>
>                 Key: FLINK-6074
>                 URL: https://issues.apache.org/jira/browse/FLINK-6074
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> I did a simple test and found that the same `AssignerWithPunctuatedWatermarks`
> and `ProcessFunction` see different watermarks when used through the SQL API
> and through the DataStream API, as follows:
> ProcessFunction:
> {code}
> class CheckWaterMark extends ProcessFunction[(Long, Int, String), (Long, Int, String)] {
>   override def processElement(
>       value: (Long, Int, String),
>       ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#Context,
>       out: Collector[(Long, Int, String)]): Unit = {
>     println("WaterMark=" + ctx.timerService.currentWatermark())
>   }
>   override def onTimer(
>       timestamp: Long,
>       ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#OnTimerContext,
>       out: Collector[(Long, Int, String)]): Unit = ???
> }
> {code}
> AssignerWithPunctuatedWatermarks:
> {code}
> class TimestampWithLatenessWatermark
>     extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>   override def checkAndGetNextWatermark(
>       lastElement: (Long, Int, String),
>       extractedTimestamp: Long): Watermark = {
>     new Watermark(extractedTimestamp)
>   }
>   override def extractTimestamp(
>       element: (Long, Int, String),
>       previousElementTimestamp: Long): Long = {
>     element._1
>   }
> }
> {code}
> TestDATA:
> {code}
> val data = List(
>   (1L, 1, "Hello"),
>   (2L, 2, "Hello"),
>   (3L, 3, "Hello"),
>   (4L, 4, "Hello"),
>   (5L, 5, "Hello"),
>   (6L, 6, "Hello"),
>   (7L, 7, "Hello World"),
>   (8L, 8, "Hello World"),
>   (20L, 20, "Hello World"))
> {code}
> DataStreamAPI:
> {code}
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val src = env.fromCollection(data)
>   .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
> src.keyBy(1).process(new TriggeringFlatMapFunction())
> Print:
> WaterMark=-9223372036854775808
> WaterMark=3
> WaterMark=5
> WaterMark=7
> WaterMark=1
> WaterMark=2
> WaterMark=4
> WaterMark=8
> WaterMark=6
> {code}
> SqlAPI:
> {code}
> val src = env.fromCollection(data)
>   .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
> val tab = src.toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("T1", tab)
> val sqlQuery = "SELECT " +
>   "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
>   "from T1"
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> Print:
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=2
> WaterMark=6
> {code}
> I feel there is a problem going from SQL to the DataStream API. Anyone is
> welcome to correct me if I am using it incorrectly.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
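
The comment's point that punctuated watermarks "depend a bit on how data arrives at parallel instances" can be illustrated with a small simulation that does not use Flink at all. It is only a sketch under stated assumptions: elements are dealt round-robin to the parallel assigner instances, each instance emits a watermark equal to the element timestamp (as `TimestampWithLatenessWatermark` does), and the downstream operator's watermark is the minimum of the latest watermarks on its input channels. The object name `WatermarkSketch` and the method `downstreamWatermarks` are hypothetical, and the sketch makes no attempt to reproduce the exact output printed in the issue.

```scala
object WatermarkSketch {

  /** Watermark seen downstream after each element, assuming the elements are
    * dealt round-robin to `parallelism` upstream assigner instances.
    * This is a synchronous simplification; a real job runs asynchronously. */
  def downstreamWatermarks(timestamps: Seq[Long], parallelism: Int): Seq[Long] = {
    // Latest watermark emitted by each upstream instance.
    // Long.MinValue means "no watermark emitted yet".
    val channels = Array.fill(parallelism)(Long.MinValue)
    timestamps.zipWithIndex.map { case (ts, i) =>
      val ch = i % parallelism
      // Punctuated assigner as in the issue: watermark == element timestamp.
      channels(ch) = math.max(channels(ch), ts)
      // Downstream watermark is the minimum over all input channels.
      channels.min
    }
  }

  def main(args: Array[String]): Unit = {
    val ts = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)
    println(downstreamWatermarks(ts, 1).mkString(", "))
    println(downstreamWatermarks(ts, 4).mkString(", "))
  }
}
```

With a single instance the downstream watermark follows every element. With four instances it stays at `Long.MinValue` until each instance has seen at least one element, and afterwards it trails the slowest channel. With only nine test elements spread over several instances, this lag dominates the printed output.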