Hi Roman,

Thanks for the detailed explanation.
I did try 1.13 and 1.14, but it still didn't work. I explicitly enabled
checkpointing with `env.enable_checkpointing(10)`. Are there any other
configurations I need to set?

Thanks,
-Guoqin

On Wed, Nov 17, 2021 at 4:30 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi Guoqin,
>
> Thanks for the clarification.
>
> Processing-time windows actually don't need watermarks: they fire when
> the window end time comes.
> But the job will likely finish earlier because of the bounded input.
>
> Handling of this case was improved in 1.14 as part of FLIP-147, as
> well as in previous versions.
> So I'd suggest trying the most recent Flink version.
>
> Regards,
> Roman
>
> On Mon, Nov 15, 2021 at 10:17 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >
> > Hi Roman,
> >
> > Thanks for your quick response!
> >
> > Yes, it does seem to be the window-closing problem.
> >
> > If I change to a tumble window on event time (the column
> > 'requestTime'), it works fine. I guess the EOF of the test data file
> > triggers a watermark of Long.MAX_VALUE.
> >
> > But my application code needs to compute the result based on a tumble
> > window over processing time. Curious what the typical setup for such
> > a local test is, i.e. how do we tell Flink to close the window when
> > the input stream hits the end?
> >
> > Thanks,
> > -Guoqin
> >
> > On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:
> >>
> >> Hi Guoqin,
> >>
> >> I think the problem might be related to watermarks and checkpointing:
> >> - if the file is too small, the only watermark will be the one
> >>   emitted after fully reading the file
> >> - in exactly-once mode, the sink waits for checkpoint completion
> >>   before committing the files
> >>
> >> Recently, there were some improvements in handling the last
> >> checkpoint, so you might want to try Flink 1.14.
> >> You might also try to configure checkpointing (e.g. decrease the
> >> interval [1]) and watermark generation [2] to diagnose the problem.
> >>
> >> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
> >> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > I am reposting my Stack Overflow question here.
> >> >
> >> > I would like to run a local test of my PyFlink SQL application.
> >> > The SQL query does a very simple job: read from a source, aggregate
> >> > over a window, and write the result to a sink.
> >> >
> >> > In production, the source and sink will be Kinesis streams. But
> >> > since I need a local test, I am mocking out the source and sink
> >> > with the local filesystem.
> >> >
> >> > The problem is that the local job always runs successfully, but the
> >> > results are never written out to the sink file. Curious what I am
> >> > missing here.
> >> >
> >> > Note:
> >> > - I am using Flink 1.11.0.
> >> > - If I directly read data from the source table with a SELECT query
> >> >   and write to the sink without windowing or grouping, it works
> >> >   just fine. That means the source and sink tables are set up
> >> >   correctly, so the problem seems to be around the tumbling and
> >> >   grouping with the local filesystem.
> >> > - This code works fine with a Kinesis source and sink.
> >> >
> >> > Here is the SQL code:
> >> >
> >> > ```
> >> > CREATE TABLE incoming_data (
> >> >     requestId VARCHAR(4),
> >> >     groupId VARCHAR(32),
> >> >     userId VARCHAR(32),
> >> >     requestStartTime VARCHAR(32),
> >> >     processTime AS PROCTIME(),
> >> >     requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
> >> >     WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
> >> > ) WITH (
> >> >     'connector' = 'filesystem',
> >> >     'path' = '/path/to/test/json/file.json',
> >> >     'format' = 'json',
> >> >     'json.timestamp-format.standard' = 'ISO-8601'
> >> > );
> >> >
> >> > CREATE TABLE user_latest_request (
> >> >     groupId VARCHAR(32),
> >> >     userId VARCHAR(32),
> >> >     latestRequestTime TIMESTAMP
> >> > ) WITH (
> >> >     'connector' = 'filesystem',
> >> >     'path' = '/path/to/sink',
> >> >     'format' = 'csv'
> >> > );
> >> >
> >> > INSERT INTO user_latest_request
> >> > SELECT groupId,
> >> >        userId,
> >> >        MAX(requestTime) AS latestRequestTime
> >> > FROM incoming_data
> >> > GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
> >> > ```
> >> >
> >> > Thanks!
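
For readers of the archive, here is a minimal sketch of the checkpoint
configuration discussed above, assuming Flink 1.14 and the PyFlink Table
API. The interval value and the table environment setup are illustrative,
not taken from the original job; the FLIP-147 option named below is the
1.14 setting that allows a final checkpoint after a bounded source
finishes, which is what the file sink needs in order to commit its last
in-progress files.

```
from pyflink.table import EnvironmentSettings, TableEnvironment

# A minimal sketch, assuming Flink 1.14 and the PyFlink Table API;
# values are illustrative, not taken from the original job.
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(settings)
config = table_env.get_config().get_configuration()

# Checkpoint often enough that the file sink can commit files before
# the bounded source is exhausted. Note that env.enable_checkpointing(10)
# on the DataStream environment sets a 10 ms interval, which is likely
# too aggressive; around a second is more typical for a local test.
config.set_string("execution.checkpointing.interval", "1 s")

# FLIP-147 option (Flink 1.14+, disabled by default): take a final
# checkpoint after tasks finish, so in-progress sink files are still
# committed when the job ends because the bounded input ran out.
config.set_string(
    "execution.checkpointing.checkpoints-after-tasks-finish.enabled",
    "true")
```

Alternatively, as Guoqin observed earlier in the thread, running the
local test with an event-time TUMBLE on requestTime closes the windows
deterministically, because the bounded file source emits a final
Long.MAX_VALUE watermark at EOF.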