Hi Guoqin,

Thanks for the clarification.

Processing-time windows actually don't need watermarks: they fire when
the wall clock reaches the window end time.
But with a bounded input, the job will likely finish before that happens.
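
To make that concrete, here is a toy model in plain Python (not Flink
code). It assumes that processing-time timers still pending when the
bounded source finishes are simply discarded; the function names and
the arrival times are made up for illustration.

```python
WINDOW_MS = 1000  # same size as INTERVAL '1' SECOND in the query


def window_end(ts_ms, size_ms=WINDOW_MS):
    """Exclusive end of the tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms) + size_ms


def run_bounded_job(arrivals_ms, job_end_ms, size_ms=WINDOW_MS):
    """Toy simulation of processing-time tumbling windows on bounded input.

    arrivals_ms: wall-clock time (ms since job start) at which each record arrives
    job_end_ms:  wall-clock time at which the source is exhausted and the job stops
    Returns (fired, dropped): record counts keyed by window end, split into
    windows whose end time was reached and windows still open at shutdown.
    """
    pending = {}
    for ts in arrivals_ms:
        end = window_end(ts, size_ms)
        pending[end] = pending.get(end, 0) + 1

    # Assumption: a window fires only if the clock reaches its end before shutdown.
    fired = {end: n for end, n in pending.items() if end <= job_end_ms}
    dropped = {end: n for end, n in pending.items() if end > job_end_ms}
    return fired, dropped


# A small file read in 50 ms: the only window ends at 1000 ms, so nothing fires.
print(run_bounded_job([5, 12, 30], job_end_ms=50))       # ({}, {1000: 3})
# A job that stays alive past the window ends emits both windows.
print(run_bounded_job([5, 900, 1500], job_end_ms=2100))  # ({1000: 2, 2000: 1}, {})
```

With a tiny test file, the first case is the likely one: the job ends
well inside the first one-second window, so no result reaches the sink.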

Handling of bounded inputs was improved in 1.14 as part of FLIP-147,
and in some earlier releases as well.
So I'd suggest trying the most recent Flink version.

Regards,
Roman

On Mon, Nov 15, 2021 at 10:17 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>
> Hi Roman,
>
> Thanks for your quick response!
>
> Yes, it does seem to be the window closing problem.
>
> So if I change the tumble window to use event time, which is the column
> 'requestTime', it works fine. I guess the EOF of the test data file triggers
> a watermark of Long.MAX_VALUE.
>
> But my application code needs to calculate the result based on a tumble
> window over processTime. I'm curious what the typical setup for such a local
> test is, i.e. how do we tell Flink to close the window when the input stream
> hits the end?
>
> Thanks,
> -Guoqin
>
> On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi Guoqin,
>>
>> I think the problem might be related to watermarks and checkpointing:
>> - if the file is too small, the only watermark will be the one after
>> fully reading the file
>> - in exactly once mode, sink waits for a checkpoint completion before
>> committing the files
>>
>> Recently, there were some improvements in handling the last
>> checkpoint, so you might want to try Flink 1.14.
>> You might also try to configure checkpointing (e.g. decrease interval
>> [1]) and watermark generation [2] to diagnose the problem.
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
>> [2] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
>>
>>
>> Regards,
>> Roman
>>
>> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > I am reposting my stackoverflow question here.
>> >
>> > So I would like to do a local test of my pyflink sql application. The sql 
>> > query does a very simple job: read from source, window aggregation and 
>> > write result to a sink.
>> >
>> > In production, the source and sink will be Kinesis streams. But since I
>> > need to do a local test, I am mocking out the source and sink with the
>> > local filesystem.
>> >
>> > The problem is that the local job always runs successfully, but the results
>> > are never written out to the sink file. I'm curious what I am missing here.
>> >
>> > Note:
>> >
>> > - I am using Flink 1.11.0.
>> > - If I directly read data from the source table with a select query and
>> > write to the sink without windowing or grouping, it works just fine. That
>> > means the source and sink tables are set up correctly. So it seems the
>> > problem is around the tumbling and grouping for the local filesystem.
>> > - This code works fine with a Kinesis source and sink.
>> >
>> >
>> > Here is the SQL code:
>> >
>> > ```
>> >
>> > CREATE TABLE incoming_data (
>> >         requestId VARCHAR(4),
>> >         groupId VARCHAR(32),
>> >         userId VARCHAR(32),
>> >         requestStartTime VARCHAR(32),
>> >         processTime AS PROCTIME(),
>> >         requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
>> >         WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
>> >     ) WITH (
>> >         'connector' = 'filesystem',
>> >         'path' = '/path/to/test/json/file.json',
>> >         'format' = 'json',
>> >         'json.timestamp-format.standard' = 'ISO-8601'
>> >     )
>> >
>> > CREATE TABLE user_latest_request (
>> >         groupId VARCHAR(32),
>> >         userId VARCHAR(32),
>> >         latestRequestTime TIMESTAMP
>> >     ) WITH (
>> >         'connector' = 'filesystem',
>> >         'path' = '/path/to/sink',
>> >         'format' = 'csv'
>> > )
>> >
>> > INSERT INTO user_latest_request
>> >     SELECT groupId,
>> >            userId,
>> >            MAX(requestTime) as latestRequestTime
>> >     FROM incoming_data
>> >
>> >     GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
>> >
>> > ```
>> >
>> >
>> > Thanks!
