Thanks. If you look at the code, I am defining/creating the table as:

create_kafka_source_ddl = """
    CREATE TABLE payment_msg(
        createTime VARCHAR,
        orderId BIGINT,
        payAmount DOUBLE,
        payPlatform INT,
        provinceId INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'payment_msg',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'test_3',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
"""
t_env.execute_sql(create_kafka_source_ddl)

Is this enough to satisfy the sink table? Can you show me an example of writing the query output to the sink table?

Thanks.

Best,
Amir

On Wed, Feb 8, 2023 at 1:48 AM Leonard Xu <xbjt...@gmail.com> wrote:
>
> > 1) *First*: In the *payment_msg_proccessing.py
> > <https://apache.googlesource.com/flink-playgrounds/+/HEAD/pyflink-walkthrough/payment_msg_proccessing.py>*
> > code, I want to run a simple query on a Kafka stream (the payment_msg
> > table) and do some data processing without inserting data into the sink
> > table (es_sink here). (In my project, I won't insert any data.) So, is it
> > possible to run queries on sources (streams) *without inserting data
> > into other tables*?
>
> No, you need to define at least one sink table to receive the query result.
>
> > 2) *Second*: How can I iterate over the results and print the data in
> > the output? For example, I wrote this simple query: *table_result =
> > t_env.execute_sql("select provinceId, payAmount from payment_msg")* and
> > then:
> >
> > with table_result.collect() as results:
> >     for result in results:
> >         print(result)
>
> The result here is the representation of the statement execution result,
> NOT the query output.
> You can define a print connector table[1] as your sink table, and insert
> the query output into the sink table to achieve your goal.
>
> Btw, you should send email to the user mailing list for user Q&A; we
> discuss community development on the dev mailing list.
>
> Best,
> Leonard
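
[Editor's note: a minimal sketch of the approach Leonard describes, assuming the payment_msg Kafka source from Amir's DDL above is registered. The sink name print_sink and its column list are illustrative, not taken from the walkthrough; the print connector simply writes each result row to the TaskManager's stdout.]

from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming table environment. The walkthrough builds t_env from a
# StreamExecutionEnvironment; this shorter form behaves the same for
# SQL-only jobs.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# ... execute create_kafka_source_ddl from above to register payment_msg ...

# A sink table backed by the print connector; its schema mirrors the
# columns the query below selects.
create_print_sink_ddl = """
    CREATE TABLE print_sink(
        provinceId INT,
        payAmount DOUBLE
    ) WITH (
        'connector' = 'print'
    )
"""
t_env.execute_sql(create_print_sink_ddl)

# Insert the query output into the sink; the print connector logs each
# row to stdout. wait() blocks so the job keeps running in the foreground.
t_env.execute_sql(
    "INSERT INTO print_sink SELECT provinceId, payAmount FROM payment_msg"
).wait()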