Hi Guoqin,

I was able to reproduce the problem locally. I can see that at the
time of window firing the services are already closed because of the
emitted MAX_WATERMARK.

Previously, there were some discussions around waiting for all timers
to complete [1], but AFAIK there was not much demand to implement it.
Probably Piotr or Arvid know more.

Please feel free to bump the ticket.

[1]
https://issues.apache.org/jira/browse/FLINK-18647

Regards,
Roman

On Wed, Nov 17, 2021 at 7:36 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>
> Hi Roman,
>
> Thanks for the detailed explanation.
>
> I did try 1.13 and 1.14, but it still didn't work.
>
> I explicitly enabled the checkpoint with: `env.enable_checkpointing(10)`. Any 
> other configurations I need to set?
>
> Thanks,
> -Guoqin
>
> On Wed, Nov 17, 2021 at 4:30 AM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi Guoqin,
>>
>> Thanks for the clarification.
>>
>> Processing time windows actually don't need watermarks: they fire when
>> window end time comes.
>> But the job will likely finish earlier because of the bounded input.
>>
>> Handling of this case was improved in 1.14 as part of FLIP-147, as
>> well as in previous versions.
>> So I'd suggest trying the most recent Flink version.
>>
>> Regards,
>> Roman
>>
>> On Mon, Nov 15, 2021 at 10:17 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>> >
>> > Hi Roman,
>> >
>> > Thanks for your quick response!
>> >
>> > Yes, it does seem to be the window closing problem.
>> >
>> > So if I change the tumble window on eventTime, which is column 
>> > 'requestTime', it works fine. I guess the EOF of the test data file kicks 
>> > in a watermark of Long.MAX_VALUE.
>> >
>> > But my application code needs to calculate the result based on the tumble 
>> > window of processTime. Curious what is the typical setup for such a local 
>> > test. ie. How do we inform the Flink to close the window if the input 
>> > stream hits the end.
>> >
>> > Thanks,
>> > -Guoqin
>> >
>> > On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> 
>> > wrote:
>> >>
>> >> Hi Guoqin,
>> >>
>> >> I think the problem might be related to watermarks and checkpointing:
>> >> - if the file is too small, the only watermark will be the one after
>> >> fully reading the file
>> >> - in exactly once mode, sink waits for a checkpoint completion before
>> >> committing the files
>> >>
>> >> Recently, there were some improvements in handling the last
>> >> checkpoint, so you might want to try Flink 1.14.
>> >> You might also try to configure checkpointing (e.g. decrease interval
>> >> [1]) and watermark generation [2] to diagnose the problem.
>> >>
>> >> [1] 
>> >> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
>> >> [2] 
>> >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
>> >>
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com> 
>> >> wrote:
>> >> >
>> >> > Hi all,
>> >> >
>> >> > I am reposting my stackoverflow question here.
>> >> >
>> >> > So I would like to do a local test of my pyflink sql application. The 
>> >> > sql query does a very simple job: read from source, window aggregation 
>> >> > and write result to a sink.
>> >> >
>> >> > In the production, the source and sink will be kinesis stream. But 
>> >> > since I need to do a local test, I am mocking out the source and sink 
>> >> > with the local filesystem.
>> >> >
>> >> > The problem is the local job always runs successfully, but the results 
>> >> > were never written out to the sink file. Curious what I am missing here.
>> >> >
>> >> > Note:
>> >> >
>> >> > I am using Flink 1.11.0
>> >> > if I directly read data from the source table by select query and write 
>> >> > to sink without windowing or grouping, it works just fine. That means 
>> >> > the source and sink table is set up correctly. So it seems the problem 
>> >> > is around the Tumbling and grouping for the local filesystem.
>> >> > This code works fine with Kinesis source and sink.
>> >> >
>> >> >
>> >> > Here is the SQL code:
>> >> >
>> >> > ```
>> >> >
>> >> > CREATE TABLE incoming_data (
>> >> >         requestId VARCHAR(4),
>> >> >         groupId VARCHAR(32),
>> >> >         userId VARCHAR(32),
>> >> >         requestStartTime VARCHAR(32),
>> >> >         processTime AS PROCTIME(),
>> >> >         requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 
>> >> > 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
>> >> >         WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
>> >> >     ) WITH (
>> >> >         'connector' = 'filesystem',
>> >> >         'path' = '/path/to/test/json/file.json',
>> >> >         'format' = 'json',
>> >> >         'json.timestamp-format.standard' = 'ISO-8601'
>> >> >     )
>> >> >
>> >> > CREATE TABLE user_latest_request (
>> >> >         groupId VARCHAR(32),
>> >> >         userId VARCHAR(32),
>> >> >         latestRequestTime TIMESTAMP
>> >> >     ) WITH (
>> >> >         'connector' = 'filesystem',
>> >> >         'path' = '/path/to/sink',
>> >> >         'format' = 'csv'
>> >> > )
>> >> >
>> >> > INSERT INTO user_latest_request
>> >> >     SELECT groupId,
>> >> >            userId,
>> >> >            MAX(requestTime) as latestRequestTime
>> >> >     FROM incoming_data
>> >> >
>> >> >     GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
>> >> >
>> >> > ```
>> >> >
>> >> >
>> >> > Thanks!

Reply via email to