[ https://issues.apache.org/jira/browse/FLINK-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931569#comment-15931569 ]
sunjincheng commented on FLINK-6074:
------------------------------------

After testing with more data, I found that the different behavior described above is caused by the default parallelism of 4 (DEFAULT_PARALLELISM) set in `StreamingMultipleProgramsTestBase`. For example:

TestData:
{code}
val data = List(
  (1L, 1, "Hello World"),
  (2L, 2, "Hello World"),
  (3L, 3, "Hello World"),
  (4L, 4, "Hello World"),
  (5L, 5, "Hello World"),
  (6L, 6, "Hello World"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (9L, 8, "Hello World"),
  (10L, 8, "Hello World"),
  (11L, 8, "Hello World"),
  (12L, 8, "Hello World"),
  (13L, 8, "Hello World"),
  (14L, 8, "Hello World"),
  (15L, 8, "Hello World"),
  (16L, 8, "Hello World"),
  (17L, 8, "Hello World"),
  (18L, 8, "Hello World"),
  (19L, 8, "Hello World"),
  (20L, 8, "Hello World"),
  (21L, 8, "Hello World"),
  (22L, 8, "Hello World"),
  (23L, 8, "Hello World"),
  (24L, 8, "Hello World"),
  (25L, 8, "Hello World"),
  (26L, 8, "Hello World"),
  (27L, 8, "Hello World"),
  (28L, 20, "Hello World"))
{code}

Result:
{code}
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=4
WaterMark=8
WaterMark=12
WaterMark=16
WaterMark=20
WaterMark=24
{code}

When I set:
{code}
env.setParallelism(1)
{code}

Result:
{code}
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
WaterMark=-9223372036854775808
WaterMark=1
WaterMark=2
WaterMark=3
WaterMark=4
WaterMark=5
WaterMark=6
WaterMark=7
WaterMark=8
WaterMark=9
WaterMark=10
WaterMark=11
WaterMark=12
WaterMark=13
WaterMark=14
WaterMark=15
WaterMark=16
WaterMark=17
WaterMark=18
WaterMark=19
WaterMark=20
WaterMark=21
WaterMark=22
WaterMark=23
WaterMark=24
WaterMark=25
WaterMark=26
WaterMark=27
{code}

With a parallelism of 1, the SQL API and the DataStream API behave the same.

> Fix processFunction with watermark not work well in tableAPI
> ------------------------------------------------------------
>
>                 Key: FLINK-6074
>                 URL: https://issues.apache.org/jira/browse/FLINK-6074
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> I did a simple test and found that the same `AssignerWithPunctuatedWatermarks` and `ProcessFunction` produce different watermarks when used through the SQL API and through the `DataStream` API, as follows:
> ProcessFunction:
> {code}
> class CheckWaterMark extends ProcessFunction[(Long, Int, String), (Long, Int, String)] {
>   override def processElement(value: (Long, Int, String),
>     ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#Context,
>     out: Collector[(Long, Int, String)]): Unit = {
>     println("WaterMark=" + ctx.timerService.currentWatermark())
>   }
>   override def onTimer(
>     timestamp: Long,
>     ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#OnTimerContext,
>     out: Collector[(Long, Int, String)]): Unit = ???
> }
> {code}
> AssignerWithPunctuatedWatermarks:
> {code}
> class TimestampWithLatenessWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>   override def checkAndGetNextWatermark(
>     lastElement: (Long, Int, String),
>     extractedTimestamp: Long)
>   : Watermark = {
>     new Watermark(extractedTimestamp)
>   }
>   override def extractTimestamp(
>     element: (Long, Int, String),
>     previousElementTimestamp: Long): Long = {
>     element._1
>   }
> }
> {code}
> TestDATA:
> {code}
> val data = List(
>   (1L, 1, "Hello"),
>   (2L, 2, "Hello"),
>   (3L, 3, "Hello"),
>   (4L, 4, "Hello"),
>   (5L, 5, "Hello"),
>   (6L, 6, "Hello"),
>   (7L, 7, "Hello World"),
>   (8L, 8, "Hello World"),
>   (20L, 20, "Hello World"))
> {code}
> DataStreamAPI:
> {code}
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val src = env.fromCollection(data).assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
> src.keyBy(1).process(new TriggeringFlatMapFunction())
> Print:
> WaterMark=-9223372036854775808
> WaterMark=3
> WaterMark=5
> WaterMark=7
> WaterMark=1
> WaterMark=2
> WaterMark=4
> WaterMark=8
> WaterMark=6
> {code}
> SqlAPI:
> {code}
> val src = env.fromCollection(data).assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
> val tab = src.toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("T1", tab)
> val sqlQuery = "SELECT " +
>   "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
>   " from T1"
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> Print:
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=2
> WaterMark=6
> {code}
> I suspect there is a problem in the conversion from SQL to the DataStream API. Corrections are welcome if my usage is incorrect.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
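The comment attributes the different output to the default parallelism of 4. Flink documents that an operator with several input streams takes the minimum of their event times as its current watermark, so it reports Long.MinValue (-9223372036854775808) until every input has delivered at least one watermark and afterwards trails the slowest input. The Flink-free Scala sketch below models only that rule. `WatermarkMinSketch`, `MinWatermarkTracker` and `observedWatermarks` are hypothetical names, and the round-robin distribution of elements plus the fully serialized ordering of records and watermarks are simplifying assumptions, so it does not reproduce the exact numbers printed in the runs above.

```scala
// Hypothetical, Flink-free sketch: models how a downstream operator combines
// watermarks from several parallel upstream channels by taking the minimum.
object WatermarkMinSketch {

  final class MinWatermarkTracker(numChannels: Int) {
    // Every channel starts at Long.MinValue (printed as -9223372036854775808).
    private val latest = Array.fill(numChannels)(Long.MinValue)

    // Record a watermark arriving on `channel`; a channel's watermark never regresses.
    def onWatermark(channel: Int, watermark: Long): Unit = {
      latest(channel) = math.max(latest(channel), watermark)
    }

    // Combined watermark = the slowest channel.
    def current: Long = latest.min
  }

  // Assumption: timestamps 1..n are spread round-robin over `parallelism`
  // punctuated assigners, each emitting watermark == timestamp right after
  // its element, in one fixed serialized order (real runs are nondeterministic).
  // Returns the watermark each element observes on arrival, i.e. roughly what
  // ctx.timerService.currentWatermark() would print in processElement.
  def observedWatermarks(n: Int, parallelism: Int): Seq[Long] = {
    val tracker = new MinWatermarkTracker(parallelism)
    (1 to n).map { i =>
      val ts = i.toLong
      val channel = (i - 1) % parallelism
      val seen = tracker.current        // watermark before the element is processed
      tracker.onWatermark(channel, ts)  // punctuated watermark emitted after the element
      seen
    }
  }

  def main(args: Array[String]): Unit = {
    println(observedWatermarks(8, 1).mkString(", "))
    println(observedWatermarks(8, 4).mkString(", "))
  }
}
```

With one channel the sketch yields -9223372036854775808, 1, 2, 3, 4, 5, 6, 7, the same pattern as the `env.setParallelism(1)` run; with four channels the first four elements all observe Long.MinValue, and the watermark then lags behind the newest timestamp.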