[ 
https://issues.apache.org/jira/browse/FLINK-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931569#comment-15931569
 ] 

sunjincheng commented on FLINK-6074:
------------------------------------

When I use more test data, I find that the cause of the differing behavior 
described above is the default parallelism of 4 (DEFAULT_PARALLELISM) in 
`StreamingMultipleProgramsTestBase`. For example:
Test data:
{code}
val data = List(
    (1L, 1, "Hello World"),
    (2L, 2, "Hello World"),
    (3L, 3, "Hello World"),
    (4L, 4, "Hello World"),
    (5L, 5, "Hello World"),
    (6L, 6, "Hello World"),
    (7L, 7, "Hello World"),
    (8L, 8, "Hello World"),
    (9L, 8, "Hello World"),
    (10L, 8, "Hello World"),
    (11L, 8, "Hello World"),
    (12L, 8, "Hello World"),
    (13L, 8, "Hello World"),
    (14L, 8, "Hello World"),
    (15L, 8, "Hello World"),
    (16L, 8, "Hello World"),
    (17L, 8, "Hello World"),
    (18L, 8, "Hello World"),
    (19L, 8, "Hello World"),
    (20L, 8, "Hello World"),
    (21L, 8, "Hello World"),
    (22L, 8, "Hello World"),
    (23L, 8, "Hello World"),
    (24L, 8, "Hello World"),
    (25L, 8, "Hello World"),
    (26L, 8, "Hello World"),
    (27L, 8, "Hello World"),
    (28L, 20, "Hello World"))
{code}
Result:
{code}
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=4
WaterMark=8
WaterMark=12
WaterMark=16
WaterMark=20
WaterMark=24
{code}

When I set:
{code}
env.setParallelism(1)
{code}

Result:
{code}
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
initializeState=> keyedStateBackend null
WaterMark=-9223372036854775808
WaterMark=1
WaterMark=2
WaterMark=3
WaterMark=4
WaterMark=5
WaterMark=6
WaterMark=7
WaterMark=8
WaterMark=9
WaterMark=10
WaterMark=11
WaterMark=12
WaterMark=13
WaterMark=14
WaterMark=15
WaterMark=16
WaterMark=17
WaterMark=18
WaterMark=19
WaterMark=20
WaterMark=21
WaterMark=22
WaterMark=23
WaterMark=24
WaterMark=25
WaterMark=26
WaterMark=27
{code}
With this setting, the SQL API and the DataStream API behave the same.
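
To illustrate why the parallelism matters, here is a minimal sketch in plain Scala (no Flink dependency). It assumes that elements are spread round-robin over the parallel channels, that each channel emits a watermark equal to its latest timestamp, and that the downstream operator's watermark is the minimum over all of its input channels. The names `WatermarkMinSketch` and `combinedWatermarks` are hypothetical and exist only for this illustration. The real interleaving in a Flink job depends on buffering and scheduling, so the exact printed values will differ from this model.

```scala
object WatermarkMinSketch {

  // Returns the combined (minimum) watermark after each element's watermark
  // has arrived, under the round-robin assumption described above.
  def combinedWatermarks(timestamps: Seq[Long], parallelism: Int): Seq[Long] = {
    // Each channel starts at Long.MinValue, i.e. "no watermark seen yet".
    val channels = Array.fill(parallelism)(Long.MinValue)
    timestamps.zipWithIndex.map { case (ts, i) =>
      val ch = i % parallelism                  // round-robin channel choice (assumption)
      channels(ch) = math.max(channels(ch), ts) // a channel's watermark never goes backwards
      channels.min                              // downstream sees the minimum over channels
    }
  }

  def main(args: Array[String]): Unit = {
    val timestamps = (1L to 28L).toList
    println("parallelism 4: " + combinedWatermarks(timestamps, 4).mkString(", "))
    println("parallelism 1: " + combinedWatermarks(timestamps, 1).mkString(", "))
  }
}
```

With four channels the combined watermark stays at Long.MinValue until every channel has reported at least once, and afterwards it trails the newest timestamp. With a single channel it follows every element directly, which matches the step-by-step output seen with `env.setParallelism(1)`.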

> Fix processFunction with watermark not work well in tableAPI
> ------------------------------------------------------------
>
>                 Key: FLINK-6074
>                 URL: https://issues.apache.org/jira/browse/FLINK-6074
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> I ran a simple test and found that the same 
> `AssignerWithPunctuatedWatermarks` and `ProcessFunction` observe different 
> watermarks depending on whether the SQL API or the DataStream API is used, as follows:
> ProcessFunction:
> {code}
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
>
> class CheckWaterMark
>     extends ProcessFunction[(Long, Int, String), (Long, Int, String)] {
>     // Print the operator's current watermark for every incoming element.
>     override def processElement(
>         value: (Long, Int, String),
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#Context,
>         out: Collector[(Long, Int, String)]): Unit = {
>         println("WaterMark=" + ctx.timerService.currentWatermark())
>     }
>     // No timers are registered in this test, so onTimer is left unimplemented.
>     override def onTimer(
>         timestamp: Long,
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#OnTimerContext,
>         out: Collector[(Long, Int, String)]): Unit = ???
> }
> {code}
> AssignerWithPunctuatedWatermarks:
> {code}
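> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
> import org.apache.flink.streaming.api.watermark.Watermark
>
> class TimestampWithLatenessWatermark
>     extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>     // Emit a watermark after every element, equal to that element's timestamp.
>     override def checkAndGetNextWatermark(
>         lastElement: (Long, Int, String),
>         extractedTimestamp: Long): Watermark = {
>         new Watermark(extractedTimestamp)
>     }
>     // The first tuple field carries the event timestamp.
>     override def extractTimestamp(
>         element: (Long, Int, String),
>         previousElementTimestamp: Long): Long = {
>         element._1
>     }
> }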
> {code}
> Test data:
> {code}
> val data = List(
>     (1L, 1, "Hello"),
>     (2L, 2, "Hello"),
>     (3L, 3, "Hello"),
>     (4L, 4, "Hello"),
>     (5L, 5, "Hello"),
>     (6L, 6, "Hello"),
>     (7L, 7, "Hello World"),
>     (8L, 8, "Hello World"),
>     (20L, 20, "Hello World"))
> {code}
> DataStreamAPI:
> {code}
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val src = env.fromCollection(data)
>     .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
>   src.keyBy(1).process(new TriggeringFlatMapFunction())
> {code}
> Print:
> {code}
> WaterMark=-9223372036854775808
> WaterMark=3
> WaterMark=5
> WaterMark=7
> WaterMark=1
> WaterMark=2
> WaterMark=4
> WaterMark=8
> WaterMark=6
> {code}
> SqlAPI:
> {code}
>     val src = env.fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
>     val tab = src.toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("T1", tab)
>     val sqlQuery = "SELECT " +
>       "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding " +
>       "AND CURRENT ROW) " +
>       "from T1"
>     val result = tEnv.sql(sqlQuery).toDataStream[Row]
> {code}
> Print:
> {code}
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=2
> WaterMark=6
> {code}
> I suspect there is a problem in the translation from SQL to the DataStream API. 
> If my usage is incorrect anywhere, corrections are welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
