Hello,

I have a simple source table (using the Kafka connector) that reads and
stores data from a specific Kafka topic. I also have a print table:

> t_env.execute_sql("""
>         CREATE TABLE print (
>             window_start TIMESTAMP(3),
>             window_end TIMESTAMP(3),
>             how_any BIGINT
>         ) WITH (
>             'connector' = 'print'
>         )
>     """)


> insert_statement = """
>     INSERT INTO print (
>         SELECT window_start, window_end, COUNT(*) as how_any
>         FROM TABLE(
>             TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
>         GROUP BY window_start, window_end
>     )
>     """

Now, in order to print from the print table, I used:

> with t_env.execute_sql(insert_statement).collect() as results:
>     for res in results:
>         print(res)


But the for loop here is useless, as the program never gets there. I see
the output of the print table in my stdout, but the program runs forever,
because (I guess) all the output that I see is generated inside the
collect() call.

Is there a way to interrupt collect() here? I wanted to stop the program
when, for example, res contains the string '2023', but I couldn't, as the
program never gets to the for loop.
I also tried print() and wait(), with the same result (when reading from
the Kafka topic, the program never gets to the for loop).
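
To make the intent concrete, here is a minimal sketch of the control flow
I am after. It does not use Flink at all: fake_results() and first_match()
are hypothetical names I made up, with a plain Python generator standing in
for the unbounded iterator that I expected collect() to give me. It only
illustrates the early exit, not the actual job.

```python
import itertools


def fake_results():
    """Hypothetical stand-in for the unbounded iterator I expected from collect()."""
    for year in itertools.count(2020):
        # Each item mimics a (window_start, window_end, how_any) row.
        yield (f"{year}-01-01 00:00:00", f"{year}-01-01 00:00:02", 1)


def first_match(results, needle="2023"):
    """Consume rows until one contains `needle`, then stop reading."""
    seen = []
    for res in results:
        seen.append(res)
        if needle in str(res):
            break  # this is the point where I would like to stop the program
    return seen


rows = first_match(fake_results())
print(rows[-1])
```

In other words, I would like to read rows one at a time and leave the loop
as soon as a row matches, even though the source itself never ends.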

Thank you,
Marjan
