Hello,

I'm trying to understand tumbling windows at the level of the Python Table API.
For this short example:

Input csv:

2022-01-01 10:00:23.000000000, "data line 3"
2022-01-01 10:00:24.000000000, "data line 4"
2022-01-01 10:00:18.000000000, "data line 1"
2022-01-01 10:00:25.000000000, "data line 5"
2022-01-01 10:00:26.000000000, "data line 6"
2022-01-01 10:00:27.000000000, "data line 7"
2022-01-01 10:00:22.000000000, "data line 2"
2022-01-01 10:00:28.000000000, "data line 8"
2022-01-01 10:00:29.000000000, "data line 9"
2022-01-01 10:00:30.000000000, "data line 10"

Print output:

+I[2022-01-01T10:00:23,  "data line 3"]
+I[2022-01-01T10:00:24,  "data line 4"]
+I[2022-01-01T10:00:18,  "data line 1"]
+I[2022-01-01T10:00:25,  "data line 5"]
+I[2022-01-01T10:00:26,  "data line 6"]
+I[2022-01-01T10:00:27,  "data line 7"]
+I[2022-01-01T10:00:28,  "data line 8"]
+I[2022-01-01T10:00:29,  "data line 9"]
+I[2022-01-01T10:00:22,  "data line 2"]
+I[2022-01-01T10:00:30,  "data line 10"]

Below, I'm trying to process this data in 5-second windows, so I would at
least expect not to see the bold line above in the print output.

Am I not really configuring tumbling windows in the source table?

from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

t_env.execute_sql("""
    create table source (
        ts TIMESTAMP(3),
        data STRING,
        WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("source.csv"))

t_env.execute_sql("""
    CREATE TABLE print (
        ts TIMESTAMP(3),
        data STRING
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.execute_sql("INSERT INTO print SELECT * FROM source").wait()
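
To make my expectation concrete, here is a plain-Python sketch (not Flink code) of how I picture 5-second tumbling windows combined with the 3-second watermark delay. It rests on assumptions that may not match what Flink really does: the watermark advances after every single row, windows are aligned to the epoch, and a row counts as late once the end of its window is at or before the current watermark. The names window_start, classify, ARRIVAL_ORDER and BASE are made up for this illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=5)  # tumbling window size I have in mind
DELAY = timedelta(seconds=3)   # mirrors WATERMARK FOR ts AS ts - INTERVAL '3' SECOND

BASE = datetime(2022, 1, 1, 10, 0, 0)
# Seconds past 10:00:00, in the order the rows appear in the input csv.
ARRIVAL_ORDER = [23, 24, 18, 25, 26, 27, 22, 28, 29, 30]


def window_start(ts):
    """Start of the epoch-aligned 5-second window that contains ts."""
    offset = (ts - datetime(1970, 1, 1)) % WINDOW
    return ts - offset


def classify(timestamps):
    """Split rows into (on_time, late) under a simplified per-row watermark."""
    watermark = datetime.min
    on_time, late = [], []
    for ts in timestamps:
        window_end = window_start(ts) + WINDOW
        # Assumption: the window is closed once the watermark reaches its end.
        if window_end <= watermark:
            late.append(ts)
        else:
            on_time.append(ts)
        # Assumption: the watermark is updated after every row.
        watermark = max(watermark, ts - DELAY)
    return on_time, late


if __name__ == "__main__":
    rows = [BASE + timedelta(seconds=s) for s in ARRIVAL_ORDER]
    on_time, late = classify(rows)
    print("late rows:", [ts.time().isoformat() for ts in late])
```

Under these assumptions a row is only dropped if it arrives after its own window has closed, which is the behaviour I was hoping to see from the job above.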


Thank you,

Ivan
