Hi Roman,

Thanks for the update and for testing this locally. This is very informative.

-Guoqin

On Mon, Nov 22, 2021 at 10:55 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Guoqin,
>
> I was able to reproduce the problem locally. I can see that at the
> time the window fires, the services are already closed because of the
> emitted MAX_WATERMARK.
>
> Previously, there were some discussions around waiting for all timers
> to complete [1], but AFAIK there was not much demand to implement it.
> Probably Piotr or Arvid know more.
>
> Please feel free to bump the ticket.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18647
>
> Regards,
> Roman
>
> On Wed, Nov 17, 2021 at 7:36 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >
> > Hi Roman,
> >
> > Thanks for the detailed explanation.
> >
> > I did try 1.13 and 1.14, but it still didn't work.
> >
> > I explicitly enabled checkpointing with `env.enable_checkpointing(10)`.
> > Are there any other configurations I need to set?
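> >
> > For completeness, my local setup looks roughly like this (a sketch;
> > the parallelism and imports are just what I use for the test):
> >
> > ```
> > from pyflink.datastream import StreamExecutionEnvironment
> > from pyflink.table import StreamTableEnvironment
> >
> > env = StreamExecutionEnvironment.get_execution_environment()
> > env.set_parallelism(1)
> > # the interval argument is in milliseconds, so this checkpoints
> > # every 10 ms
> > env.enable_checkpointing(10)
> > t_env = StreamTableEnvironment.create(env)
> > ```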
> >
> > Thanks,
> > -Guoqin
> >
> > On Wed, Nov 17, 2021 at 4:30 AM Roman Khachatryan <ro...@apache.org> wrote:
> >>
> >> Hi Guoqin,
> >>
> >> Thanks for the clarification.
> >>
> >> Processing-time windows actually don't need watermarks: they fire
> >> when the window end time comes. But the job will likely finish
> >> earlier than that because of the bounded input.
> >>
> >> Handling of this case was improved in 1.14 as part of FLIP-147 (and
> >> there were related improvements in earlier versions), so I'd suggest
> >> trying the most recent Flink version.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Nov 15, 2021 at 10:17 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >> >
> >> > Hi Roman,
> >> >
> >> > Thanks for your quick response!
> >> >
> >> > Yes, it does seem to be the window closing problem.
> >> >
> >> > So if I change the tumble window to event time, i.e. the
> >> > 'requestTime' column, it works fine. I guess the EOF of the test
> >> > data file triggers a watermark of Long.MAX_VALUE.
> >> >
> >> > But my application needs to calculate the result based on a
> >> > tumbling window over processing time. Curious what the typical
> >> > setup for such a local test is, i.e. how do we tell Flink to close
> >> > the window when the input stream hits the end?
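> >> >
> >> > For reference, the event-time variant that does work is the same
> >> > query with the window switched to requestTime (a sketch, reusing
> >> > the t_env from my setup; TableResult.wait() needs Flink 1.12+):
> >> >
> >> > ```
> >> > # Same aggregation keyed on event time: the MAX_WATERMARK emitted
> >> > # at EOF fires the window, so results are written even though the
> >> > # input is bounded.
> >> > t_env.execute_sql("""
> >> >     INSERT INTO user_latest_request
> >> >     SELECT groupId,
> >> >            userId,
> >> >            MAX(requestTime) AS latestRequestTime
> >> >     FROM incoming_data
> >> >     GROUP BY TUMBLE(requestTime, INTERVAL '1' SECOND),
> >> >              groupId, userId
> >> > """).wait()
> >> > ```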
> >> >
> >> > Thanks,
> >> > -Guoqin
> >> >
> >> > On Mon, Nov 15, 2021 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:
> >> >>
> >> >> Hi Guoqin,
> >> >>
> >> >> I think the problem might be related to watermarks and checkpointing:
> >> >> - if the file is too small, the only watermark will be the one
> >> >>   emitted after fully reading the file
> >> >> - in exactly-once mode, the sink waits for checkpoint completion
> >> >>   before committing the files
> >> >>
> >> >> Recently, there were some improvements in handling the last
> >> >> checkpoint, so you might want to try Flink 1.14. You might also try
> >> >> configuring checkpointing (e.g. decreasing the interval [1]) and
> >> >> watermark generation [2] to diagnose the problem.
> >> >>
> >> >> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#execution-checkpointing-interval
> >> >> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
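> >> >>
> >> >> For example, roughly (an untested sketch against the PyFlink Table
> >> >> API; adjust the names to your program):
> >> >>
> >> >> ```
> >> >> # [1]: shorten the checkpointing interval through the table config
> >> >> # so the exactly-once file sink commits files frequently.
> >> >> t_env.get_config().get_configuration().set_string(
> >> >>     "execution.checkpointing.interval", "1s")
> >> >>
> >> >> # [2]: watermark generation is declared in the DDL; for a tiny test
> >> >> # file, a zero delay avoids holding back the only watermark:
> >> >> #   WATERMARK FOR requestTime AS requestTime - INTERVAL '0' SECOND
> >> >> ```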
> >> >>
> >> >>
> >> >> Regards,
> >> >> Roman
> >> >>
> >> >> On Mon, Nov 15, 2021 at 6:43 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
> >> >> >
> >> >> > Hi all,
> >> >> >
> >> >> > I am reposting my Stack Overflow question here.
> >> >> >
> >> >> > So I would like to do a local test of my PyFlink SQL
> >> >> > application. The SQL query does a very simple job: read from a
> >> >> > source, do a window aggregation, and write the result to a sink.
> >> >> >
> >> >> > In production, the source and sink will be Kinesis streams. But
> >> >> > since I need to do a local test, I am mocking out the source and
> >> >> > sink with the local filesystem.
> >> >> >
> >> >> > The problem is that the local job always runs successfully, but
> >> >> > the results are never written out to the sink file. Curious what
> >> >> > I am missing here.
> >> >> >
> >> >> > Note:
> >> >> >
> >> >> > - I am using Flink 1.11.0.
> >> >> > - If I directly read data from the source table with a plain
> >> >> >   SELECT and write to the sink without any windowing or grouping,
> >> >> >   it works just fine (sketched below). That means the source and
> >> >> >   sink tables are set up correctly, so the problem seems to be
> >> >> >   around the tumbling window and grouping on the local filesystem.
> >> >> > - This code works fine with a Kinesis source and sink.
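> >> >> >
> >> >> > The windowless query that does produce output is roughly this
> >> >> > (a sketch; t_env is my StreamTableEnvironment):
> >> >> >
> >> >> > ```
> >> >> > # Plain pass-through, no TUMBLE / GROUP BY: this writes rows to
> >> >> > # the CSV sink, so the connectors themselves look fine.
> >> >> > t_env.execute_sql("""
> >> >> >     INSERT INTO user_latest_request
> >> >> >     SELECT groupId, userId, requestTime FROM incoming_data
> >> >> > """)
> >> >> > ```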
> >> >> >
> >> >> >
> >> >> > Here is the SQL code:
> >> >> >
> >> >> > ```
> >> >> >
> >> >> > CREATE TABLE incoming_data (
> >> >> >     requestId VARCHAR(4),
> >> >> >     groupId VARCHAR(32),
> >> >> >     userId VARCHAR(32),
> >> >> >     requestStartTime VARCHAR(32),
> >> >> >     processTime AS PROCTIME(),
> >> >> >     requestTime AS TO_TIMESTAMP(
> >> >> >         SUBSTR(REPLACE(requestStartTime, 'T', ' '), 0, 23),
> >> >> >         'yyyy-MM-dd HH:mm:ss.SSS'),
> >> >> >     WATERMARK FOR requestTime AS requestTime - INTERVAL '5' SECOND
> >> >> > ) WITH (
> >> >> >     'connector' = 'filesystem',
> >> >> >     'path' = '/path/to/test/json/file.json',
> >> >> >     'format' = 'json',
> >> >> >     'json.timestamp-format.standard' = 'ISO-8601'
> >> >> > );
> >> >> >
> >> >> > CREATE TABLE user_latest_request (
> >> >> >     groupId VARCHAR(32),
> >> >> >     userId VARCHAR(32),
> >> >> >     latestRequestTime TIMESTAMP
> >> >> > ) WITH (
> >> >> >     'connector' = 'filesystem',
> >> >> >     'path' = '/path/to/sink',
> >> >> >     'format' = 'csv'
> >> >> > );
> >> >> >
> >> >> > INSERT INTO user_latest_request
> >> >> > SELECT groupId,
> >> >> >        userId,
> >> >> >        MAX(requestTime) AS latestRequestTime
> >> >> > FROM incoming_data
> >> >> > GROUP BY TUMBLE(processTime, INTERVAL '1' SECOND), groupId, userId;
> >> >> >
> >> >> > ```
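> >> >> >
> >> >> > And the driver script is roughly this (a sketch; DDL_SOURCE,
> >> >> > DDL_SINK and INSERT_SQL are hypothetical names for the three
> >> >> > statements above, and TableResult.wait() needs Flink 1.12+):
> >> >> >
> >> >> > ```
> >> >> > from pyflink.datastream import StreamExecutionEnvironment
> >> >> > from pyflink.table import StreamTableEnvironment
> >> >> >
> >> >> > env = StreamExecutionEnvironment.get_execution_environment()
> >> >> > env.enable_checkpointing(10)  # milliseconds
> >> >> > t_env = StreamTableEnvironment.create(env)
> >> >> >
> >> >> > t_env.execute_sql(DDL_SOURCE)  # CREATE TABLE incoming_data ...
> >> >> > t_env.execute_sql(DDL_SINK)    # CREATE TABLE user_latest_request ...
> >> >> > # execute_sql submits the INSERT asynchronously; wait() blocks
> >> >> > # until the bounded job finishes instead of returning while it
> >> >> > # is still running.
> >> >> > t_env.execute_sql(INSERT_SQL).wait()
> >> >> > ```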
> >> >> >
> >> >> >
> >> >> > Thanks!
>
