Hi Guoqin,

I was able to reproduce the problem locally. I can see that by the time
the processing-time window fires, the services are already closed because
of the MAX_WATERMARK emitted at the end of the bounded input.

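For the local test itself, one workaround is to drive the window by event
time instead: event-time windows are fired by that final MAX_WATERMARK,
which matches your observation further down the thread that the event-time
variant works. A rough, untested sketch in PyFlink, reusing the
incoming_data and user_latest_request tables from your original mail:

```
# Rough, untested sketch: the same aggregation, but windowed on the
# event-time column (requestTime) instead of processing time, so the
# MAX_WATERMARK emitted at EOF of the bounded file fires the last window.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# ... CREATE TABLE incoming_data / user_latest_request exactly as in the
# original mail ...

t_env.execute_sql("""
    INSERT INTO user_latest_request
    SELECT groupId,
           userId,
           MAX(requestTime) AS latestRequestTime
    FROM incoming_data
    GROUP BY TUMBLE(requestTime, INTERVAL '1' SECOND), groupId, userId
""").wait()
```

Only the clock driving the window changes; the aggregation itself stays
the same.
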
Previously, there were some discussions around waiting for all timers to
complete [1], but AFAIK there was not much demand to implement it.
Probably Piotr or Arvid knows more. Please feel free to bump the ticket.

[1] https://issues.apache.org/jira/browse/FLINK-18647

Regards,
Roman

On Wed, Nov 17, 2021 at 7:36 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>
> Hi Roman,
>
> Thanks for the detailed explanation.
>
> I did try 1.13 and 1.14, but it still didn't work.
>
> I explicitly enabled checkpointing with `env.enable_checkpointing(10)`.
> Are there any other configurations I need to set?
>
> Thanks,
> -Guoqin
>
> On Wed, Nov 17, 2021 at 4:30 AM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi Guoqin,
>>
>> Thanks for the clarification.
>>
>> Processing-time windows actually don't need watermarks: they fire when
>> the window end time comes.
>> But the job will likely finish earlier than that because of the bounded
>> input.
>>
>> Handling of this case was improved in 1.14 as part of FLIP-147, and
>> also in earlier versions.
>> So I'd suggest trying the most recent Flink version.
>>
>> Regards,
>> Roman
>>
>> On Mon, Nov 15, 2021 at 10:17 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>> >
>> > Hi Roman,
>> >
>> > Thanks for your quick response!
>> >
>> > Yes, it does seem to be a window-closing problem.
>> >
>> > If I change to a tumble window on event time, i.e. the column
>> > 'requestTime', it works fine. I guess the EOF of the test data file
>> > triggers a watermark of Long.MAX_VALUE.
>> >
>> > But my application needs to calculate the result based on a tumble
>> > window over processing time. Curious what the typical setup for such
>> > a local test is, i.e. how do we tell Flink to close the window when
>> > the input stream hits the end?
>> >
>> > Thanks,
>> > -Guoqin
>> >
>> > On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:
>> >>
>> >> Hi Guoqin,
>> >>
>> >> I think the problem might be related to watermarks and checkpointing:
>> >> - if the file is too small, the only watermark will be the one
>> >>   emitted after fully reading the file
>> >> - in exactly-once mode, the sink waits for checkpoint completion
>> >>   before committing the files
>> >>
>> >> Recently, there were some improvements in handling the last
>> >> checkpoint, so you might want to try Flink 1.14.
>> >> You might also try to configure checkpointing (e.g. decrease the
>> >> interval [1]) and watermark generation [2] to diagnose the problem.
>> >>
>> >> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
>> >> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
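>> >>
>> >> In PyFlink, the interval can be lowered for example like this (just
>> >> an untested sketch, using the config key from [1]):
>> >>
>> >> ```
>> >> # Untested sketch: shorten the checkpoint interval so that the
>> >> # exactly-once filesystem sink gets a chance to commit files before
>> >> # the bounded job finishes.
>> >> from pyflink.datastream import StreamExecutionEnvironment
>> >> from pyflink.table import StreamTableEnvironment
>> >>
>> >> env = StreamExecutionEnvironment.get_execution_environment()
>> >> env.enable_checkpointing(1000)  # checkpoint every 1000 ms
>> >> t_env = StreamTableEnvironment.create(env)
>> >>
>> >> # Equivalent setting via the configuration key from [1]:
>> >> t_env.get_config().get_configuration().set_string(
>> >>     'execution.checkpointing.interval', '1s')
>> >> ```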
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>> >> >
>> >> > Hi all,
>> >> >
>> >> > I am reposting my StackOverflow question here.
>> >> >
>> >> > I would like to do a local test of my PyFlink SQL application. The
>> >> > SQL query does a very simple job: read from a source, do a window
>> >> > aggregation, and write the result to a sink.
>> >> >
>> >> > In production, the source and sink will be Kinesis streams. But
>> >> > since I need to do a local test, I am mocking out the source and
>> >> > sink with the local filesystem.
>> >> >
>> >> > The problem is that the local job always runs successfully, but the
>> >> > results are never written out to the sink file. Curious what I am
>> >> > missing here.
>> >> >
>> >> > Note:
>> >> >
>> >> > - I am using Flink 1.11.0.
>> >> > - If I directly read data from the source table with a plain select
>> >> >   query and write it to the sink without windowing or grouping, it
>> >> >   works just fine (a sketch of this check follows the SQL below).
>> >> >   That means the source and sink tables are set up correctly, so
>> >> >   the problem seems to be around the tumbling and grouping on the
>> >> >   local filesystem.
>> >> > - This code works fine with a Kinesis source and sink.
>> >> >
>> >> > Here is the SQL code:
>> >> >
>> >> > ```
>> >> > CREATE TABLE incoming_data (
>> >> >     requestId VARCHAR(4),
>> >> >     groupId VARCHAR(32),
>> >> >     userId VARCHAR(32),
>> >> >     requestStartTime VARCHAR(32),
>> >> >     processTime AS PROCTIME(),
>> >> >     requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
>> >> >     WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
>> >> > ) WITH (
>> >> >     'connector' = 'filesystem',
>> >> >     'path' = '/path/to/test/json/file.json',
>> >> >     'format' = 'json',
>> >> >     'json.timestamp-format.standard' = 'ISO-8601'
>> >> > );
>> >> >
>> >> > CREATE TABLE user_latest_request (
>> >> >     groupId VARCHAR(32),
>> >> >     userId VARCHAR(32),
>> >> >     latestRequestTime TIMESTAMP
>> >> > ) WITH (
>> >> >     'connector' = 'filesystem',
>> >> >     'path' = '/path/to/sink',
>> >> >     'format' = 'csv'
>> >> > );
>> >> >
>> >> > INSERT INTO user_latest_request
>> >> > SELECT groupId,
>> >> >        userId,
>> >> >        MAX(requestTime) AS latestRequestTime
>> >> > FROM incoming_data
>> >> > GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
>> >> > ```
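>> >> >
>> >> > The no-window sanity check mentioned in the note above is roughly
>> >> > this (a sketch, simplified from my actual test code):
>> >> >
>> >> > ```
>> >> > # Sketch of the pass-through check: copy rows straight from the
>> >> > # source to the sink with no windowing or grouping. This version
>> >> > # does write results to the sink file.
>> >> > # (t_env is the StreamTableEnvironment that ran the DDL above.)
>> >> > t_env.execute_sql("""
>> >> >     INSERT INTO user_latest_request
>> >> >     SELECT groupId, userId, requestTime
>> >> >     FROM incoming_data
>> >> > """).wait()
>> >> > ```
>> >> >
>> >> > Thanks!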