Hi Roman,

Thanks for your quick response!

Yes, it does seem to be the window-closing problem.

So if I change the tumble window to event time, i.e. the 'requestTime'
column, it works fine. I guess hitting EOF on the test data file
triggers a final watermark of Long.MAX_VALUE.

But my application needs to compute its result over a tumble window on
processing time. Curious what the typical setup is for such a local
test, i.e. how do we tell Flink to close the window when the input
stream hits its end?
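
For reference, here is roughly what the working event-time variant looks
like in my local test (just a sketch — SOURCE_DDL and SINK_DDL are
placeholder names for the two CREATE TABLE statements in my original
message below, and TableResult.wait() needs Flink 1.12+):

```
from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming-mode environment for the local test.
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Placeholders for the two CREATE TABLE statements quoted below.
t_env.execute_sql(SOURCE_DDL)
t_env.execute_sql(SINK_DDL)

# Tumbling on the event-time column requestTime works: hitting EOF on
# the file source emits a final Long.MAX_VALUE watermark, which closes
# all open event-time windows so the sink can flush.
t_env.execute_sql("""
    INSERT INTO user_latest_request
    SELECT groupId, userId, MAX(requestTime) AS latestRequestTime
    FROM incoming_data
    GROUP BY TUMBLE(requestTime, INTERVAL '1' SECOND), groupId, userId
""").wait()
```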

Thanks,
-Guoqin

On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Guoqin,
>
> I think the problem might be related to watermarks and checkpointing:
> - if the file is too small, the only watermark will be the one after
> fully reading the file
> - in exactly-once mode, the sink waits for checkpoint completion before
> committing the files
>
> Recently, there were some improvements in handling the last
> checkpoint, so you might want to try Flink 1.14.
> You might also try to configure checkpointing (e.g. decrease interval
> [1]) and watermark generation [2] to diagnose the problem.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
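>
> For example, a minimal sketch of shortening the interval from PyFlink
> while diagnosing (this assumes a TableEnvironment named t_env; the
> 1-second value is arbitrary):
>
> ```
> # A short checkpoint interval lets the exactly-once file sink commit
> # shortly after each checkpoint instead of only at the end of input.
> t_env.get_config().get_configuration().set_string(
>     "execution.checkpointing.interval", "1 s")
> ```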
>
>
> Regards,
> Roman
>
> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com>
> wrote:
> >
> > Hi all,
> >
> > I am reposting my Stack Overflow question here.
> >
> > I would like to run a local test of my PyFlink SQL application. The
> > SQL query does a very simple job: read from a source, do a windowed
> > aggregation, and write the result to a sink.
> >
> > In production, the source and sink will be Kinesis streams. But since
> > I need a local test, I am mocking out the source and sink with the
> > local filesystem.
> >
> > The problem is that the local job always completes successfully, but
> > the results are never written to the sink file. Curious what I am
> > missing here.
> >
> > Note:
> >
> > - I am using Flink 1.11.0.
> > - If I read directly from the source table with a plain SELECT and
> > write to the sink, without any windowing or grouping, it works just
> > fine. That means the source and sink tables are set up correctly. So
> > the problem seems to be around the tumbling and grouping with the
> > local filesystem.
> > - This code works fine with a Kinesis source and sink.
> >
> >
> > Here is the SQL code:
> >
> > ```
> >
> > CREATE TABLE incoming_data (
> >         requestId VARCHAR(4),
> >         groupId VARCHAR(32),
> >         userId VARCHAR(32),
> >         requestStartTime VARCHAR(32),
> >         processTime AS PROCTIME(),
> >         requestTime AS TO_TIMESTAMP(SUBSTR(REPLACE(requestStartTime,
> 'T', ' '), 0, 23), 'yyyy-MM-dd HH:mm:ss.SSS'),
> >         WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
> >     ) WITH (
> >         'connector' = 'filesystem',
> >         'path' = '/path/to/test/json/file.json',
> >         'format' = 'json',
> >         'json.timestamp-format.standard' = 'ISO-8601'
> >     )
> >
> > CREATE TABLE user_latest_request (
> >         groupId VARCHAR(32),
> >         userId VARCHAR(32),
> >         latestRequestTime TIMESTAMP
> >     ) WITH (
> >         'connector' = 'filesystem',
> >         'path' = '/path/to/sink',
> >         'format' = 'csv'
> > )
> >
> > INSERT INTO user_latest_request
> >     SELECT groupId,
> >            userId,
> >            MAX(requestTime) as latestRequestTime
> >     FROM incoming_data
> >     GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
> >
> > ```
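> >
> > For reference, here is the kind of input line the TO_TIMESTAMP
> > expression above expects, one JSON object per line in the source file
> > (the values are made up):
> >
> > ```
> > {"requestId": "r001", "groupId": "g1", "userId": "u1", "requestStartTime": "2021-11-15T10:00:00.123Z"}
> > ```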
> >
> >
> > Thanks!
>
