Hi, Amir.
It may look like the following Scala code:

// Register the two Kafka-backed tables. Note that executeSql on a DDL
// statement returns a TableResult, not a Table.
tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id INT, ssn STRING) " +
  "WITH ('connector' = 'kafka', ...)")
tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id INT, ssn STRING) " +
  "WITH ('connector' = 'kafka', ...)")

// You will need to rename the field you join on; otherwise you'll get
// "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
val t1 = tableEnv.from("s1")
val result = t1.join(t3).where($"ssn" === $"ssn1")
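
Equivalently, the rename and the join can be expressed in a single SQL statement passed to tableEnv.sqlQuery (a sketch; the table names s1/s2 match the DDL above, and the selected columns are just an example):

SELECT s1.id, s1.ssn, s2.id AS id2
FROM s1
JOIN s2 ON s1.ssn = s2.ssn

Here the ambiguous-column problem does not arise because each ssn is qualified by its table name, so no separate renaming projection is needed.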

Also, you can refer to [1] for more details.
[1]  
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins

Best regards,
Yuxia

----- Original Message -----
From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: "dev" <dev@flink.apache.org>
Sent: Friday, February 3, 2023, 4:45:08 AM
Subject: Need help how to use Table API to join two Kafka streams

Hello,

I have a Kafka producer and a Kafka consumer that produce and consume
multiple records respectively. You can think of two data sets here. Both
datasets have a similar structure but carry different data.

I want to use the Table API to join two Kafka streams while I
consume them, for example on data1.ssn == data2.ssn.

Constraints:
I don't want to change my producer or use FlinkKafkaProducer.

Thank you very much.

Best,
Amir
