[ 
https://issues.apache.org/jira/browse/FLINK-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6074:
-------------------------------
    Description: 
I ran a simple test and found that the same `AssignerWithPunctuatedWatermarks` 
and `ProcessFunction` observe different watermarks depending on whether the job 
is built with the SQL API or the `DataStream` API, as follows:
ProcessFunction:
{code}
/**
 * Diagnostic ProcessFunction over `(Long, Int, String)` records that prints the
 * watermark reported by the timer service each time an element arrives.
 * It never writes to the collector.
 */
class CheckWaterMark
  extends ProcessFunction[(Long, Int, String), (Long, Int, String)] {

  /** Prints `WaterMark=<current watermark>`; the element itself is not inspected. */
  override def processElement(
      value: (Long, Int, String),
      ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#Context,
      out: Collector[(Long, Int, String)]): Unit = {
    val currentWatermark = ctx.timerService.currentWatermark()
    println(s"WaterMark=$currentWatermark")
  }

  /** Deliberately left unimplemented: `???` throws NotImplementedError if invoked. */
  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#OnTimerContext,
      out: Collector[(Long, Int, String)]): Unit = ???
}
{code}

AssignerWithPunctuatedWatermarks:
{code}
/**
 * Punctuated watermark assigner for `(Long, Int, String)` records: the first tuple
 * field is returned as the element timestamp, and for every element a watermark
 * equal to that extracted timestamp is returned.
 */
class TimestampWithLatenessWatermark
  extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {

  /** Builds a watermark from `extractedTimestamp`; `lastElement` is not inspected. */
  override def checkAndGetNextWatermark(
      lastElement: (Long, Int, String),
      extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp)

  /** Returns the first tuple field; `previousElementTimestamp` is ignored. */
  override def extractTimestamp(
      element: (Long, Int, String),
      previousElementTimestamp: Long): Long =
    element._1
}
{code}
TestDATA:
{code}
// Nine (Long, Int, String) records; in every row the first two fields carry the same
// number. Rows 1-6 are labelled "Hello", rows 7, 8 and 20 are labelled "Hello World".
val data: List[(Long, Int, String)] =
  List(1, 2, 3, 4, 5, 6).map(i => (i.toLong, i, "Hello")) ++
    List(7, 8, 20).map(i => (i.toLong, i, "Hello World"))
{code}

DataStreamAPI:
{code}
  // DataStream variant: event-time job over `data`, keyed by tuple position 1.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val collected = env.fromCollection(data)
  val src = collected.assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
  // NOTE(review): TriggeringFlatMapFunction is not defined in this message; presumably
  // CheckWaterMark (or an equivalent watermark-printing function) is meant - confirm.
  val keyed = src.keyBy(1)
  keyed.process(new TriggeringFlatMapFunction())

Print:
WaterMark=-9223372036854775808
WaterMark=3
WaterMark=5
WaterMark=7
WaterMark=1
WaterMark=2
WaterMark=4
WaterMark=8
WaterMark=6
{code}

SqlAPI:
{code}
// SQL variant: the same collection and watermark assigner as the DataStream snippet,
// registered as table T1 with columns a, b, c and queried with a proc-time OVER window.
val src = env.fromCollection(data)
  .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
val tab = src.toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("T1", tab)
// The OVER clause is kept in one literal: a double-quoted Scala string cannot span
// lines, so the wrapped form would not compile. The resulting query text is unchanged.
val sqlQuery = "SELECT " +
  "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
  "from T1"
val result = tEnv.sql(sqlQuery).toDataStream[Row]

Print:
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=-9223372036854775808
WaterMark=2
WaterMark=6
{code}
I'll checkout 
Is there anything incorrect here? Corrections from anyone are welcome.

  was:add description later


> Fix processFunction with watermark not work well in tableAPI
> ------------------------------------------------------------
>
>                 Key: FLINK-6074
>                 URL: https://issues.apache.org/jira/browse/FLINK-6074
>             Project: Flink
>          Issue Type: Bug
>            Reporter: sunjincheng
>
> I ran a simple test and found that the same 
> `AssignerWithPunctuatedWatermarks` and `ProcessFunction` observe different 
> watermarks depending on whether the job is built with the SQL API or the 
> `DataStream` API, as follows:
> ProcessFunction:
> {code}
> class CheckWaterMark extends ProcessFunction[(Long, Int, String), (Long, Int, 
> String)] {
>     override def processElement(value: (Long, Int, String),
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, 
> String)]#Context,
>         out: Collector[(Long, Int, String)]): Unit = {
>         println("WaterMark=" + ctx.timerService.currentWatermark())
>     }
>     override def onTimer(
>         timestamp: Long,
>         ctx: ProcessFunction[(Long, Int, String), (Long, Int, 
> String)]#OnTimerContext,
>         out: Collector[(Long, Int, String)]): Unit = ???
>     }
> {code}
> AssignerWithPunctuatedWatermarks:
> {code}
> class TimestampWithLatenessWatermark extends 
> AssignerWithPunctuatedWatermarks[(Long,
>     Int, String)] {
>     override def checkAndGetNextWatermark(
>       lastElement: (Long, Int, String),
>       extractedTimestamp: Long)
>     : Watermark = {
>       new Watermark(extractedTimestamp)
>     }
>     override def extractTimestamp(
>       element: (Long, Int, String),
>       previousElementTimestamp: Long): Long = {
>       element._1
>     }
>   }
> {code}
> TestDATA:
> {code}
> val data = List(
>     (1L, 1, "Hello"),
>     (2L, 2, "Hello"),
>     (3L, 3, "Hello"),
>     (4L, 4, "Hello"),
>     (5L, 5, "Hello"),
>     (6L, 6, "Hello"),
>     (7L, 7, "Hello World"),
>     (8L, 8, "Hello World"),
>     (20L, 20, "Hello World"))
> {code}
> DataStreamAPI:
> {code}
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val src = env.fromCollection(data).assignTimestampsAndWatermarks(new 
> TimestampWithLatenessWatermark())
>   src.keyBy(1).process(new TriggeringFlatMapFunction())
> Print:
> WaterMark=-9223372036854775808
> WaterMark=3
> WaterMark=5
> WaterMark=7
> WaterMark=1
> WaterMark=2
> WaterMark=4
> WaterMark=8
> WaterMark=6
> {code}
> SqlAPI:
> {code}
> val src = env.fromCollection(data).assignTimestampsAndWatermarks(new 
> TimestampWithLatenessWatermark())
>     val tab = src.toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("T1", tab)
>        val sqlQuery = "SELECT " +
>       "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding 
> AND CURRENT ROW)" +
>       "from T1"
>     val result = tEnv.sql(sqlQuery).toDataStream[Row]
> Print:
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=2
> WaterMark=6
> {code}
> I'll checkout 
> Is there anything incorrect here? Corrections from anyone are welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to