Hi Roman,

Thanks for the update and testing locally. This is very informative.
-Guoqin

On Mon, Nov 22, 2021 at 10:55 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi Guoqin,
>
> I was able to reproduce the problem locally. I can see that at the time of window firing the services are already closed because of the emitted MAX_WATERMARK.
>
> Previously, there were some discussions around waiting for all timers to complete [1], but AFAIK there was not much demand to implement it. Probably Piotr or Arvid know more.
>
> Please feel free to bump the ticket.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18647
>
> Regards,
> Roman
>
> On Wed, Nov 17, 2021 at 7:36 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >
> > Hi Roman,
> >
> > Thanks for the detailed explanation.
> >
> > I did try 1.13 and 1.14, but it still didn't work.
> >
> > I explicitly enabled checkpointing with `env.enable_checkpointing(10)`. Are there any other configurations I need to set?
> >
> > Thanks,
> > -Guoqin
> >
> > On Wed, Nov 17, 2021 at 4:30 AM Roman Khachatryan <ro...@apache.org> wrote:
> >>
> >> Hi Guoqin,
> >>
> >> Thanks for the clarification.
> >>
> >> Processing-time windows actually don't need watermarks: they fire when the window end time comes. But the job will likely finish earlier because of the bounded input.
> >>
> >> Handling of this case was improved in 1.14 as part of FLIP-147, as well as in previous versions. So I'd suggest trying the most recent Flink version.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Nov 15, 2021 at 10:17 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >> >
> >> > Hi Roman,
> >> >
> >> > Thanks for your quick response!
> >> >
> >> > Yes, it does seem to be the window-closing problem.
> >> >
> >> > If I change the tumble window to event time, i.e. the column 'requestTime', it works fine. I guess the EOF of the test data file kicks in a watermark of Long.MAX_VALUE.
> >> >
> >> > But my application code needs to calculate the result based on a tumble window over processing time. Curious what the typical setup is for such a local test, i.e. how do we tell Flink to close the window when the input stream hits the end?
> >> >
> >> > Thanks,
> >> > -Guoqin
> >> >
> >> > On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:
> >> >>
> >> >> Hi Guoqin,
> >> >>
> >> >> I think the problem might be related to watermarks and checkpointing:
> >> >> - if the file is too small, the only watermark will be the one emitted after fully reading the file
> >> >> - in exactly-once mode, the sink waits for checkpoint completion before committing the files
> >> >>
> >> >> Recently, there were some improvements in handling the last checkpoint, so you might want to try Flink 1.14. You might also try to configure checkpointing (e.g. decrease the interval [1]) and watermark generation [2] to diagnose the problem.
> >> >>
> >> >> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
> >> >> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
> >> >>
> >> >> Regards,
> >> >> Roman
> >> >>
> >> >> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >> >> >
> >> >> > Hi all,
> >> >> >
> >> >> > I am reposting my Stack Overflow question here.
> >> >> >
> >> >> > I would like to do a local test of my PyFlink SQL application. The SQL query does a very simple job: read from a source, do a window aggregation, and write the result to a sink.
> >> >> >
> >> >> > In production, the source and sink will be Kinesis streams. But since I need to do a local test, I am mocking out the source and sink with the local filesystem.
> >> >> >
> >> >> > The problem is that the local job always runs successfully, but the results are never written out to the sink file. Curious what I am missing here.
> >> >> >
> >> >> > Note:
> >> >> > - I am using Flink 1.11.0.
> >> >> > - If I directly read data from the source table with a SELECT query and write to the sink without windowing or grouping, it works just fine. That means the source and sink tables are set up correctly. So it seems the problem is around the tumbling and grouping with the local filesystem.
> >> >> > - This code works fine with a Kinesis source and sink.
> >> >> >
> >> >> > Here is the SQL code:
> >> >> >
> >> >> > ```
> >> >> > CREATE TABLE incoming_data (
> >> >> >     requestId VARCHAR(4),
> >> >> >     groupId VARCHAR(32),
> >> >> >     userId VARCHAR(32),
> >> >> >     requestStartTime VARCHAR(32),
> >> >> >     processTime AS PROCTIME(),
> >> >> >     requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
> >> >> >     WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
> >> >> > ) WITH (
> >> >> >     'connector' = 'filesystem',
> >> >> >     'path' = '/path/to/test/json/file.json',
> >> >> >     'format' = 'json',
> >> >> >     'json.timestamp-format.standard' = 'ISO-8601'
> >> >> > );
> >> >> >
> >> >> > CREATE TABLE user_latest_request (
> >> >> >     groupId VARCHAR(32),
> >> >> >     userId VARCHAR(32),
> >> >> >     latestRequestTime TIMESTAMP
> >> >> > ) WITH (
> >> >> >     'connector' = 'filesystem',
> >> >> >     'path' = '/path/to/sink',
> >> >> >     'format' = 'csv'
> >> >> > );
> >> >> >
> >> >> > INSERT INTO user_latest_request
> >> >> > SELECT groupId,
> >> >> >        userId,
> >> >> >        MAX(requestTime) AS latestRequestTime
> >> >> > FROM incoming_data
> >> >> > GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
> >> >> > ```
> >> >> >
> >> >> > Thanks!
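For reference, a minimal sketch of the event-time variant Guoqin reports working, assuming the same `incoming_data` and `user_latest_request` tables from the thread. Tumbling on `requestTime` (an event-time attribute with a declared watermark) means the MAX_WATERMARK emitted at the end of the bounded input fires the final window, so the results actually reach the sink:

```
-- Tumble on the event-time column instead of processTime; the final
-- watermark at end of input closes the last window.
INSERT INTO user_latest_request
SELECT groupId,
       userId,
       MAX(requestTime) AS latestRequestTime
FROM incoming_data
GROUP BY TUMBLE(requestTime, INTERVAL '1' SECOND), groupId, userId;
```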
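And a minimal PyFlink sketch of the checkpoint configuration discussed above, since in exactly-once mode the filesystem sink only commits files on checkpoint completion. The 1000 ms interval is an assumption for a local test, not a recommendation:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Checkpoint frequently so the filesystem sink commits its in-progress
# files quickly (interval is in milliseconds; 1000 ms is an assumed
# value for a local test).
env.enable_checkpointing(1000)

t_env = StreamTableEnvironment.create(env)
# ... execute the CREATE TABLE and INSERT statements from the thread here,
# e.g. t_env.execute_sql("...")
```

Note that even with frequent checkpoints, a processing-time window may still be dropped at end of input on older versions; that is the pending-timers behavior tracked in FLINK-18647.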