Hi, could you please share the root cause with us?
The error message you posted doesn't seem to contain the root cause. Maybe you
can post the full error message.

Best regards, 
Yuxia 


From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: "yuxia" <luoyu...@alumni.sjtu.edu.cn>
Cc: "dev" <dev@flink.apache.org>
Sent: Tuesday, February 7, 2023, 10:39:25 AM
Subject: Re: Need help how to use Table API to join two Kafka streams

Thank you for your reply. I tried it with a sample stream, but it did not work.
I am trying to get the results from my producer here with a very simple query.
I want to see the results in the console/output.
This is my code: 
// Docker: docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
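
(For reference, a minimal sketch for pre-creating the EMPAD topic with kafka-python's admin client, so nothing depends on broker-side auto topic creation; the partition and replication settings are assumptions for this single-broker setup:)

# Sketch: pre-create the EMPAD topic (optional convenience script).
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='127.0.0.1:9092')
# Single broker, so replication_factor must be 1.
admin.create_topics([NewTopic(name='EMPAD', num_partitions=1, replication_factor=1)])
admin.close()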


// Producer
import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = '127.0.0.1:9092'

def serializer(dictionary):
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')

def create_sample_empad_json(raw_id):
    return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}

def do_produce():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
    for raw_id in range(1, 10):
        empad_json = create_sample_empad_json(raw_id)
        producer.send('EMPAD', empad_json)
    producer.flush()

if __name__ == '__main__':
    do_produce()
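
(Before debugging the Flink side, a quick sanity check that the messages actually land on the topic can help; a debugging sketch with kafka-python's KafkaConsumer, not part of the pipeline:)

# Sketch: confirm the producer's messages reach the EMPAD topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer('EMPAD',
                         bootstrap_servers='127.0.0.1:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)  # give up after 5s of silence
for msg in consumer:
    print(msg.value)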

// Flink
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment

def data_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = f"""
        CREATE TEMPORARY TABLE raw_table(
            raw_id INT,
            raw_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'EMPAD',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t_env.execute_sql(t1)

    table_result = t_env.execute_sql("SELECT raw_id, raw_data FROM raw_table")

    with table_result.collect() as results:
        for result in results:
            print(result)

if __name__ == '__main__':
    data_processing()
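
(Side note: with 'scan.startup.mode' = 'latest-offset' the job only sees messages produced after it starts, so the producer has to run while the Flink job is already up. To replay messages already on the topic, the Kafka connector also accepts:)

'scan.startup.mode' = 'earliest-offset'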


I am getting this error message:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
    ... 13 more

Best, 
Amir 

On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:



Hi, thanks for reaching out.
For your question: you don't need to consume the data in your consumer class separately
and then insert it into those tables. The data will be consumed by what we
implemented here.
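
(In other words, once a table is declared with the Kafka connector, querying it reads from the topic directly; a minimal sketch, assuming a StreamTableEnvironment t_env and the s1 table from my earlier reply, quoted below:)

# Sketch: a query on a Kafka-backed table pulls records from the topic itself;
# no separate KafkaConsumer is needed.
result = t_env.sql_query("SELECT id, ssn FROM s1")
result.execute().print()  # streams rows to stdout as they arrive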

Best regards, 
Yuxia 


From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: luoyu...@alumni.sjtu.edu.cn
Sent: Sunday, February 5, 2023, 6:07:02 AM
Subject: Re: Need help how to use Table API to join two Kafka streams

Dear Yuxia, dev@flink.apache.org
Thank you again for your help. I am implementing the code in Python, but I still
have some confusion about my application.
As I mentioned before, I am sending two simple messages (JSON) on two different
topics:
This is my Kafka producer class: 
import json
import sys

from kafka import KafkaProducer

KAFKA_SERVER = '127.0.0.1:9092'

def serializer(dictionary):
    try:
        message = json.dumps(dictionary)
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        message = str(dictionary)
    return message.encode('utf8')

def create_sample_json(row_id):
    return {'row_id': int(row_id), 'my_data': str(int(row_id) + 7)}

def do_produce(topic_name):
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
    for row_id in range(1, 10):
        my_data = create_sample_json(row_id)
        producer.send(topic_name, my_data)
    producer.flush()

if __name__ == '__main__':
    do_produce('topic1')
    do_produce('topic2')
==================================================================================

As you helped me, this is my Flink consumer that I want to use to consume data from the
producer and run queries on it:
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.table_environment import StreamTableEnvironment

KAFKA_SERVERS = 'localhost:9092'

def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = f"""
        CREATE TEMPORARY TABLE table1(
            row_id INT,
            row_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'topic1',
            'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t2 = f"""
        CREATE TEMPORARY TABLE table2(
            row_id INT,
            row_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'topic2',
            'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t_env.execute_sql(t1)
    t_env.execute_sql(t2)

    t3 = t_env.sql_query("SELECT row_id, row_data AS my_raw_data FROM table2")

Please tell me what I should do next.

Questions:
1) Do I need to consume the data in my consumer class separately and then insert
it into those tables, or will the data be consumed by what we implemented here
(since I passed the connector name, topic, bootstrap.servers, etc.)?
2) If so:
   2.1) How can I join those streams in Python?
   2.2) How can I avoid the previous data, since my procedure will send thousands
   of messages on each topic? I want to make sure not to run duplicate queries.
3) If not, what should I do?

Thank you very much. 
Amir 






On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Hi, Amir.
It may look like the following Scala code:

tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")

// You will need to rename the field you join on; otherwise you'll get
// "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
val t1 = tableEnv.from("s1")
val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
val result = t1.join(t3).where($"ssn" === $"ssn1")
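
(Since you are working in Python, a rough PyFlink equivalent of the Scala snippet above; a sketch, assuming the same s1/s2 tables have already been created via t_env.execute_sql:)

# Sketch: PyFlink version of the join; both overlapping columns are renamed
# on one side to avoid the "Ambiguous column name" ValidationException.
from pyflink.table.expressions import col

t1 = t_env.from_path("s1")
t3 = t_env.sql_query("SELECT id AS id2, ssn AS ssn1 FROM s2")
result = t1.join(t3).where(col("ssn") == col("ssn1"))
result.execute().print()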

Also, you can refer here for more detail [1].
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins

Best regards, 
Yuxia 

----- Original Message -----
From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
To: "dev" <dev@flink.apache.org>
Sent: Friday, February 3, 2023, 4:45:08 AM
Subject: Need help how to use Table API to join two Kafka streams

Hello, 

I have a Kafka producer and a Kafka consumer that produce and consume data,
respectively. You can think of two datasets here: both have a similar structure
but carry different data.

I want to use the Table API to join the two Kafka streams while I consume them,
for example on data1.ssn == data2.ssn.

Constraints: 
I don't want to change my producer or use FlinkKafkaProducer. 

Thank you very much. 

Best, 
Amir 





