Hi Roman,

Thanks for your quick response!
Yes, it does seem to be the window-closing problem. If I change to a tumble window on event time (the 'requestTime' column), it works fine. I guess the EOF of the test data file triggers a final watermark of Long.MAX_VALUE, which closes the event-time windows. But my application code needs to calculate its result over a tumble window on processing time. Curious what the typical setup is for such a local test, i.e., how do we tell Flink to close the window when the input stream hits the end? For reference, I've appended a sketch of the event-time variant below the quoted thread.

Thanks,
-Guoqin

On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi Guoqin,
>
> I think the problem might be related to watermarks and checkpointing:
> - if the file is too small, the only watermark will be the one emitted
>   after fully reading the file
> - in exactly-once mode, the sink waits for a checkpoint to complete
>   before committing the files
>
> Recently there were some improvements in handling the last checkpoint,
> so you might want to try Flink 1.14.
> You might also try configuring checkpointing (e.g. decreasing the
> interval [1]) and watermark generation [2] to diagnose the problem.
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
>
> Regards,
> Roman
>
> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >
> > Hi all,
> >
> > I am reposting my Stack Overflow question here.
> >
> > I would like to do a local test of my PyFlink SQL application. The SQL
> > query does a very simple job: read from a source, do a window
> > aggregation, and write the result to a sink.
> >
> > In production, the source and sink will be Kinesis streams. But since
> > I need to test locally, I am mocking out the source and sink with the
> > local filesystem.
> >
> > The problem is that the local job always runs successfully, but the
> > results are never written out to the sink file. Curious what I am
> > missing here.
> >
> > Notes:
> >
> > - I am using Flink 1.11.0.
> > - If I read directly from the source table with a plain SELECT and
> >   write to the sink without any windowing or grouping, it works just
> >   fine. That means the source and sink tables are set up correctly, so
> >   the problem seems to be around the tumbling and grouping with the
> >   local filesystem connector.
> > - This code works fine with a Kinesis source and sink.
> >
> > Here is the SQL code:
> >
> > ```
> > CREATE TABLE incoming_data (
> >     requestId VARCHAR(4),
> >     groupId VARCHAR(32),
> >     userId VARCHAR(32),
> >     requestStartTime VARCHAR(32),
> >     processTime AS PROCTIME(),
> >     requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
> >     WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
> > ) WITH (
> >     'connector' = 'filesystem',
> >     'path' = '/path/to/test/json/file.json',
> >     'format' = 'json',
> >     'json.timestamp-format.standard' = 'ISO-8601'
> > )
> >
> > CREATE TABLE user_latest_request (
> >     groupId VARCHAR(32),
> >     userId VARCHAR(32),
> >     latestRequestTime TIMESTAMP
> > ) WITH (
> >     'connector' = 'filesystem',
> >     'path' = '/path/to/sink',
> >     'format' = 'csv'
> > )
> >
> > INSERT INTO user_latest_request
> > SELECT groupId,
> >        userId,
> >        MAX(requestTime) AS latestRequestTime
> > FROM incoming_data
> > GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
> > ```
> >
> > Thanks!
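P.S. Here is the sketch I mentioned above: the same pipeline with the
tumble window switched to the event-time column 'requestTime', plus the
shorter checkpoint interval Roman suggested. The tables and paths are the
ones from my original post; the job-client wait at the end is my best
guess at the Flink 1.11 way to block until the bounded job finishes.

```
# Local-test sketch (PyFlink, written against the Flink 1.11 APIs).
# The only change to the query is TUMBLE on the event-time column
# 'requestTime' instead of the processing-time column 'processTime',
# so the Long.MAX_VALUE watermark emitted at end-of-file closes the
# last window and the results get committed to the sink.
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = (EnvironmentSettings.new_instance()
                .in_streaming_mode()
                .use_blink_planner()
                .build())
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Roman's suggestion: a short checkpoint interval, so the exactly-once
# filesystem sink commits its files quickly.
t_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1s")

t_env.execute_sql("""
    CREATE TABLE incoming_data (
        requestId VARCHAR(4),
        groupId VARCHAR(32),
        userId VARCHAR(32),
        requestStartTime VARCHAR(32),
        processTime AS PROCTIME(),
        requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
        WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/test/json/file.json',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""")

t_env.execute_sql("""
    CREATE TABLE user_latest_request (
        groupId VARCHAR(32),
        userId VARCHAR(32),
        latestRequestTime TIMESTAMP
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/sink',
        'format' = 'csv'
    )
""")

result = t_env.execute_sql("""
    INSERT INTO user_latest_request
    SELECT groupId,
           userId,
           MAX(requestTime) AS latestRequestTime
    FROM incoming_data
    GROUP BY TUMBLE(requestTime, INTERVAL '1' SECOND), groupId, userId
""")

# execute_sql() submits the INSERT job asynchronously; block until the
# bounded file source is exhausted so the test can assert on the sink.
result.get_job_client().get_job_execution_result().result()
```

With this variant the EOF watermark flushes the last window and the CSV
files show up under /path/to/sink; what I still don't see is how to get
the same end-of-input behavior for a window on processTime.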