Hey Timo,

Hah, that's a fair point about using time. I guess I should update my statement to "as a user, I don't want to worry about *manually managing* time".
That's a nice suggestion with the KeyedProcessFunction and no windows -- I'll give that a shot. If I don't want to emit any duplicates, I'd have to essentially buffer the "last seen duplicate" for each key in that process function until the MAX_WATERMARK is sent through, though, right? I could emit early results if I assume a maximum number of possible duplicates, but for records with no duplicates I'd have to wait until no more records are coming -- am I missing something?

Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther <twal...@apache.org> wrote:

> Hi Austin,
>
> if you don't want to worry about time at all, you should probably not
> use any windows, because those are a time-based operation.
>
> A solution that would look a bit nicer could be to use a pure
> KeyedProcessFunction and implement the deduplication logic without
> reusing windows. In ProcessFunctions you can register an event-time
> timer. The timer would be triggered by the MAX_WATERMARK when the
> pipeline shuts down, even without a timestamp assigned in the
> StreamRecord. The watermark will also leave SQL without a time
> attribute, as far as I know.
>
> Regards,
> Timo
>
>
> On 08.10.20 17:38, Austin Cawley-Edwards wrote:
> > Hey Timo,
> >
> > Sorry for the delayed reply. I'm using the Blink planner and
> > non-time-based joins. I've got an example repo here that shows my
> > query/setup [1]. It's got the manual timestamp assignment commented
> > out for now, but that does indeed solve the issue.
> >
> > I'd really like to not worry about time at all in this job, hah -- I
> > started out just using processing time, but Till pointed out that
> > processing-time timers won't be fired when input ends, which is the
> > case for this streaming job processing CSV files, so I should be
> > using event time. With that suggestion, I switched to ingestion
> > time, where I then discovered the issue converting from SQL to a
> > data stream.
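The buffering described above can be modeled outside Flink: keep only the last-seen record per key and flush exactly once, when the input is exhausted, which is the analogue of an event-time timer firing on MAX_WATERMARK. This is a minimal plain-Java sketch of that logic, not Flink API code; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupBufferSketch {

    // Keep only the last-seen value per key; nothing is emitted until the
    // input is exhausted -- the analogue of waiting for MAX_WATERMARK.
    public static List<String> dedupOnEndOfInput(List<String[]> keyedRecords) {
        Map<String, String> lastSeen = new LinkedHashMap<>();
        for (String[] kv : keyedRecords) {
            lastSeen.put(kv[0], kv[1]); // later duplicates overwrite earlier ones
        }
        // "End of input" reached: flush one record per key.
        return new ArrayList<>(lastSeen.values());
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[] {"a", "a-1"},
                new String[] {"b", "b-1"},
                new String[] {"a", "a-2"}); // duplicate of key "a"
        System.out.println(dedupOnEndOfInput(input)); // [a-2, b-1]
    }
}
```

It also shows why early emission is hard without an assumption about the duplicate count: until the input ends, no key can know whether another duplicate is still coming.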
> >
> > IMO, as a user, manually assigning timestamps on conversion makes
> > sense if you're using event time and already handling time
> > attributes yourself, but for ingestion time you really don't want
> > to think about time at all, which is why it might make sense to
> > propagate the automatically assigned timestamps in that case.
> > Though I'm not sure how difficult that would be. Let me know what
> > you think!
> >
> >
> > Best + thanks again,
> > Austin
> >
> > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
> >
> > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <twal...@apache.org> wrote:
> >
> >     Btw which planner are you using?
> >
> >     Regards,
> >     Timo
> >
> >     On 05.10.20 10:23, Timo Walther wrote:
> >     > Hi Austin,
> >     >
> >     > could you share some details of your SQL query with us? The
> >     > reason I'm asking is that I guess the rowtime field is not
> >     > inserted into the `StreamRecord` of the DataStream API. The
> >     > rowtime field is only inserted if there is a single field in
> >     > the output of the query that is a valid "time attribute".
> >     >
> >     > Especially after non-time-based joins and aggregations, time
> >     > attributes lose their properties and become regular
> >     > timestamps, because timestamps and watermarks might have
> >     > diverged.
> >     >
> >     > If you know what you're doing, you can also assign the
> >     > timestamp manually after `toRetractStream` via
> >     > `assignTimestampsAndWatermarks` and reinsert the field into
> >     > the stream record. But before you do that, I think it is
> >     > better to share more information about the query with us.
> >     >
> >     > I hope this helps.
> >     >
> >     > Regards,
> >     > Timo
> >     >
> >     >
> >     > On 05.10.20 09:25, Till Rohrmann wrote:
> >     >> Hi Austin,
> >     >>
> >     >> thanks for offering to help. First I would suggest asking
> >     >> Timo whether this is an aspect which is still missing or
> >     >> whether we overlooked it.
> >     >> Based on that we can then take the next steps.
> >     >>
> >     >> Cheers,
> >     >> Till
> >     >>
> >     >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
> >     >> <austin.caw...@gmail.com> wrote:
> >     >>
> >     >>     Hey Till,
> >     >>
> >     >>     Thanks for the notes. Yeah, the docs don't mention
> >     >>     anything specific to this case; not sure if it's an
> >     >>     uncommon one. Assigning timestamps on conversion does
> >     >>     solve the issue. I'm happy to take a stab at
> >     >>     implementing the feature if it is indeed missing and
> >     >>     you all think it'd be worthwhile. I think it's
> >     >>     definitely a confusing aspect of working w/ the Table &
> >     >>     DataStream APIs together.
> >     >>
> >     >>     Best,
> >     >>     Austin
> >     >>
> >     >>     On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
> >     >>     <trohrm...@apache.org> wrote:
> >     >>
> >     >>         Hi Austin,
> >     >>
> >     >>         yes, it should also work for ingestion time.
> >     >>
> >     >>         I am not entirely sure whether event time is
> >     >>         preserved when converting a Table into a retract
> >     >>         stream. It should be possible, and if it is not
> >     >>         working, then I guess it is a missing feature. But
> >     >>         I am sure that @Timo Walther <twal...@apache.org>
> >     >>         knows more about it. If in doubt, you could assign
> >     >>         a new watermark generator once you have obtained
> >     >>         the retract stream.
> >     >>
> >     >>         Here is also a link to some information about event
> >     >>         time and watermarks [1]. Unfortunately, it does not
> >     >>         state anything about the direction Table => DataStream.
> >     >>
> >     >>         [1]
> >     >>         https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
> >     >>
> >     >>         Cheers,
> >     >>         Till
> >     >>
> >     >>         On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
> >     >>         <austin.caw...@gmail.com> wrote:
> >     >>
> >     >>             Hey Till,
> >     >>
> >     >>             Just a quick question on time characteristics --
> >     >>             this should work for IngestionTime as well,
> >     >>             correct? Is there anything special I need to do
> >     >>             to have the CsvTableSource/toRetractStream call
> >     >>             carry through the assigned timestamps, or do I
> >     >>             have to re-assign timestamps during the
> >     >>             conversion? I'm currently getting the `Record
> >     >>             has Long.MIN_VALUE timestamp (= no timestamp
> >     >>             marker)` error, though I'm seeing timestamps
> >     >>             being assigned if I step through the
> >     >>             AutomaticWatermarkContext.
> >     >>
> >     >>             Thanks,
> >     >>             Austin
> >     >>
> >     >>             On Thu, Oct 1, 2020 at 10:52 AM Austin
> >     >>             Cawley-Edwards <austin.caw...@gmail.com> wrote:
> >     >>
> >     >>                 Perfect, thanks so much Till!
> >     >>
> >     >>                 On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann
> >     >>                 <trohrm...@apache.org> wrote:
> >     >>
> >     >>                     Hi Austin,
> >     >>
> >     >>                     I believe that the problem is the
> >     >>                     processing time window. Unlike for event
> >     >>                     time, where we send a MAX_WATERMARK at
> >     >>                     the end of the stream to trigger all
> >     >>                     remaining windows, this does not happen
> >     >>                     for processing time windows. Hence, if
> >     >>                     your stream ends and you still have an
> >     >>                     open processing time window, then it
> >     >>                     will never get triggered.
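The `Record has Long.MIN_VALUE timestamp (= no timestamp marker)` error mentioned above comes from Flink's internal convention that a `StreamRecord` carrying no timestamp is marked with `Long.MIN_VALUE`. The sketch below is plain Java, not Flink API code; the class and method names are invented for illustration, and it only models what a manual timestamp assigner after the Table-to-DataStream conversion effectively does.

```java
public class TimestampMarkerSketch {

    // Flink's "no timestamp" marker on StreamRecord is Long.MIN_VALUE;
    // time-based operators that encounter it raise the error quoted above.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // What manual re-assignment after conversion boils down to: if the
    // record lost its timestamp, derive one from another source (e.g. a
    // timestamp field of the row); otherwise keep the existing one.
    static long effectiveTimestamp(long recordTimestamp, long timestampField) {
        return recordTimestamp == NO_TIMESTAMP ? timestampField : recordTimestamp;
    }

    public static void main(String[] args) {
        // A record that kept its timestamp through the conversion:
        System.out.println(effectiveTimestamp(1_000L, 2_000L)); // 1000
        // A record that lost it and needs manual re-assignment:
        System.out.println(effectiveTimestamp(NO_TIMESTAMP, 2_000L)); // 2000
    }
}
```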
> >     >>
> >     >>                     The problem should disappear if you use
> >     >>                     event time or if you process unbounded
> >     >>                     streams which never end.
> >     >>
> >     >>                     Cheers,
> >     >>                     Till
> >     >>
> >     >>                     On Thu, Oct 1, 2020 at 12:01 AM Austin
> >     >>                     Cawley-Edwards <austin.caw...@gmail.com> wrote:
> >     >>
> >     >>                         Hey all,
> >     >>
> >     >>                         Thanks for your patience. I've got a
> >     >>                         small repo that reproduces the issue
> >     >>                         here:
> >     >>                         https://github.com/austince/flink-1.10-sql-windowing-error
> >     >>
> >     >>                         Not sure what I'm doing wrong, but
> >     >>                         it feels silly.
> >     >>
> >     >>                         Thanks so much!
> >     >>                         Austin
> >     >>
> >     >>                         On Tue, Sep 29, 2020 at 3:48 PM
> >     >>                         Austin Cawley-Edwards
> >     >>                         <austin.caw...@gmail.com> wrote:
> >     >>
> >     >>                             Hey Till,
> >     >>
> >     >>                             Thanks for the reply -- I'll try
> >     >>                             to see if I can reproduce this
> >     >>                             in a small repo and share it
> >     >>                             with you.
> >     >>
> >     >>                             Best,
> >     >>                             Austin
> >     >>
> >     >>                             On Tue, Sep 29, 2020 at 3:58 AM
> >     >>                             Till Rohrmann
> >     >>                             <trohrm...@apache.org> wrote:
> >     >>
> >     >>                                 Hi Austin,
> >     >>
> >     >>                                 could you share with us the
> >     >>                                 exact job you are running
> >     >>                                 (including the custom window
> >     >>                                 trigger)? This would help us
> >     >>                                 to better understand your
> >     >>                                 problem.
> >     >>
> >     >>                                 I am also pulling in Klou
> >     >>                                 and Timo, who might help
> >     >>                                 with the windowing logic and
> >     >>                                 the Table to DataStream
> >     >>                                 conversion.
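Till's explanation of why the processing-time window never fires on a bounded input can be captured in a toy model: a window fires once a final time signal reaches its end timestamp; event time gets a MAX_WATERMARK (`Long.MAX_VALUE`) at end of input, while processing time gets no such signal. This is an illustrative plain-Java sketch of that reasoning under those assumptions, not how Flink's operators are actually implemented.

```java
public class WindowFiringSketch {

    // Would a window with the given end timestamp fire once a bounded
    // input finishes?
    static boolean firesOnShutdown(long windowEnd, boolean eventTime) {
        // Event time: Flink emits MAX_WATERMARK (Long.MAX_VALUE) when the
        // input ends, which is >= every window end, so all open windows fire.
        // Processing time: no final watermark exists; modeled here as a
        // signal that never arrives (Long.MIN_VALUE).
        long finalTimeSignal = eventTime ? Long.MAX_VALUE : Long.MIN_VALUE;
        return finalTimeSignal >= windowEnd;
    }

    public static void main(String[] args) {
        // A 60s tumbling window, as in the job's TumblingProcessingTimeWindows(60000):
        System.out.println(firesOnShutdown(60_000L, true));  // true: event time fires it
        System.out.println(firesOnShutdown(60_000L, false)); // false: processing time never does
    }
}
```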
> >     >>
> >     >>                                 Cheers,
> >     >>                                 Till
> >     >>
> >     >>                                 On Mon, Sep 28, 2020 at
> >     >>                                 11:49 PM Austin
> >     >>                                 Cawley-Edwards
> >     >>                                 <austin.caw...@gmail.com> wrote:
> >     >>
> >     >>                                     Hey all,
> >     >>
> >     >>                                     I'm not sure if I've
> >     >>                                     missed something in the
> >     >>                                     docs, but I'm having a
> >     >>                                     bit of trouble with a
> >     >>                                     streaming SQL job that
> >     >>                                     starts w/ raw SQL
> >     >>                                     queries and then
> >     >>                                     transitions to a more
> >     >>                                     traditional streaming
> >     >>                                     job. I'm on Flink 1.10
> >     >>                                     using the Blink planner,
> >     >>                                     running locally with no
> >     >>                                     checkpointing.
> >     >>
> >     >>                                     The job looks roughly like:
> >     >>
> >     >>                                     CSV 1 -->
> >     >>                                     CSV 2 --> SQL Query to Join -->
> >     >>                                     toRetractStream --> keyed time
> >     >>                                     window w/ process func & custom
> >     >>                                     trigger --> some other ops
> >     >>                                     CSV 3 -->
> >     >>
> >     >>                                     When I remove the
> >     >>                                     windowing directly after
> >     >>                                     the `toRetractStream`,
> >     >>                                     the records make it to
> >     >>                                     the "some other ops"
> >     >>                                     stage, but with the
> >     >>                                     windowing, those
> >     >>                                     operations are sometimes
> >     >>                                     not sent any data. I can
> >     >>                                     also get data sent to
> >     >>                                     the downstream operators
> >     >>                                     by putting in a no-op
> >     >>                                     map before the window
> >     >>                                     and placing some
> >     >>                                     breakpoints in there to
> >     >>                                     manually slow down
> >     >>                                     processing.
> >     >>
> >     >>                                     The logs don't seem to
> >     >>                                     indicate anything went
> >     >>                                     wrong and generally look
> >     >>                                     like:
> >     >>
> >     >>                                     4819 [Source: Custom File source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to FINISHED.
> >     >>                                     4819 [Source: Custom File source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
> >     >>                                     4819 [Source: Custom File source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) [FINISHED]
> >     >>                                     4820 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
> >     >>                                     ...
> >     >>                                     4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched from RUNNING to FINISHED.
> >     >>                                     4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
> >     >>                                     4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
> >     >>                                     ...
> >     >>                                     rest of the shutdown
> >     >>                                     ...
> >     >>                                     Program execution finished
> >     >>                                     Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
> >     >>                                     Job Runtime: 783 ms
> >     >>
> >     >>                                     Is there something I'm
> >     >>                                     missing in my setup?
> >     >>                                     Could it be my custom
> >     >>                                     window trigger? A bug?
> >     >>                                     I'm stumped.
> >     >>
> >     >>                                     Thanks,
> >     >>                                     Austin