Hi Austin,

if you don't want to worry about time at all, you should probably not use any windows, since windows are a time-based operation.

A solution that would look a bit nicer could be to use a plain KeyedProcessFunction and implement the deduplication logic yourself instead of using windows. In a ProcessFunction you can register an event-time timer. The timer would be triggered by the MAX_WATERMARK when the pipeline shuts down, even without a timestamp assigned in the StreamRecord. As far as I know, the watermark will leave SQL even without a time attribute.
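To illustrate the idea outside of Flink, here is a minimal plain-Python sketch of that deduplication logic. The `deduplicate` helper and its shape are my own invention for illustration, standing in for the keyed state plus the end-of-input timer that the real KeyedProcessFunction would use:

```python
def deduplicate(records, key_fn):
    """Model of per-key dedup as it would run in a KeyedProcessFunction.

    Emits only the first record seen for each key. The 'seen' set stands
    in for a per-key ValueState; clearing it at the end models the
    event-time timer that fires at MAX_WATERMARK when the bounded input
    ends.
    """
    seen = set()   # per-key "seen" flag (keyed ValueState in Flink)
    out = []
    for record in records:
        key = key_fn(record)
        if key not in seen:
            seen.add(key)
            out.append(record)
    # end of input: the MAX_WATERMARK timer fires and state is cleared
    seen.clear()
    return out

rows = [("a", 1), ("b", 2), ("a", 3)]
print(deduplicate(rows, key_fn=lambda r: r[0]))  # first record per key only
```

In the real job, the state and timer registration would of course go through `getRuntimeContext().getState(...)` and `ctx.timerService().registerEventTimeTimer(...)`.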

Regards,
Timo


On 08.10.20 17:38, Austin Cawley-Edwards wrote:
Hey Timo,

Sorry for the delayed reply. I'm using the Blink planner with non-time-based joins. I've got an example repo here that shows my query/setup [1]. It's got the manual timestamp assignment commented out for now, but that does indeed solve the issue.

I'd really like to not worry about time at all in this job, hah -- I started out just using processing time, but Till pointed out that processing-time timers won't fire when input ends, which is the case for this streaming job processing CSV files, so I should be using event time. With that suggestion, I switched to ingestion time, where I then discovered the issue converting from SQL to a data stream.

IMO, as a user, manually assigning timestamps on conversion makes sense if you're using event time and already handling time attributes yourself, but with ingestion time you really don't want to think about time at all, which is why it might make sense to propagate the automatically assigned timestamps in that case. Though I'm not sure how difficult that would be. Let me know what you think!
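For what it's worth, here's a small plain-Python model of the behavior I mean (the names `ingest` and `to_retract_stream` are just illustrative here, not Flink API): the source stamps each record on arrival, and the conversion either carries that timestamp through or drops it, which corresponds to the `Long.MIN_VALUE` marker I'm seeing:

```python
import time

def ingest(rows):
    # ingestion time: the source stamps each record on arrival
    return [{"ts": time.time(), "row": r} for r in rows]

def to_retract_stream(stamped, propagate_timestamps):
    # Models the Table -> DataStream conversion: today the StreamRecord
    # timestamp is dropped (None here, Long.MIN_VALUE in Flink) unless a
    # time attribute survives the query. The proposal is to carry the
    # automatically assigned ingestion timestamp through instead.
    if propagate_timestamps:
        return [(rec["ts"], rec["row"]) for rec in stamped]
    return [(None, rec["row"]) for rec in stamped]

stamped = ingest(["r1", "r2"])
converted = to_retract_stream(stamped, propagate_timestamps=True)
```

With `propagate_timestamps=True` every converted record keeps a real timestamp, so downstream event-time operators would still fire.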


Best + thanks again,
Austin

[1]: https://github.com/austince/flink-1.10-sql-windowing-error

On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <twal...@apache.org> wrote:

    Btw which planner are you using?

    Regards,
    Timo

    On 05.10.20 10:23, Timo Walther wrote:
     > Hi Austin,
     >
     > could you share some details of your SQL query with us? The reason
     > why I'm asking is because I guess that the rowtime field is not
     > inserted into the `StreamRecord` of DataStream API. The rowtime field
     > is only inserted if there is a single field in the output of the
     > query that is a valid "time attribute".
     >
     > Esp. after non-time-based joins and aggregations, time attributes
     > lose their properties and become regular timestamps, because
     > timestamps and watermarks might have diverged.
     >
     > If you know what you're doing, you can also assign the timestamp
     > manually after `toRetractStream` via `assignTimestampsAndWatermarks`
     > and reinsert the field into the stream record. But before you do
     > that, I think it is better to share more information about the query
     > with us.
     >
     > I hope this helps.
     >
     > Regards,
     > Timo
     >
     >
     >
     > On 05.10.20 09:25, Till Rohrmann wrote:
      >> Hi Austin,
      >>
      >> thanks for offering to help. First I would suggest asking Timo
      >> whether this is an aspect which is still missing or whether we
      >> overlooked it. Based on that we can then take the next steps.
      >>
      >> Cheers,
      >> Till
      >>
      >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
      >> <austin.caw...@gmail.com> wrote:
     >>
     >>     Hey Till,
     >>
      >>     Thanks for the notes. Yeah, the docs don't mention anything
      >>     specific to this case; not sure if it's an uncommon one.
      >>     Assigning timestamps on conversion does solve the issue. I'm
      >>     happy to take a stab at implementing the feature if it is
      >>     indeed missing and you all think it'd be worthwhile. I think
      >>     it's definitely a confusing aspect of working w/ the Table &
      >>     DataStream APIs together.
      >>
      >>     Best,
      >>     Austin
      >>
      >>     On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
      >>     <trohrm...@apache.org> wrote:
     >>
     >>         Hi Austin,
     >>
     >>         yes, it should also work for ingestion time.
     >>
      >>         I am not entirely sure whether event time is preserved when
      >>         converting a Table into a retract stream. It should be
      >>         possible, and if it is not working, then I guess it is a
      >>         missing feature. But I am sure that @Timo Walther
      >>         <twal...@apache.org> knows more about it. In doubt, you
      >>         could assign a new watermark generator when having obtained
      >>         the retract stream.
      >>
      >>         Here is also a link to some information about event time
      >>         and watermarks [1]. Unfortunately, it does not state
      >>         anything about the direction Table => DataStream.
      >>
      >>         [1]
      >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
      >>
      >>         Cheers,
      >>         Till
      >>
      >>         On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
      >>         <austin.caw...@gmail.com> wrote:
     >>
     >>             Hey Till,
     >>
      >>             Just a quick question on time characteristics -- this
      >>             should work for IngestionTime as well, correct? Is
      >>             there anything special I need to do to have the
      >>             CsvTableSource / toRetractStream call carry through the
      >>             assigned timestamps, or do I have to re-assign
      >>             timestamps during the conversion? I'm currently getting
      >>             the `Record has Long.MIN_VALUE timestamp (= no
      >>             timestamp marker)` error, though I'm seeing timestamps
      >>             being assigned if I step through the
      >>             AutomaticWatermarkContext.
      >>
      >>             Thanks,
      >>             Austin
      >>
      >>             On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards
      >>             <austin.caw...@gmail.com> wrote:
     >>
     >>                 Perfect, thanks so much Till!
     >>
      >>                 On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann
      >>                 <trohrm...@apache.org> wrote:
     >>
     >>                     Hi Austin,
     >>
      >>                     I believe that the problem is the processing
      >>                     time window. Unlike for event time, where we
      >>                     send a MAX_WATERMARK at the end of the stream
      >>                     to trigger all remaining windows, this does not
      >>                     happen for processing time windows. Hence, if
      >>                     your stream ends and you still have an open
      >>                     processing time window, then it will never get
      >>                     triggered.
      >>
      >>                     The problem should disappear if you use event
      >>                     time or if you process unbounded streams which
      >>                     never end.
      >>
      >>                     Cheers,
      >>                     Till
      >>
      >>                     On Thu, Oct 1, 2020 at 12:01 AM Austin
      >>                     Cawley-Edwards <austin.caw...@gmail.com> wrote:
     >>
     >>                         Hey all,
     >>
      >>                         Thanks for your patience. I've got a small
      >>                         repo that reproduces the issue here:
      >> https://github.com/austince/flink-1.10-sql-windowing-error
      >>
      >>                         Not sure what I'm doing wrong but it feels
      >>                         silly.
      >>
      >>                         Thanks so much!
      >>                         Austin
      >>
      >>                         On Tue, Sep 29, 2020 at 3:48 PM Austin
      >>                         Cawley-Edwards <austin.caw...@gmail.com>
      >>                         wrote:
     >>
     >>                             Hey Till,
     >>
      >>                             Thanks for the reply -- I'll try to
      >>                             see if I can reproduce this in a small
      >>                             repo and share it with you.
      >>
      >>                             Best,
      >>                             Austin
      >>
      >>                             On Tue, Sep 29, 2020 at 3:58 AM Till
      >>                             Rohrmann <trohrm...@apache.org> wrote:
     >>
     >>                                 Hi Austin,
     >>
      >>                                 could you share with us the exact
      >>                                 job you are running (including the
      >>                                 custom window trigger)? This would
      >>                                 help us to better understand your
      >>                                 problem.
      >>
      >>                                 I am also pulling in Klou and
      >>                                 Timo, who might help with the
      >>                                 windowing logic and the Table to
      >>                                 DataStream conversion.
      >>
      >>                                 Cheers,
      >>                                 Till
      >>
      >>                                 On Mon, Sep 28, 2020 at 11:49 PM
      >>                                 Austin Cawley-Edwards
      >>                                 <austin.caw...@gmail.com> wrote:
     >>
     >>                                     Hey all,
     >>
      >>                                     I'm not sure if I've missed
      >>                                     something in the docs, but I'm
      >>                                     having a bit of trouble with a
      >>                                     streaming SQL job that starts
      >>                                     w/ raw SQL queries and then
      >>                                     transitions to a more
      >>                                     traditional streaming job. I'm
      >>                                     on Flink 1.10 using the Blink
      >>                                     planner, running locally with
      >>                                     no checkpointing.
      >>
      >>                                     The job looks roughly like:
      >>
      >>                                     CSV 1 -->
      >>                                     CSV 2 -->  SQL Query to Join -->
      >>                                     toRetractStream --> keyed time
      >>                                     window w/ process func & custom
      >>                                     trigger --> some other ops
      >>                                     CSV 3 -->
      >>
      >>                                     When I remove the windowing
      >>                                     directly after the
      >>                                     `toRetractStream`, the records
      >>                                     make it to the "some other ops"
      >>                                     stage, but with the windowing,
      >>                                     those operations are sometimes
      >>                                     not sent any data. I can also
      >>                                     get data sent to the downstream
      >>                                     operators by putting in a no-op
      >>                                     map before the window and
      >>                                     placing some breakpoints in
      >>                                     there to manually slow down
      >>                                     processing.
     >>
     >>
      >>                                     The logs don't seem to
      >>                                     indicate anything went wrong
      >>                                     and generally look like:
      >>
      >> 4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to FINISHED.
      >> 4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
      >> 4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) [FINISHED]
      >> 4820 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
      >> ...
      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched from RUNNING to FINISHED.
      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
      >> ...
      >> rest of the shutdown
      >> ...
      >> Program execution finished
      >> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
      >> Job Runtime: 783 ms
     >>
      >>                                     Is there something I'm missing
      >>                                     in my setup? Could it be my
      >>                                     custom window trigger? Bug?
      >>                                     I'm stumped.
      >>
      >>                                     Thanks,
      >>                                     Austin
     >>
     >>
     >>
     >>
     >>
     >>
     >>
     >>
     >>
     >>
     >>
     >>
     >