[ 
https://issues.apache.org/jira/browse/FLINK-33616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yong yang closed FLINK-33616.
-----------------------------
    Resolution: Done

> multi lookup join error
> -----------------------
>
>                 Key: FLINK-33616
>                 URL: https://issues.apache.org/jira/browse/FLINK-33616
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.17.1
>            Reporter: yong yang
>            Priority: Major
>
> A query of the form stream1 lookup join jdbc1 on ... lookup join jdbc2 on jdbc1.intfield1 = cast(jdbc2.stringfield2 as int)
> fails with the error: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.t22].
>  
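> A minimal sketch of the failing shape (hypothetical table and column names, following the description above):
> {code:sql}
> -- s: fact stream with a processing-time attribute; d1, d2: JDBC lookup tables (hypothetical names)
> SELECT s.k, r1.intfield1, r2.stringfield2
> FROM s
> JOIN d1 FOR SYSTEM_TIME AS OF s.proctime AS r1
>   ON s.k = r1.k
> LEFT JOIN d2 FOR SYSTEM_TIME AS OF s.proctime AS r2
>   ON r1.intfield1 = CAST(r2.stringfield2 AS INT)
> -- fails: Temporal table join requires an equality condition on fields of table d2
> {code}
>  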
> test code:
>  
> {code:scala}
> package com.yy.flinkSqlJoin
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.Expressions.row
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
> import org.apache.flink.types.Row
> import java.time.ZoneId
> /**
>  * +I  insert
>  * -U  update-before
>  * +U  update-after
>  * -D  retract message; sends a null record to Kafka, corresponding to deleting a row in MySQL.
>  * https://www.yuque.com/u430335/qea2i2/kw4qqu
>  * Because inner/left lookup joins do not emit a retract stream (the output is append-only), the sink only needs to support append semantics.
>  * The join key key1 between the fact table and the dimension table must be declared as the primary key in the dimension table's DDL: primary key (key1).
>  * Test usage:
>  *    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic user_order
>  *    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic user_payment
>  *    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic out
>  * Kafka data:
>  *    Orders:
>  *      {"order_id":100,"ts":1665367200000}  -- step2
>  *      {"order_id":101,"ts":1665367200000}  -- step6
>  *    Payments (MySQL):
>  *      use db_yy;
>         create table user_pay (
>                         order_id bigint
>                       ,paymoney bigint
>                       ,primary key (order_id)
>                       )ENGINE=InnoDB DEFAULT CHARSET=utf8;
>         insert into user_pay values(100,111); -- step1
>         update user_pay set paymoney=222 where order_id=100; -- step3
>         insert into user_pay values(101,33);  -- step4
>         update user_pay set paymoney=44 where order_id=101; -- step5
>  *    Retract-stream output of the code (inserts only):
>  *        8> (true,+I[100, 2022-10-10T02:00:00Z, 111]) -- after step2. Note: the lookup join is driven by the fact table and matches the latest dimension data;
>  *        a result is emitted regardless, and if the dimension table is updated afterwards, no retract stream is emitted to update the result.
>  *        (true,+I[101, 2022-10-10T02:00:00Z, 44]) -- after step6.
>  *    Kafka topic output:
>  *        {"order_id":100,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":111}
>  *        {"order_id":101,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":44}
>  *
>  *    Logic:
>  *        Lookup joins also come in inner join, left join, and full join flavors.
>  *        A lookup join reads the latest dimension data at the moment the fact row is matched. The dimension table's join field must be the primary key in the external connector (Kafka does not qualify).
>  *
>  */
> object LookUpJoinJDBCDemo {
>   def main(args: Array[String]): Unit = {
> //    Class.forName("com.mysql.cj.jdbc.Driver")
>     // Flink 1.13: initialize the stream-processing environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>     // set the local time zone to China
>     tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
>     // order table
>     /*
>     Kafka parameters:
>      d_timestamp is obtained from the Kafka metadata or from the raw payload:
>         d_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
>      Option json.fail-on-missing-field
> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/json/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba%e4%b8%80%e5%bc%a0%e5%9f%ba%e4%ba%8e-json-format-%e7%9a%84%e8%a1%a8
>         whether to fail when a field is missing during parsing, or to skip it (default false, i.e. do not fail; missing fields are set to null)
>      Option json.ignore-parse-errors
>         whether to skip the current field or row on a parse error instead of failing (default false, i.e. fail). If parse errors are ignored, the affected field is set to null.
>      Note: the WITH options below configure the Kafka source table.
>      */
> //    val UserOrderTableSql =
> //      """
> //        |create table user_order (
> //        | order_id bigint,
> //        | ts bigint,
> //        | d_timestamp as TO_TIMESTAMP_LTZ(ts,3),
> //        | proctime AS PROCTIME() -- the fact table needs a processing-time attribute; the dimension table does not
> //        |)WITH(
> //        |    'connector' = 'kafka',
> //        |      'topic' = 'user_order',
> //        |      'properties.bootstrap.servers' = 'localhost:9092',
> //        |      'properties.group.id' = 'g1',
> //        |      'scan.startup.mode' = 'latest-offset',
> //        |      'format' = 'json',
> //        |      'json.fail-on-missing-field' = 'false', -- skip or fail on missing fields
> //        |      'json.ignore-parse-errors' = 'true' -- skip records that fail to parse
> //        |)
> //        |""".stripMargin
> //    tEnv.executeSql(UserOrderTableSql)
>     // implicit conversion from Scala Int to java.lang.Integer
>     /*
>      case class C1(age:Int,name:String,time:Long)
>      Flink stream event time
>      */
>     val table = tEnv.fromValues(
>       DataTypes.ROW(
>         DataTypes.FIELD("order_id", DataTypes.STRING())
>         , DataTypes.FIELD("ts", DataTypes.INT())
>         , DataTypes.FIELD("d_timestamp", DataTypes.TIMESTAMP_LTZ(3))
>       ),
>       row("100", Integer.valueOf(1), java.lang.Long.valueOf(1691722303347L))
>       , row("100", Integer.valueOf(2), java.lang.Long.valueOf(1691732303347L))
>       , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691742303347L))
>       , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691752303347L))
>     )
>     tEnv.createTemporaryView("user_order_pre1", table)
>     tEnv.executeSql(
>       """
>         |create view user_order as select *,proctime() as proctime from user_order_pre1
>         |""".stripMargin)
>     tEnv.from("user_order").execute().print()
>     // Payment table: a temporal (dimension) table must define a primary key. The Kafka connector does not support primary keys; a dimension table is a bounded table and the join reads the latest version,
>     // so both Kafka connector flavors are problematic here. MySQL is used for this test.
>     val paymentFlow =
>       """
>         |create table user_pay (
>         | order_id string,
>         | paymoney bigint,
>         | PRIMARY KEY(order_id) NOT ENFORCED
>         |)WITH(
>         |    'connector' = 'jdbc',
>         |    'url' = 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
>         |    'table-name' = 'user_pay',
>         |    'username' = 'root',
>         |    'password' = '123123123'
>         |)
>         |""".stripMargin
>     tEnv.executeSql(paymentFlow)
>     tEnv.executeSql(
>       """
>         |create table t22
>         |(
>         |    id  string,
>         |    age int,
>         |    bi bigint
>         |)with(
>         |   'connector' = 'jdbc',
>         |    'url' = 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
>         |    'table-name' = 't',
>         |    'username' = 'root',
>         |    'password' = '123123123'
>         |)
>         |""".stripMargin)
>     tEnv.executeSql(
>       """
>         |create view t33 as select *,cast(age as string) as age1 from t22
>         |""".stripMargin)
>     // result table
>     /*
>     Note: the WITH options below configure the Kafka sink table.
>      */
>     val resTableSQL =
>       """
>         |create table user_res (
>         | order_id bigint,
>         | d_timestamp TIMESTAMP_LTZ(3),
>         | paymoney bigint
>         |)WITH(
>         |    'connector' = 'kafka',
>         |      'topic' = 'out',
>         |      'properties.bootstrap.servers' = 'localhost:9092',
>         |      'format' = 'json',
>         |      'sink.partitioner' = 'default' -- default partitioner
>         |)
>         |""".stripMargin
>     tEnv.executeSql(resTableSQL)
>     // Join the tables and emit the result. Note: r is the alias of the dimension table joined as of processing time. With inner join, each fact row is looked up in the dimension table; only matches are sent downstream, non-matches are dropped.
>     val tb1: Table = tEnv.sqlQuery(
>       """
>         |select
>         |   l.order_id,
>         |   l.d_timestamp,
>         |   r.paymoney,
>         |   r2.age
>         |from user_order as l
>         |join
>         |   user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
>         |on l.order_id = r.order_id
>         |left join
>         |   t22 FOR SYSTEM_TIME AS OF l.proctime AS r2
>         |-- on r.order_id = r2.id
>         |on r.order_id = cast(r2.age as string) -- error: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.t22].
>         |""".stripMargin)
>     // Special note: the ON condition between two dimension tables (r1, r2) must not use cast for type conversion; both sides of ON must have the same type, otherwise the error is: Temporal table join requires an equality condition on fields of table
>     // Special note: the ON condition between the fact table and a dimension table (l, r2) must not use cast either, otherwise the error is: implicit type conversion between VARCHAR(2147483647) and INTEGER is not supported on join's condition now
>     // (a possible workaround is sketched below)
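>     // A possible workaround (a sketch, untested here): move the cast into a view so the
>     // temporal join condition compares plain columns. The t33 view above already exposes
>     // age1 = cast(age as string); whether a lookup join through such a view is supported
>     // may depend on the planner version.
>     //    val tb2: Table = tEnv.sqlQuery(
>     //      """
>     //        |select l.order_id, l.d_timestamp, r.paymoney, r2.age
>     //        |from user_order as l
>     //        |join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
>     //        |on l.order_id = r.order_id
>     //        |left join t33 FOR SYSTEM_TIME AS OF l.proctime AS r2
>     //        |on r.order_id = r2.age1
>     //        |""".stripMargin)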
>     tEnv.toDataStream(tb1).print()
>     // lookup join as left join: each fact row is looked up in the external dimension table and sent downstream whether or not a match is found.
> //    val tb1: Table = tEnv.sqlQuery(
> //      """
> //        |select
> //        |   l.order_id,
> //        |   l.d_timestamp,
> //        |   r.paymoney
> //        |from user_order as l left join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
> //        |on l.order_id = r.order_id
> //        |""".stripMargin)
>     /*
>     Error: Unknown join type LEFT
>     Lookup join does not support right join: the fact table is the driving table, which contradicts right-join semantics.
>      */
> //    val tb1: Table = tEnv.sqlQuery(
> //      """
> //        |select
> //        |   l.order_id,
> //        |   l.d_timestamp,
> //        |   r.paymoney
> //        |from user_order as l right join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
> //        |on l.order_id = r.order_id
> //        |""".stripMargin)
>         /*
>          Error: Unknown join type FULL
>         Lookup join does not support full outer join: the left side is the driving table, so only inner join (flatmap + _.filter.collect) and left join (flatmap + collect) are supported.
>          */
> //        val tb1: Table = tEnv.sqlQuery(
> //          """
> //            |select
> //            |   l.order_id,
> //            |   l.d_timestamp,
> //            |   r.paymoney
> //            |from user_order as l full join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
> //            |on l.order_id = r.order_id
> //            |""".stripMargin)
> //    tEnv.toRetractStream(tb1, classOf[Row]).print()
>     // Note: the Table-typed variable can be concatenated directly here; its registered name is inserted into the SQL
> //    tEnv.executeSql("insert into user_res select * from " + tb1)
>     env.execute("job1")
>   }
> }
>  {code}


