??????????????????????????????????



------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]&gt;;
????????:&nbsp;2020??7??23??(??????) ????9:30
??????:&nbsp;"????"<[email protected]&gt;;
????:&nbsp;"user-zh"<[email protected]&gt;;
????:&nbsp;Re: flinksql1.11????????????????



Hi,

??????query????????????????join???? FOR SYSTEM_TIME AS OF ??????????????regular 
join??mysql????bounded??????????????????????????????????????????
????join??????????????????????look up ????????????????????????????temporal 
table(??????)????????????[1] ???? temporal table join


????
Leonard Xu
[1]&nbsp; 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins&gt;

&gt; ?? 2020??7??23????09:06?????? <[email protected]&gt; ??????
&gt; 
&gt; 
&gt; HI??
&gt; 
????????????????????????????????????????????????????????????????????????????????????????????????????
&gt;&gt;&gt; ????&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
&gt;&gt;&gt; id&nbsp;&nbsp;&nbsp; type&nbsp;&nbsp; 
&gt;&gt;&gt; 2  err
&gt;&gt;&gt; 1  err
&gt;&gt;&gt; ????
&gt;&gt;&gt; 
&gt; 1  err     20200723085754
&gt; 2  err     20200723085755
&gt; 3  err     20200723085756
&gt; 4  err     20200723085757
&gt; 
&gt; 
????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????2s??
&gt;&gt;&gt; ????&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
&gt;&gt;&gt; id&nbsp;&nbsp;&nbsp; type&nbsp;&nbsp; 
&gt;&gt;&gt; 2  acc
&gt;&gt;&gt; 1  acc
&gt;&gt;&gt; ????
&gt; 
&gt; 94 err     20200723084455
&gt; 95 err     20200723084456
&gt; 96 err     20200723084457
&gt; 97 err     20200723084458
&gt; 98 err     20200723084459
&gt; 99 err     20200723084500
&gt; 100        err     20200723084501
&gt; 
&gt; 
????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
&gt; 
&gt; ????????????????????????????????????????????????????
&gt; 
&gt; 
&gt; ??????
&gt; 
&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt; 
&gt; 
&gt; ------------------ ???????? ------------------
&gt; ??????: "Leonard Xu" <[email protected]&gt;;
&gt; ????????: 2020??7??22??(??????) ????9:39
&gt; ??????: "????"<[email protected]&gt;;
&gt; ????: Re: flinksql1.11????????????????
&gt; 
&gt; <[email protected]&gt;
&gt; 
&gt; ????????????????????????????????????????????????????????????
&gt; 
&gt; ????
&gt; 
&gt;&gt; ?? 2020??7??22????16:39??Leonard Xu <[email protected] 
<mailto:[email protected]&gt;&gt; ??????
&gt;&gt; 
&gt;&gt; HI,
&gt;&gt; ??????????????????????????????????????????????????????????????????
&gt;&gt; 
&gt;&gt; 
&gt;&gt;&gt; ?? 2020??7??22????16:27?????? <[email protected] 
<mailto:[email protected]&gt;&gt; ??????
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; HI
&gt;&gt;&gt; 
????????????????????????????????????????????????????????????????????????????????HELP????
&gt;&gt;&gt; ????
&gt;&gt;&gt; 
&gt;&gt;&gt; ------------------ ???????? ------------------
&gt;&gt;&gt; ??????: "????" <[email protected] 
<mailto:[email protected]&gt;&gt;;
&gt;&gt;&gt; ????????: 2020??7??22??(??????) ????3:17
&gt;&gt;&gt; ??????: "user-zh"<[email protected] 
<mailto:[email protected]&gt;&gt;;
&gt;&gt;&gt; ????: ????: Re: flinksql1.11????????????????
&gt;&gt;&gt; 
&gt;&gt;&gt; ??????
&gt;&gt;&gt; 
????????????????????????????1.11.0??????????TIDB??????????demo????????????????????
&gt;&gt;&gt; 
&gt;&gt;&gt; ??????????????????????????????????kafka
&gt;&gt;&gt;&nbsp; topic = 'tp1'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; for i&nbsp; in&nbsp; range(1,10000) :
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg = {}
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg['id']= i
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg['time1']= stime
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg['type']=1
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print(msg)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; send_msg(topic, 
msg)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; time.sleep(1)
&gt;&gt;&gt; 
&gt;&gt;&gt; {'id': 1, 'time1': '20200722140624', 'type': 1}
&gt;&gt;&gt; {'id': 2, 'time1': '20200722140625', 'type': 1}
&gt;&gt;&gt; {'id': 3, 'time1': '20200722140626', 'type': 1}
&gt;&gt;&gt; {'id': 4, 'time1': '20200722140627', 'type': 1}
&gt;&gt;&gt; {'id': 5, 'time1': '20200722140628', 'type': 1}
&gt;&gt;&gt; {'id': 6, 'time1': '20200722140629', 'type': 1}
&gt;&gt;&gt; {'id': 7, 'time1': '20200722140631', 'type': 1}
&gt;&gt;&gt; {'id': 8, 'time1': '20200722140632', 'type': 1}
&gt;&gt;&gt; 
&gt;&gt;&gt; ????????????
&gt;&gt;&gt; id&nbsp;&nbsp;&nbsp; type
&gt;&gt;&gt; 2  err
&gt;&gt;&gt; 1  err
&gt;&gt;&gt; 
&gt;&gt;&gt; 
??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; from pyflink.datastream import StreamExecutionEnvironment, 
TimeCharacteristic
&gt;&gt;&gt; from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
&gt;&gt;&gt; from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
&gt;&gt;&gt; from pyflink.table.window import Tumble
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; def from_kafka_to_kafka_demo():
&gt;&gt;&gt; 
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; # use blink table planner
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; env = 
StreamExecutionEnvironment.get_execution_environment()
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; env_settings = 
EnvironmentSettings.Builder().use_blink_planner().build()
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env = 
StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
&gt;&gt;&gt; 
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; # register source and sink
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; register_rides_source(st_env)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; register_rides_sink(st_env)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; register_mysql_source(st_env)
&gt;&gt;&gt;&nbsp; 
&gt;&gt;&gt; 
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update("insert into 
flink_result select&nbsp; cast(t1.id <http://t1.id/&gt; as int) as 
id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 
t1 left join dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/&gt; as varchar) 
")
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.execute("2-from_kafka_to_kafka")
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; 
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; def register_rides_source(st_env):
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; source_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; create table source1(
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; id int,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; time1 varchar ,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type string
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) with (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'kafka',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'topic' = 'tp1',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'scan.startup.mode' = 'latest-offset',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'properties.bootstrap.servers' = 
'localhost:9092',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'format' = 'json'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update(source_ddl)
&gt;&gt;&gt; 
&gt;&gt;&gt; def register_mysql_source(st_env):
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; source_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; CREATE TABLE dim_mysql (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; id int,&nbsp; --
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; type varchar --
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; ) WITH (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'jdbc',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3390/test' 
<&gt;,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'table-name' = 'flink_test',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'username' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'password' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'lookup.cache.max-rows' = '5000',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'lookup.cache.ttl' = '1s',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'lookup.max-retries' = '3'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """&nbsp;&nbsp;&nbsp; 
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update(source_ddl)
&gt;&gt;&gt; 
&gt;&gt;&gt; def register_rides_sink(st_env):
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; sink_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; CREATE TABLE flink_result (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; id int,&nbsp; 
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; type varchar,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; rtime bigint,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; primary key(id)&nbsp; NOT ENFORCED
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; ) WITH (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'jdbc',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3390/test' 
<&gt;,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'table-name' = 'flink_result',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'username' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'password' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'sink.buffer-flush.max-rows' = '5000',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'sink.buffer-flush.interval' = '2s',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'sink.max-retries' = '3'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update(sink_ddl)
&gt;&gt;&gt; 
&gt;&gt;&gt; 
&gt;&gt;&gt; if __name__ == '__main__':
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; from_kafka_to_kafka_demo()
&gt;&gt;&gt; 
&gt;&gt;&gt; ??????
&gt;&gt;&gt; PyFlink??????
&gt;&gt;&gt; ????
&gt;&gt;&gt; 
&gt;&gt;&gt;&nbsp; 
&gt;&gt;&gt; ???????? Leonard Xu <mailto:[email protected]&gt;
&gt;&gt;&gt; ?????????? 2020-07-22 15:05
&gt;&gt;&gt; ???????? user-zh <mailto:[email protected]&gt;
&gt;&gt;&gt; ?????? Re: flinksql1.11????????????????
&gt;&gt;&gt; Hi,
&gt;&gt;&gt;&nbsp; 
&gt;&gt;&gt;&nbsp;&nbsp; ????????????????????????????????????????????????
&gt;&gt;&gt;&nbsp; 
&gt;&gt;&gt; ????
&gt;&gt;&gt; &gt; ?? 2020??7??22????14:50???????????????? <[email protected] 
<mailto:[email protected]&gt;&gt; ??????
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; ??????
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 
????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
&gt;&gt;&gt; &gt; ??????????????????????????????????????????????????
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; ????
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 
------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt;&gt;&gt; &gt; 
??????:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 
"user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 <[email protected] <mailto:[email protected]&gt;&amp;gt;;
&gt;&gt;&gt; &gt; ????????:&amp;nbsp;2020??7??22??(??????) ????2:42
&gt;&gt;&gt; &gt; ??????:&amp;nbsp;"user-zh"<[email protected] 
<mailto:[email protected]&gt;&amp;gt;;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; ????:&amp;nbsp;Re: flinksql1.11????????????????
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; Hello
&gt;&gt;&gt; &gt; 
??????????????????????????????????????????????????????????????????????????????????????????????????????????join??????????????????????????retract????????????????????????????????????????????????
&gt;&gt;&gt; &gt; ??look 
up??????????????????????????????retract??????????????????
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; ????
&gt;&gt;&gt; &gt; Leonard Xu
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; &amp;gt; ?? 2020??7??22????14:[email protected] 
<http://qq.com/&gt; ??????
&gt;&gt;&gt; &gt; &amp;gt;
&gt;&gt;&gt; &gt; &amp;gt; ??????????????????????
&gt;&gt; 
&gt;

回复