Hi Roman,

Thanks for the detailed explanation.

I did try 1.13 and 1.14, but it still didn't work.

I explicitly enabled checkpointing with
`env.enable_checkpointing(10)`. Are there any other configurations I need to set?
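
For reference, a minimal sketch of the settings that seem relevant here. The interval key is the one from link [1] in the thread. The second key is, as far as I know, the FLIP-147 switch in 1.14 that allows checkpoints after some tasks have finished, and it is off by default in that release. `apply_settings` is a hypothetical helper, and `t_env` in the usage comment is an assumed PyFlink `TableEnvironment`. Whether these settings are enough for a processing-time window on a bounded file source is not confirmed anywhere in this thread.

```python
# Settings are kept as plain strings so the sketch runs without PyFlink installed.
CHECKPOINT_SETTINGS = {
    # Same interval as env.enable_checkpointing(10), which takes milliseconds.
    "execution.checkpointing.interval": "10ms",
    # Assumption: FLIP-147 option in Flink 1.14, disabled by default there.
    "execution.checkpointing.checkpoints-after-tasks-finish.enabled": "true",
}


def apply_settings(configuration, settings=CHECKPOINT_SETTINGS):
    """Copy each key/value into any object exposing set_string(key, value)."""
    for key, value in settings.items():
        configuration.set_string(key, value)
    return configuration


# Hypothetical usage with PyFlink (t_env is an assumed TableEnvironment):
# apply_settings(t_env.get_config().get_configuration())
```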

Thanks,
-Guoqin

On Wed, Nov 17, 2021 at 4:30 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Guoqin,
>
> Thanks for the clarification.
>
> Processing time windows actually don't need watermarks: they fire when
> window end time comes.
> But the job will likely finish earlier because of the bounded input.
>
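
To make the point above concrete, here is a toy model in plain Python (not Flink code). It assumes that a processing-time window fires only once the wall clock reaches the window end, and that windows still pending when the bounded job finishes are simply discarded. The names `window_end` and `run_bounded_job` and all timings are made up for illustration.

```python
WINDOW_MS = 1000  # mirrors TUMBLE(processTime, INTERVAL '1' SECOND)


def window_end(ts_ms, size_ms=WINDOW_MS):
    """Exclusive end of the tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms + size_ms


def run_bounded_job(arrival_times_ms, job_end_ms, size_ms=WINDOW_MS):
    """Split windows into those that fired before the job ended and those
    that were still pending (and are therefore lost in this toy model)."""
    counts = {}
    for ts in arrival_times_ms:
        end = window_end(ts, size_ms)
        counts[end] = counts.get(end, 0) + 1
    # A processing-time timer fires only when the wall clock reaches the window end.
    fired = sorted((end, n) for end, n in counts.items() if end <= job_end_ms)
    pending = sorted((end, n) for end, n in counts.items() if end > job_end_ms)
    return fired, pending


if __name__ == "__main__":
    # Three records read within 20 ms; the job finishes at 50 ms.
    print(run_bounded_job([5, 12, 20], job_end_ms=50))
    # Same records, but the job stays alive past the window end.
    print(run_bounded_job([5, 12, 20], job_end_ms=1500))
```

In the first call the only window is still pending when the job ends, so nothing would reach the sink. In the second call the job outlives the window end at 1000 ms and the window fires.
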
> Handling of this case was improved in 1.14 as part of FLIP-147, as
> well as in previous versions.
> So I'd suggest trying the most recent Flink version.
>
> Regards,
> Roman
>
> On Mon, Nov 15, 2021 at 10:17 PM Guoqin Zheng <lanson.zh...@gmail.com>
> wrote:
> >
> > Hi Roman,
> >
> > Thanks for your quick response!
> >
> > Yes, it does seem to be the window closing problem.
> >
> > If I change the tumble window to use event time, i.e. the column
> > 'requestTime', it works fine. I guess the EOF of the test data file
> > triggers a watermark of Long.MAX_VALUE.
> >
> > But my application code needs to calculate the result based on a tumble
> > window over processTime. I am curious what the typical setup for such a
> > local test is, i.e. how do we tell Flink to close the window when the
> > input stream hits the end?
> >
> > Thanks,
> > -Guoqin
> >
> > On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org>
> wrote:
> >>
> >> Hi Guoqin,
> >>
> >> I think the problem might be related to watermarks and checkpointing:
> >> - if the file is too small, the only watermark will be the one after
> >> fully reading the file
> >> - in exactly once mode, sink waits for a checkpoint completion before
> >> committing the files
> >>
> >> Recently, there were some improvements in handling the last
> >> checkpoint, so you might want to try Flink 1.14.
> >> You might also try to configure checkpointing (e.g. decrease interval
> >> [1]) and watermark generation [2] to diagnose the problem.
> >>
> >> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
> >> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
> >>
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com>
> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > I am reposting my Stack Overflow question here.
> >> >
> >> > I would like to do a local test of my PyFlink SQL application. The SQL
> >> > query does a very simple job: read from a source, do a window
> >> > aggregation, and write the result to a sink.
> >> >
> >> > In production, the source and sink will be Kinesis streams. But since I
> >> > need to do a local test, I am mocking out the source and sink with the
> >> > local filesystem.
> >> >
> >> > The problem is that the local job always runs successfully, but the
> >> > results are never written out to the sink file. I am curious what I am
> >> > missing here.
> >> >
> >> > Note:
> >> >
> >> > - I am using Flink 1.11.0.
> >> > - If I read data directly from the source table with a SELECT query and
> >> >   write it to the sink without windowing or grouping, it works just fine.
> >> >   That means the source and sink tables are set up correctly, so the
> >> >   problem seems to be around the tumbling window and grouping with the
> >> >   local filesystem.
> >> > - This code works fine with a Kinesis source and sink.
> >> >
> >> >
> >> > Here is the SQL code:
> >> >
> >> > ```
> >> >
> >> > CREATE TABLE incoming_data (
> >> >         requestId VARCHAR(4),
> >> >         groupId VARCHAR(32),
> >> >         userId VARCHAR(32),
> >> >         requestStartTime VARCHAR(32),
> >> >         processTime AS PROCTIME(),
> >> >         requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
> >> >         WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
> >> >     ) WITH (
> >> >         'connector' = 'filesystem',
> >> >         'path' = '/path/to/test/json/file.json',
> >> >         'format' = 'json',
> >> >         'json.timestamp-format.standard' = 'ISO-8601'
> >> >     )
> >> >
> >> > CREATE TABLE user_latest_request (
> >> >         groupId VARCHAR(32),
> >> >         userId VARCHAR(32),
> >> >         latestRequestTime TIMESTAMP
> >> >     ) WITH (
> >> >         'connector' = 'filesystem',
> >> >         'path' = '/path/to/sink',
> >> >         'format' = 'csv'
> >> > )
> >> >
> >> > INSERT INTO user_latest_request
> >> >     SELECT groupId,
> >> >            userId,
> >> >            MAX(requestTime) as latestRequestTime
> >> >     FROM incoming_data
> >> >
> >> >     GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
> >> >
> >> > ```
> >> >
> >> >
> >> > Thanks!
>
