Hi all, I am reposting my Stack Overflow question <https://stackoverflow.com/questions/69971277/flink-sql-tumble-aggregation-result-not-written-out-to-filesystem-locally> here.

I would like to run a local test of my pyflink SQL application. The SQL query does a very simple job: read from a source, perform a windowed aggregation, and write the result to a sink. In production the source and sink will be Kinesis streams, but since I need to test locally, I am mocking them out with the local filesystem. The problem is that the local job always finishes successfully, but the results are never written out to the sink file. Curious what I am missing here.

Notes:
- I am using Flink 1.11.0.
- If I read directly from the source table with a plain SELECT and write to the sink without any windowing or grouping, it works just fine. That means the source and sink tables are set up correctly, so the problem seems to be around the tumbling window and grouping when using the local filesystem.
- The same code works fine with a Kinesis source and sink.

Here is the SQL code:

```
CREATE TABLE incoming_data (
    requestId VARCHAR(4),
    groupId VARCHAR(32),
    userId VARCHAR(32),
    requestStartTime VARCHAR(32),
    processTime AS PROCTIME(),
    requestTime AS TO_TIMESTAMP(
        SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23),
        'yyyy-MM-dd HH:mm:ss.SSS'),
    WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/test/json/file.json',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

CREATE TABLE user_latest_request (
    groupId VARCHAR(32),
    userId VARCHAR(32),
    latestRequestTime TIMESTAMP
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/sink',
    'format' = 'csv'
);

INSERT INTO user_latest_request
SELECT groupId, userId, MAX(requestTime) AS latestRequestTime
FROM incoming_data
GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
```

Thanks!
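For reference, the local source file holds newline-delimited JSON records matching the `incoming_data` schema; the field values below are made-up examples, not my real data:

```
{"requestId": "1234", "groupId": "group-a", "userId": "user-1", "requestStartTime": "2021-11-15T12:34:56.789Z"}
```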
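As an aside, to show what the computed `requestTime` column is intended to do, here is the same normalization written as a plain Python sketch (a hypothetical helper for illustration only, not part of the Flink job, and not a character-for-character match of Flink's `SUBSTR` position semantics):

```python
from datetime import datetime

def parse_request_time(request_start_time: str) -> datetime:
    # Mirror the SQL expression: replace the ISO-8601 'T' separator
    # with a space, keep the first 23 characters
    # ('yyyy-MM-dd HH:mm:ss.SSS', dropping any trailing 'Z'),
    # then parse with the matching pattern.
    normalized = request_start_time.replace("T", " ")[:23]
    return datetime.strptime(normalized, "%Y-%m-%d %H:%M:%S.%f")

print(parse_request_time("2021-11-15T12:34:56.789Z"))
# → 2021-11-15 12:34:56.789000
```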
So I would like to do a local test of my pyflink sql application. The sql query does a very simple job: read from source, window aggregation and write result to a sink. In the production, the source and sink will be kinesis stream. But since I need to do a local test, I am mocking out the source and sink with the local filesystem. The problem is the local job always runs successfully, but the results were never written out to the sink file. Curious what I am missing here. Note: - I am using Flink 1.11.0 - if I directly read data from the source table by select query and write to sink without windowing or grouping, it works just fine. That means the source and sink table is set up correctly. So it seems the problem is around the Tumbling and grouping for the local filesystem. - This code works fine with Kinesis source and sink. Here is the SQL code: ``` CREATE TABLE incoming_data ( requestId VARCHAR(4), groupId VARCHAR(32), userId VARCHAR(32), requestStartTime VARCHAR(32), processTime AS PROCTIME(), requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'), WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND ) WITH ( 'connector' = 'filesystem', 'path' = '/path/to/test/json/file.json', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) CREATE TABLE user_latest_request ( groupId VARCHAR(32), userId VARCHAR(32), latestRequestTime TIMESTAMP ) WITH ( 'connector' = 'filesystem', 'path' = '/path/to/sink', 'format' = 'csv') INSERT INTO user_latest_request SELECT groupId, userId, MAX(requestTime) as latestRequestTime FROM incoming_data GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId; ``` Thanks!