Hi Amir,

It could look like the following Scala code:

    import org.apache.flink.table.api._

    // tableEnv is an existing TableEnvironment. executeSql only registers the
    // tables; it returns a TableResult, not a Table.
    tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
    tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")

    val t1 = tableEnv.from("s1")
    // The two tables must not share field names when joined, so rename the
    // fields of one side. Otherwise the join fails with
    // "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
    val t3 = tableEnv.sqlQuery("SELECT id AS id1, ssn AS ssn1 FROM s2")
    val result = t1.join(t3).where($"ssn" === $"ssn1")

For more detail, see the Table API documentation on joins [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins

Best regards,
Yuxia

----- Original Message -----
From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: "dev" <dev@flink.apache.org>
Sent: Friday, February 3, 2023, 4:45:08 AM
Subject: Need help how to use Table API to join two Kafka streams

Hello,

I have a Kafka producer and a Kafka consumer that produce and consume multiple data sets, respectively. You can think of two data sets here. Both have a similar structure but carry different data.

I want to use the Table API to join the two Kafka streams while I consume them. For example, data1.ssn == data2.ssn.

Constraints: I don't want to change my producer or use FlinkKafkaProducer.

Thank you very much.

Best,
Amir