Hi all,

I am reposting my Stack Overflow question
<https://stackoverflow.com/questions/69971277/flink-sql-tumble-aggregation-result-not-written-out-to-filesystem-locally>
here.

I would like to run a local test of my PyFlink SQL application. The SQL job
does a very simple thing: it reads from a source, performs a windowed
aggregation, and writes the result to a sink.

In production, the source and sink will be Kinesis streams, but since I need
to test locally, I am mocking out both the source and the sink with the local
filesystem connector.
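
For context, this is roughly how I drive the job locally (a simplified
sketch; the .sql file names are just placeholders for the three statements
listed further down):

```
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Blink planner, streaming mode (Flink 1.11)
env_settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .use_blink_planner() \
    .build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# source_ddl, sink_ddl and insert_sql hold the three statements listed
# under "Here is the SQL code" below (file names are just placeholders).
source_ddl = open("create_incoming_data.sql").read()
sink_ddl = open("create_user_latest_request.sql").read()
insert_sql = open("insert_user_latest_request.sql").read()

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
result = t_env.execute_sql(insert_sql)

# execute_sql() on the INSERT only submits the job; in local mode the script
# would otherwise exit right away, so block until the job finishes.
result.get_job_client().get_job_execution_result().result()
```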

The problem is that the local job always runs successfully, but the results
are never written out to the sink file. Curious what I am missing here.

Note:

   - I am using Flink 1.11.0.
   - If I directly read data from the source table with a plain SELECT and
   write it to the sink without any windowing or grouping, it works just fine
   (see the sketch after these notes). That means the source and sink tables
   are set up correctly, so the problem seems to be around the tumbling
   window and grouping when running against the local filesystem.
   - The same code works fine with a Kinesis source and sink.
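
For reference, the non-windowed variant that does write rows to the sink is
essentially just a pass-through insert like this (a sketch, not the exact
query; t_env as in the sketch above):

```
# Plain pass-through insert, no TUMBLE and no GROUP BY: this version does
# produce rows in the csv sink (t_env as created in the sketch above).
t_env.execute_sql("""
    INSERT INTO user_latest_request
    SELECT groupId,
           userId,
           requestTime AS latestRequestTime
    FROM incoming_data
""").get_job_client().get_job_execution_result().result()
```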


Here is the SQL code:

```
CREATE TABLE incoming_data (
        requestId VARCHAR(4),
        groupId VARCHAR(32),
        userId VARCHAR(32),
        requestStartTime VARCHAR(32),
        processTime AS PROCTIME(),
        requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23),
                                    'yyyy-MM-dd HH:mm:ss.SSS'),
        WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/test/json/file.json',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    );

CREATE TABLE user_latest_request (
        groupId VARCHAR(32),
        userId VARCHAR(32),
        latestRequestTime TIMESTAMP
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/sink',
        'format' = 'csv'
    );

INSERT INTO user_latest_request
    SELECT groupId,
           userId,
           MAX(requestTime) AS latestRequestTime
    FROM incoming_data
    GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
```


Thanks!
