Hi, could you please share the root cause with us? The error message you posted does not seem to contain the root cause. Maybe you can post the full error message.
Best regards,
Yuxia

From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
Cc: "dev" <dev@flink.apache.org>
Sent: Tuesday, February 7, 2023, 10:39:25 AM
Subject: Re: Need help how to use Table API to join two Kafka streams

Thank you for your reply. I tried it with a sample stream but it did not work. I am trying to get the results from my producer here with a very simple query, and I want to see the results in the console/output. This is my code:

// Docker: docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

// Producer
import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = '127.0.0.1:9092'


def serializer(dictionary):
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')


def create_sample_empad_json(raw_id):
    return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}


def do_produce():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,
                             value_serializer=serializer)
    for raw_id in range(1, 10):
        empad_json = create_sample_empad_json(raw_id)
        producer.send('EMPAD', empad_json)
    producer.flush()


if __name__ == '__main__':
    do_produce()

// Flink
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment


def data_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = """
        CREATE TEMPORARY TABLE raw_table(
            raw_id INT,
            raw_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'EMPAD',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """
    t_env.execute_sql(t1)

    table_result = t_env.execute_sql("SELECT raw_id, raw_data FROM raw_table")
    with table_result.collect() as results:
        for result in results:
            print(result)


if __name__ == '__main__':
    data_processing()

I am getting this error message:

pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
    ... 13 more

Best,
Amir

On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Hi, thanks for reaching out. To your question: you don't need to consume the data separately in your consumer class and then insert it into those tables. The data will be consumed through what we implemented here.

Best regards,
Yuxia
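To make that point concrete, here is a minimal sketch, assuming the t_env and the Kafka-backed raw_table defined in the code above: the query job itself reads the topic through the connector, so no hand-written KafkaConsumer is involved. The print_sink table is a hypothetical name; it uses Flink's built-in 'print' connector as a debugging sink.

# Sketch only: assumes t_env and the Kafka-backed raw_table from above.
# The 'print' connector writes every row it receives to the TaskManager stdout.
t_env.execute_sql("""
    CREATE TEMPORARY TABLE print_sink(
        raw_id INT,
        raw_data STRING
    ) WITH (
        'connector' = 'print'
    )
""")

# The INSERT job consumes the Kafka topic by itself; no separate consumer class runs.
t_env.execute_sql(
    "INSERT INTO print_sink SELECT raw_id, raw_data FROM raw_table"
).wait()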
From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: luoyu...@alumni.sjtu.edu.cn
Sent: Sunday, February 5, 2023, 6:07:02 AM
Subject: Re: Need help how to use Table API to join two Kafka streams

Dear Yuxia (cc: dev@flink.apache.org),

Thank you again for your help. I am implementing the code in Python, but I still have some confusion about my application. As I mentioned before, I am sending two simple messages (JSON) on two different topics.

This is my Kafka producer class:

import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = '127.0.0.1:9092'


def serializer(dictionary):
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')


def create_sample_json(row_id):
    return {'row_id': int(row_id), 'my_data': str(int(row_id) + 7)}


def do_produce(topic_name):
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,
                             value_serializer=serializer)
    for row_id in range(1, 10):
        my_data = create_sample_json(row_id)
        producer.send(topic_name, my_data)
    producer.flush()


if __name__ == '__main__':
    do_produce('topic1')
    do_produce('topic2')

==================================================================================

As you helped me, this is my Flink consumer, which should consume the data from the producer and run queries on it:

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.table_environment import StreamTableEnvironment

KAFKA_SERVERS = 'localhost:9092'


def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = f"""
        CREATE TEMPORARY TABLE table1(
            row_id INT,
            row_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'topic1',
            'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t2 = f"""
        CREATE TEMPORARY TABLE table2(
            row_id INT,
            row_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'topic2',
            'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t_env.execute_sql(t1)
    t_env.execute_sql(t2)

    t3 = t_env.sql_query("SELECT row_id, row_data AS my_raw_data FROM table2")

    # Please tell me what I should do next.
    # Questions:
    # 1) Do I need to consume the data in my consumer class separately and then
    #    insert it into those tables, or will the data be consumed through what we
    #    implemented here (as I passed the connector name, topic,
    #    bootstrap.servers, etc.)?
    # 2) If so:
    #    2.1) How can I join those streams in Python?
    #    2.2) How can I avoid reprocessing previous data? My procedure will send
    #         thousands of messages on each topic, and I want to make sure I don't
    #         run duplicate queries.
    # 3) If not, what should I do?

Thank you very much.

Amir
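For question 2.2 above, one standard option (a sketch, not the only approach) is Flink SQL's deduplication pattern: ROW_NUMBER over a time attribute keeps only the first row seen per key, so re-sent messages don't enter downstream queries twice. This assumes the t_env and table1 from the snippet above, plus a processing-time column added to table1's DDL (`proc_time AS PROCTIME()`), which is an assumption and not in the original DDL:

# Sketch only: assumes t_env from above and that table1's DDL is extended with
# a computed processing-time column:  proc_time AS PROCTIME()
deduped = t_env.sql_query("""
    SELECT row_id, row_data
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY row_id
                   ORDER BY proc_time ASC
               ) AS rn
        FROM table1
    )
    WHERE rn = 1
""")

# Register the deduplicated stream so later queries (e.g. the join) can use it;
# the same can be done for table2.
t_env.create_temporary_view("table1_deduped", deduped)

Separately, the 'scan.startup.mode' = 'latest-offset' already set in the DDL means each fresh run starts from the end of the topic rather than re-reading old messages.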
On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Hi, Amir.
It may look like the following Scala code:

tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")

val t1 = tableEnv.from("s1")
// You will need to rename the field before joining; otherwise it will fail with
// "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
val result = t1.join(t3).where($"ssn" === $"ssn1")

Also, you can refer here for more detail [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins

Best regards,
Yuxia

----- Original Message -----
From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: "dev" <dev@flink.apache.org>
Sent: Friday, February 3, 2023, 4:45:08 AM
Subject: Need help how to use Table API to join two Kafka streams

Hello,

I have a Kafka producer and a Kafka consumer that produce and consume multiple data sets, respectively. You can think of two data sets here: both have a similar structure but carry different data.

I want to use the Table API to join the two Kafka streams while I consume them, for example on data1.ssn == data2.ssn.

Constraint: I don't want to change my producer or use FlinkKafkaProducer.

Thank you very much.

Best,
Amir
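Since the question is being implemented in Python, here is a rough PyFlink equivalent of the Scala sketch above. It is a sketch under the assumption that the Kafka-backed s1 and s2 tables are registered first (their DDL is elided here, just like the '...' in the Scala version):

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Assumed: register the two Kafka-backed tables first, e.g.
# t_env.execute_sql("CREATE TEMPORARY TABLE s1 (id INT, ssn STRING) WITH (...)")
# t_env.execute_sql("CREATE TEMPORARY TABLE s2 (id INT, ssn STRING) WITH (...)")

t1 = t_env.from_path("s1")
# Rename both columns on one side so neither 'id' nor 'ssn' is ambiguous.
t3 = t_env.sql_query("SELECT id AS id2, ssn AS ssn1 FROM s2")

result = t1.join(t3).where(col("ssn") == col("ssn1"))
result.execute().print()

One caveat worth knowing: a plain join over two unbounded streams keeps state for both sides indefinitely, so for long-running topics an interval join or a state TTL is the usual refinement (see the joins page linked above).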