你用的是哪个版本的Flink呢?
-----
1.11.2
看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
所以你的binlog是怎么读进来的呢?自定义的format?
-----
ts 就是时间戳
bsTableEnv.executeSql("""
CREATE TABLE input_database (
`table` STRING,
`database` STRING,
`data` ROW(
reference_id STRING,
transaction_sn STRING,
transaction_type BIGINT,
merchant_id BIGINT,
transaction_id BIGINT,
status BIGINT
),
ts BIGINT,
event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'mytopic',
'connector.properties.bootstrap.servers' = 'xxxx',
'format.type' = 'json'
)
)
```
Benchao Li <[email protected]> 于2020年12月10日周四 下午6:14写道:
> 你用的是哪个版本的Flink呢?
>
> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
> 所以你的binlog是怎么读进来的呢?自定义的format?
>
> macia kk <[email protected]> 于2020年12月10日周四 上午1:06写道:
>
> > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time
> -
> > INTERVAL 'x' HOUR
> >
> > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
> >
> > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
> > 能够反推出来数据的 currentMaxTimestamp
> >
> > currentMaxTimestamp = watermark + maxOutOfOrderness
> >
> > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
> > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
> >
> >
> > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
> >
> >
> {"table":"transaction_tab_00000122","database":"main_db","transaction_type":1,"transaction_id":111111,"reference_id":"111111","transaction_sn":"11111","merchant_id":1,"status":1,"event_time":"
> > *2020-12-10T01:02:24Z*"}
> >
> > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
> > GMT+08:00)
> >
> > 这个 watermark 是未来的时间 😂
> >
> >
> >
> >
> >
> > macia kk <[email protected]> 于2020年12月9日周三 下午11:36写道:
> >
> > > 感谢 一旦 和 Benchao
> > >
> > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我
> > Job
> > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
> > >
> > > val result = bsTableEnv.sqlQuery("""
> > > SELECT *
> > > FROM (
> > > SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > t1.transaction_id,
> > > t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > t1.status, t1.event_time
> > > FROM main_db as t1
> > > LEFT JOIN main_db as t2
> > > ON t1.reference_id = t2.reference_id
> > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
> > > AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
> > > )
> > > """.stripMargin)
> > >
> > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> > >
> > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > > subtask的watermark。
> > > -------------------------------------------------------
> > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
> > >
> > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
> event
> > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
> > >
> > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
> > >
> > >
> > > Thanks and best regards
> > >
> > >
> > > Benchao Li <[email protected]> 于2020年12月9日周三 上午10:24写道:
> > >
> > >> Hi macia,
> > >>
> > >> 一旦回答的基本比较完整了。
> > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
> > >>
> > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
> > subtask见到的最大的watermark
> > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
> > 10个小时,这个已经会导致
> > >> 你的没有join到的数据下发会延迟很多了。
> > >>
> > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
> > >>
> > >> 赵一旦 <[email protected]> 于2020年12月9日周三 上午10:15写道:
> > >>
> > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> > >> >
> > >> >
> > >> >
> > >>
> >
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> > >> > join。
> > >> >
> > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
> > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
> > >> >
> > >> >
> > >>
> >
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
> > >> >
> > >> > macia kk <[email protected]> 于2020年12月9日周三 上午1:17写道:
> > >> >
> > >> > > @Benchao Li <[email protected]> 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> > >> > > FLink,可能我的Case 太特殊了.
> > >> > >
> > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
> > >> Binlog,我需要
> > >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab,
> 两个
> > DB
> > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> > >> > >
> > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
> > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> > >> > watermark
> > >> > > forward on.
> > >> > >
> > >> > > bsTableEnv.executeSql("""
> > >> > > CREATE TABLE input_database (
> > >> > > `table` STRING,
> > >> > > `database` STRING,
> > >> > > `data` ROW(
> > >> > > reference_id STRING,
> > >> > > transaction_sn STRING,
> > >> > > transaction_type BIGINT,
> > >> > > merchant_id BIGINT,
> > >> > > transaction_id BIGINT,
> > >> > > status BIGINT
> > >> > > ),
> > >> > > ts BIGINT,
> > >> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> > >> > > WATERMARK FOR event_time AS event_time - INTERVAL '10'
> HOUR
> > >> > > ) WITH (
> > >> > > 'connector.type' = 'kafka',
> > >> > > 'connector.version' = '0.11',
> > >> > > 'connector.topic' = 'mytopic',
> > >> > > 'connector.properties.bootstrap.servers' = 'xxxx',
> > >> > > 'format.type' = 'json'
> > >> > > )
> > >> > > """)
> > >> > >
> > >> > >
> > >> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> > >> > >
> > >> > > val main_db = bsTableEnv.sqlQuery("""
> > >> > > | SELECT *
> > >> > > | FROM input_database
> > >> > > | WHERE `database` = 'main_db'
> > >> > > | AND `table` LIKE 'transaction_tab%'
> > >> > > | """.stripMargin)
> > >> > >
> > >> > > val merchant_db = bsTableEnv.sqlQuery("""
> > >> > > | SELECT *
> > >> > > | FROM input_database
> > >> > > | WHERE `database` = 'merchant_db'
> > >> > > | AND `table` LIKE 'transaction_tab%'
> > >> > > | """.stripMargin)
> > >> > >
> > >> > > bsTableEnv.createTemporaryView("main_db", main_db)
> > >> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> > >> > >
> > >> > > val result = bsTableEnv.sqlQuery("""
> > >> > > SELECT *
> > >> > > FROM (
> > >> > > SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > >> > > t1.transaction_id,
> > >> > > t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > >> > > t1.status, t1.event_time
> > >> > > FROM main_db as t1
> > >> > > LEFT JOIN merchant_db as t2
> > >> > > ON t1.reference_id = t2.reference_id
> > >> > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
> > >> > > AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
> > >> > > )
> > >> > > """.stripMargin)
> > >> > >
> > >> > >
> > >> > >
> > >> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > >> > > -----
> > >> > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要
> > >> watermark
> > >> > > 来驱动。
> > >> > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以
> > >> join上,就输出
> > >> > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > Benchao Li <[email protected]> 于2020年12月8日周二 下午3:23写道:
> > >> > >
> > >> > > > hi macia,
> > >> > > >
> > >> > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > >> > > >
> > >> > > > macia kk <[email protected]> 于2020年12月8日周二 上午1:15写道:
> > >> > > >
> > >> > > > > 抱歉,是 >-30 and <+30
> > >> > > > >
> > >> > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> > >> > > > >
> > >> > > > > 赵一旦 <[email protected]>于2020年12月7日 周一23:28写道:
> > >> > > > >
> > >> > > > > > 准确点,2个条件之间没and?2个都是>?
> > >> > > > > >
> > >> > > > > > macia kk <[email protected]> 于2020年12月7日周一 下午10:30写道:
> > >> > > > > >
> > >> > > > > > > 不好意思,我上边贴错了
> > >> > > > > > >
> > >> > > > > > > SELECT *
> > >> > > > > > > FROM A
> > >> > > > > > > LEFT OUT JOIN B
> > >> > > > > > > ON order_id
> > >> > > > > > > Where A.event_time > B.event_time - 30 s
> > >> > > > > > > A.event_time > B.event_time + 30 s
> > >> > > > > > >
> > >> > > > > > > event_time 是 Time Attributes 设置的 event_time
> > >> > > > > > >
> > >> > > > > > > 这样是没有输出的。
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > interval join 左右表在 state 中是缓存多久的?
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > hailongwang <[email protected]> 于2020年12月7日周一 下午8:05写道:
> > >> > > > > > >
> > >> > > > > > > > Hi,
> > >> > > > > > > > 其中 条件是
> > >> > > > > > > > `Where A.event_time < B.event_time + 30 s and
> > A.event_time >
> > >> > > > > > B.event_time
> > >> > > > > > > > - 30 s ` 吧
> > >> > > > > > > > 可以参考以下例子[1],看下有木有写错。
> > >> > > > > > > > [1]
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > Best,
> > >> > > > > > > > Hailong
> > >> > > > > > > > 在 2020-12-07 13:10:02,"macia kk" <[email protected]> 写道:
> > >> > > > > > > > >Hi, 各位大佬
> > >> > > > > > > > >
> > >> > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog
> 打进去了。我的
> > >> > > > > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是
> order事件
> > >> ,表B 是
> > >> > > > order
> > >> > > > > > > item
> > >> > > > > > > > >信息,所以 我用:
> > >> > > > > > > > >
> > >> > > > > > > > > SELECT *
> > >> > > > > > > > > FROM A
> > >> > > > > > > > > LEFT OUT JOIN B
> > >> > > > > > > > > ON order_id
> > >> > > > > > > > > Where A.event_time > B.event_time + 30 s
> > >> > > > > > > > > A.event_time > B.event_time - 30 s
> > >> > > > > > > > >
> > >> > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join
> > 之后就没有输出数据了,可以确认的是我用
> > >> > Spark
> > >> > > > > > > > Structural
> > >> > > > > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > >> > > > > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join
> > >> > 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > > >
> > >> > > > --
> > >> > > >
> > >> > > > Best,
> > >> > > > Benchao Li
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >> --
> > >>
> > >> Best,
> > >> Benchao Li
> > >>
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>