Hey Timo,

Hah, that's a fair point about using time. I guess I should update my
statement to "as a user, I don't want to worry about *manually managing*
time".

That's a nice suggestion with the KeyedProcessFunction and no windows, I'll
give that a shot. If I don't want to emit any duplicates, I'd have to
essentially buffer the "last seen duplicate" for each key in that process
function until the MAX_WATERMARK is sent through though, right? I could
emit early results if I assume the max number of possible duplicates, but
for records with no duplicates, I'd have to wait until no more records are
coming -- am I missing something?
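
To make sure I'm picturing the same logic, here's a rough, Flink-free sketch of the per-key buffering I have in mind (the class and method names are just made up for illustration -- in the real job this would live in keyed state inside the KeyedProcessFunction, with the flush happening in the timer callback fired by MAX_WATERMARK):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-key dedup buffering (not real Flink API):
// keep only the last record seen for each key, and emit nothing until the
// input is exhausted -- the stand-in here for the MAX_WATERMARK firing the
// event-time timer.
class DedupBuffer {
    private final Map<String, String> lastSeenPerKey = new LinkedHashMap<>();

    // Called once per incoming record; a later duplicate simply
    // overwrites the earlier one, so nothing is emitted early.
    void process(String key, String record) {
        lastSeenPerKey.put(key, record);
    }

    // Called when no more records can arrive; emits exactly one
    // record per key, including keys that never had a duplicate.
    List<String> flush() {
        return new ArrayList<>(lastSeenPerKey.values());
    }
}
```

So for a key with no duplicates, its single record still sits in the buffer
until flush() -- which is exactly the "wait until no more records are coming"
part I was asking about.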

Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther <twal...@apache.org> wrote:

> Hi Austin,
>
> if you don't want to worry about time at all, you should probably not
> use any windows, because windows are time-based operations.
>
> A somewhat nicer solution would be to use a pure
> KeyedProcessFunction and implement the deduplication logic without
> using windows. In a ProcessFunction you can register an event-time
> timer. The timer would be triggered by the MAX_WATERMARK when the
> pipeline shuts down, even without a timestamp assigned in the
> StreamRecord. As far as I know, the watermark will also leave the SQL
> part of the pipeline even without a time attribute.
>
> Regards,
> Timo
>
>
> On 08.10.20 17:38, Austin Cawley-Edwards wrote:
> > Hey Timo,
> >
> > Sorry for the delayed reply. I'm using the Blink planner and using
> > non-time-based joins. I've got an example repo here that shows my query/
> > setup [1]. It's got the manual timestamp assignment commented out for
> > now, but that does indeed solve the issue.
> >
> > I'd really like to not worry about time at all in this job hah -- I
> > started just using processing time, but Till pointed out that processing
> > time timers won't be fired when input ends, which is the case for this
> > streaming job processing CSV files, so I should be using event time.
> > With that suggestion, I switched to ingestion time, where I then
> > discovered the issue converting from SQL to data stream.
> >
> > IMO, as a user, manually assigning timestamps on conversion makes sense
> > if you're using event time and already handling time attributes
> > yourself, but for ingestion time you really don't want to think about
> > time at all, which is why it might make sense to propagate the
> > automatically assigned timestamps in that case. Though not sure how
> > difficult that would be. Let me know what you think!
> >
> >
> > Best + thanks again,
> > Austin
> >
> > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
> >
> > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <twal...@apache.org
> > <mailto:twal...@apache.org>> wrote:
> >
> >     Btw which planner are you using?
> >
> >     Regards,
> >     Timo
> >
> >     On 05.10.20 10:23, Timo Walther wrote:
> >      > Hi Austin,
> >      >
> >      > could you share some details of your SQL query with us? The
> >     reason why
> >      > I'm asking is because I guess that the rowtime field is not
> inserted
> >      > into the `StreamRecord` of DataStream API. The rowtime field is
> only
> >      > inserted if there is a single field in the output of the query
> >     that is a
> >      > valid "time attribute".
> >      >
> >      > Esp. after non-time-based joins and aggregations, time attributes
> >      > lose their properties and become regular timestamps, because
> >      > timestamps and watermarks might have diverged.
> >      >
> >      > If you know what you're doing, you can also assign the timestamp
> >      > manually via `assignTimestampsAndWatermarks` after `toRetractStream`
> >      > and reinsert the field into the stream record. But before you do
> >      > that, I think it is better to share more information about the
> >      > query with us.
> >      >
> >      > I hope this helps.
> >      >
> >      > Regards,
> >      > Timo
> >      >
> >      >
> >      >
> >      > On 05.10.20 09:25, Till Rohrmann wrote:
> >      >> Hi Austin,
> >      >>
> >      >> thanks for offering to help. First I would suggest asking Timo
> >     whether
> >      >> this is an aspect which is still missing or whether we
> >     overlooked it.
> >      >> Based on that we can then take the next steps.
> >      >>
> >      >> Cheers,
> >      >> Till
> >      >>
> >      >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
> >      >> <austin.caw...@gmail.com <mailto:austin.caw...@gmail.com>
> >     <mailto:austin.caw...@gmail.com <mailto:austin.caw...@gmail.com>>>
> >     wrote:
> >      >>
> >      >>     Hey Till,
> >      >>
> >      >>     Thanks for the notes. Yeah, the docs don't mention anything
> >     specific
> >      >>     to this case, not sure if it's an uncommon one. Assigning
> >     timestamps
> >      >>     on conversion does solve the issue. I'm happy to take a stab
> at
> >      >>     implementing the feature if it is indeed missing and you all
> >     think
> >      >>     it'd be worthwhile. I think it's definitely a confusing
> >     aspect of
> >      >>     working w/ the Table & DataStream APIs together.
> >      >>
> >      >>     Best,
> >      >>     Austin
> >      >>
> >      >>     On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
> >     <trohrm...@apache.org <mailto:trohrm...@apache.org>
> >      >>     <mailto:trohrm...@apache.org <mailto:trohrm...@apache.org>>>
> >     wrote:
> >      >>
> >      >>         Hi Austin,
> >      >>
> >      >>         yes, it should also work for ingestion time.
> >      >>
> >      >>         I am not entirely sure whether event time is preserved
> when
> >      >>         converting a Table into a retract stream. It should be
> >     possible
> >      >>         and if it is not working, then I guess it is a missing
> >     feature.
> >      >>         But I am sure that @Timo Walther
> >      >>         <mailto:twal...@apache.org
> >     <mailto:twal...@apache.org>> knows more about it. In doubt, you
> >      >>         could assign a new watermark generator when having
> >     obtained the
> >      >>         retract stream.
> >      >>
> >      >>         Here is also a link to some information about event time
> and
> >      >>         watermarks [1]. Unfortunately, it does not state
> >     anything about
> >      >>         the direction Table => DataStream.
> >      >>
> >      >>         [1]
> >      >>
> >      >>
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
> >
> >      >>
> >      >>
> >      >>         Cheers,
> >      >>         Till
> >      >>
> >      >>         On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
> >      >>         <austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com> <mailto:austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com>>> wrote:
> >      >>
> >      >>             Hey Till,
> >      >>
> >      >>             Just a quick question on time characteristics --
> >     this should
> >      >>             work for IngestionTime as well, correct? Is there
> >     anything
> >      >>             special I need to do to have the CsvTableSource/
> >      >>             toRetractStream call to carry through the assigned
> >      >>             timestamps, or do I have to re-assign timestamps
> >     during the
> >      >>             conversion? I'm currently getting the `Record has
> >      >>             Long.MIN_VALUE timestamp (= no timestamp marker)`
> error,
> >      >>             though I'm seeing timestamps being assigned if I step
> >      >>             through the AutomaticWatermarkContext.
> >      >>
> >      >>             Thanks,
> >      >>             Austin
> >      >>
> >      >>             On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards
> >      >>             <austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com> <mailto:austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com>>>
> >      >>             wrote:
> >      >>
> >      >>                 Perfect, thanks so much Till!
> >      >>
> >      >>                 On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann
> >      >>                 <trohrm...@apache.org
> >     <mailto:trohrm...@apache.org> <mailto:trohrm...@apache.org
> >     <mailto:trohrm...@apache.org>>>
> >      >> wrote:
> >      >>
> >      >>                     Hi Austin,
> >      >>
> >      >>                     I believe that the problem is the processing
> >     time
> >      >>                     window. Unlike for event time where we send a
> >      >>                     MAX_WATERMARK at the end of the stream to
> >     trigger
> >      >>                     all remaining windows, this does not happen
> for
> >      >>                     processing time windows. Hence, if your
> >     stream ends
> >      >>                     and you still have an open processing time
> >     window,
> >      >>                     then it will never get triggered.
> >      >>
> >      >>                     The problem should disappear if you use
> >     event time
> >      >>                     or if you process unbounded streams which
> >     never end.
> >      >>
> >      >>                     Cheers,
> >      >>                     Till
> >      >>
> >      >>                     On Thu, Oct 1, 2020 at 12:01 AM Austin
> >      >>                     Cawley-Edwards <austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com>
> >      >>                     <mailto:austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com>>> wrote:
> >      >>
> >      >>                         Hey all,
> >      >>
> >      >>                         Thanks for your patience. I've got a
> >     small repo
> >      >>                         that reproduces the issue here:
> >      >>
> >      >> https://github.com/austince/flink-1.10-sql-windowing-error
> >      >>
> >      >>
> >      >>                         Not sure what I'm doing wrong but it
> >     feels silly.
> >      >>
> >      >>                         Thanks so much!
> >      >>                         Austin
> >      >>
> >      >>                         On Tue, Sep 29, 2020 at 3:48 PM Austin
> >      >>                         Cawley-Edwards <austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com>
> >      >>                         <mailto:austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com>>> wrote:
> >      >>
> >      >>                             Hey Till,
> >      >>
> >      >>                             Thanks for the reply -- I'll try to
> >     see if I
> >      >>                             can reproduce this in a small repo
> >     and share
> >      >>                             it with you.
> >      >>
> >      >>                             Best,
> >      >>                             Austin
> >      >>
> >      >>                             On Tue, Sep 29, 2020 at 3:58 AM Till
> >      >>                             Rohrmann <trohrm...@apache.org
> >     <mailto:trohrm...@apache.org>
> >      >>                             <mailto:trohrm...@apache.org
> >     <mailto:trohrm...@apache.org>>> wrote:
> >      >>
> >      >>                                 Hi Austin,
> >      >>
> >      >>                                 could you share with us the
> >     exact job
> >      >>                                 you are running (including the
> >     custom
> >      >>                                 window trigger)? This would help
> >     us to
> >      >>                                 better understand your problem.
> >      >>
> >      >>                                 I am also pulling in Klou and
> >     Timo who
> >      >>                                 might help with the windowing
> >     logic and
> >      >>                                 the Table to DataStream
> conversion.
> >      >>
> >      >>                                 Cheers,
> >      >>                                 Till
> >      >>
> >      >>                                 On Mon, Sep 28, 2020 at 11:49 PM
> >     Austin
> >      >>                                 Cawley-Edwards
> >     <austin.caw...@gmail.com <mailto:austin.caw...@gmail.com>
> >      >>                                 <mailto:austin.caw...@gmail.com
> >     <mailto:austin.caw...@gmail.com>>> wrote:
> >      >>
> >      >>                                     Hey all,
> >      >>
> >      >>                                     I'm not sure if I've missed
> >      >>                                     something in the docs, but
> I'm
> >      >>                                     having a bit of trouble with
> a
> >      >>                                     streaming SQL job that
> >     starts w/ raw
> >      >>                                     SQL queries and then
> >     transitions to
> >      >>                                     a more traditional streaming
> >     job.
> >      >>                                     I'm on Flink 1.10 using the
> >     Blink
> >      >>                                     planner, running locally
> with no
> >      >>                                     checkpointing.
> >      >>
> >      >>                                     The job looks roughly like:
> >      >>
> >      >>                                     CSV 1 -->
> >      >>                                     CSV 2 --> SQL Query to Join --> toRetractStream --> keyed time window w/ process func & custom trigger --> some other ops
> >      >>                                     CSV 3 -->
> >      >>
> >      >>
> >      >>                                     When I remove the windowing
> >     directly
> >      >>                                     after the `toRetractStream`,
> the
> >      >>                                     records make it to the "some
> >     other
> >      >>                                     ops" stage, but with the
> >     windowing,
> >      >>                                     those operations are
> >     sometimes not
> >      >>                                     sent any data. I can also
> >     get data
> >      >>                                     sent to the downstream
> >     operators by
> >      >>                                     putting in a no-op map
> >     before the
> >      >>                                     window and placing some
> >     breakpoints
> >      >>                                     in there to manually slow
> down
> >      >>                                     processing.
> >      >>
> >      >>
> >      >>                                     The logs don't seem to
> indicate
> >      >>                                     anything went wrong and
> >     generally
> >      >>                                     look like:
> >      >>
> >      >> 4819 [Source: Custom File source (1/1)] INFO
> >      >>   org.apache.flink.runtime.taskmanager.Task - Source: Custom File
> >      >>   source (1/1) (3578629787c777320d9ab030c004abd4) switched from
> >      >>   RUNNING to FINISHED.
> >      >> 4819 [Source: Custom File source (1/1)] INFO
> >      >>   org.apache.flink.runtime.taskmanager.Task - Freeing task resources
> >      >>   for Source: Custom File source (1/1)
> >      >>   (3578629787c777320d9ab030c004abd4).
> >      >> 4819 [Source: Custom File source (1/1)] INFO
> >      >>   org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem
> >      >>   streams are closed for task Source: Custom File source (1/1)
> >      >>   (3578629787c777320d9ab030c004abd4) [FINISHED]
> >      >> 4820 [flink-akka.actor.default-dispatcher-5] INFO
> >      >>   org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering
> >      >>   task and sending final execution state FINISHED to JobManager for
> >      >>   task Source: Custom File source (1/1)
> >      >>   3578629787c777320d9ab030c004abd4.
> >      >> ...
> >      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >      >>   ProcessWindowFunction$1) (1/1)] INFO
> >      >>   org.apache.flink.runtime.taskmanager.Task -
> >      >>   Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >      >>   ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f)
> >      >>   switched from RUNNING to FINISHED.
> >      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >      >>   ProcessWindowFunction$1) (1/1)] INFO
> >      >>   org.apache.flink.runtime.taskmanager.Task - Freeing task resources
> >      >>   for Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >      >>   ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
> >      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >      >>   ProcessWindowFunction$1) (1/1)] INFO
> >      >>   org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem
> >      >>   streams are closed for task
> >      >>   Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >      >>   ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f)
> >      >>   [FINISHED]
> >      >> ...
> >      >> rest of the shutdown
> >      >> ...
> >      >> Program execution finished
> >      >> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
> >      >> Job Runtime: 783 ms
> >      >>
> >      >>
> >      >>                                     Is there something I'm
> >     missing in my
> >      >>                                     setup? Could it be my custom
> >     window
> >      >>                                     trigger? Bug? I'm stumped.
> >      >>
> >      >>
> >      >>                                     Thanks,
> >      >>                                     Austin
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >>
> >      >
> >
>
>
