Hi Austin,

could you share some details of your SQL query with us? I'm asking because I suspect that the rowtime field is not inserted into the `StreamRecord` of the DataStream API. The rowtime field is only inserted if there is a single field in the output of the query that is a valid "time attribute".

Especially after non-time-based joins and aggregations, time attributes lose their properties and become regular timestamps, because timestamps and watermarks might have diverged.

If you know what you're doing, you can also assign the timestamp manually after `toRetractStream` via `assignTimestampsAndWatermarks` and reinsert the field into the stream record. But before you do that, I think it is better to share more information about the query with us.
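For illustration, such a manual re-assignment could look roughly like the sketch below in Flink 1.10. The field index and the 5-second out-of-orderness bound are assumptions for the example, not taken from your job:

```java
// Sketch: re-assign timestamps after the Table -> retract stream conversion.
// Assumes the former rowtime attribute ended up as a SQL TIMESTAMP at
// position 0 of the Row; adjust the index and the bound to your schema.
DataStream<Tuple2<Boolean, Row>> retractStream =
    tableEnv.toRetractStream(resultTable, Row.class);

DataStream<Tuple2<Boolean, Row>> withTimestamps = retractStream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Boolean, Row>>(
                Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2<Boolean, Row> element) {
                // The former rowtime attribute is now a plain field of the Row.
                return ((java.sql.Timestamp) element.f1.getField(0)).getTime();
            }
        });
```

After this, downstream event-time windows see proper timestamps and watermarks again.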

I hope this helps.

Regards,
Timo



On 05.10.20 09:25, Till Rohrmann wrote:
Hi Austin,

thanks for offering to help. First I would suggest asking Timo whether this is an aspect which is still missing or whether we overlooked it. Based on that we can then take the next steps.

Cheers,
Till

On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

    Hey Till,

    Thanks for the notes. Yeah, the docs don't mention anything specific
    to this case, not sure if it's an uncommon one. Assigning timestamps
    on conversion does solve the issue. I'm happy to take a stab at
    implementing the feature if it is indeed missing and you all think
    it'd be worthwhile. I think it's definitely a confusing aspect of
    working w/ the Table & DataStream APIs together.

    Best,
    Austin

    On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann <trohrm...@apache.org> wrote:

        Hi Austin,

        yes, it should also work for ingestion time.

        I am not entirely sure whether event time is preserved when
        converting a Table into a retract stream. It should be possible,
        and if it is not working, then I guess it is a missing feature.
        But I am sure that @Timo Walther knows more about it. If in
        doubt, you could assign a new watermark generator once you have
        obtained the retract stream.

        Here is also a link to some information about event time and
        watermarks [1]. Unfortunately, it does not state anything about
        the direction Table => DataStream.

        [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html

        Cheers,
        Till

        On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

            Hey Till,

            Just a quick question on time characteristics -- this should
            work for IngestionTime as well, correct? Is there anything
            special I need to do to have the CsvTableSource /
            toRetractStream call carry through the assigned timestamps,
            or do I have to re-assign timestamps during the conversion?
            I'm currently getting the `Record has Long.MIN_VALUE
            timestamp (= no timestamp marker)` error, though I'm seeing
            timestamps being assigned if I step through the
            AutomaticWatermarkContext.

            Thanks,
            Austin

            On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

                Perfect, thanks so much Till!

                On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann <trohrm...@apache.org> wrote:

                    Hi Austin,

                    I believe that the problem is the processing time
                    window. Unlike for event time where we send a
                    MAX_WATERMARK at the end of the stream to trigger
                    all remaining windows, this does not happen for
                    processing time windows. Hence, if your stream ends
                    and you still have an open processing time window,
                    then it will never get triggered.

                    The problem should disappear if you use event time
                    or if you process unbounded streams which never end.
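                    As an illustration of the event-time variant (all
                    class and field names below are placeholders, not
                    from Austin's job), a sketch for Flink 1.10 could be:

```java
// Hypothetical sketch: with event time, a bounded source emits a final
// MAX_WATERMARK when it finishes, so any still-open windows fire.
// MyEvent, its fields, and MyProcessWindowFunction are assumed names.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> withTimestamps = events
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent e) {
            return e.timestampMillis; // assumed timestamp field
        }
    });

withTimestamps
    .keyBy(e -> e.key)
    // event-time window instead of TumblingProcessingTimeWindows
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyProcessWindowFunction());
```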

                    Cheers,
                    Till

                    On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

                        Hey all,

                        Thanks for your patience. I've got a small repo
                        that reproduces the issue here:
                        
                        https://github.com/austince/flink-1.10-sql-windowing-error


                        Not sure what I'm doing wrong but it feels silly.

                        Thanks so much!
                        Austin

                        On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

                            Hey Till,

                            Thanks for the reply -- I'll try to see if I
                            can reproduce this in a small repo and share
                            it with you.

                            Best,
                            Austin

                            On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann <trohrm...@apache.org> wrote:

                                Hi Austin,

                                could you share with us the exact job
                                you are running (including the custom
                                window trigger)? This would help us to
                                better understand your problem.

                                I am also pulling in Klou and Timo who
                                might help with the windowing logic and
                                the Table to DataStream conversion.

                                Cheers,
                                Till

                                On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

                                    Hey all,

                                    I'm not sure if I've missed
                                    something in the docs, but I'm
                                    having a bit of trouble with a
                                    streaming SQL job that starts w/ raw
                                    SQL queries and then transitions to
                                    a more traditional streaming job.
                                    I'm on Flink 1.10 using the Blink
                                    planner, running locally with no
                                    checkpointing.

                                    The job looks roughly like:

                                    CSV 1 -->
                                    CSV 2 -->  SQL Query to Join -->
                                    toRetractStream --> keyed time
                                    window w/ process func & custom
                                    trigger --> some other ops
                                    CSV 3 -->
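                                    In code, the topology is roughly as
                                    below (names are placeholders; the
                                    real job, including the custom
                                    trigger, is in the linked repo):

```java
// Rough sketch of the topology described above. Table names, key
// fields, and the TimedCountTrigger constructor are assumptions;
// the actual job lives in the linked reproduction repo.
Table joined = tableEnv.sqlQuery(
    "SELECT * FROM csv1 " +
    "JOIN csv2 ON csv1.id = csv2.id " +
    "JOIN csv3 ON csv1.id = csv3.id");

tableEnv.toRetractStream(joined, Row.class)
    .keyBy(t -> t.f1.getField(0))
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .trigger(new TimedCountTrigger())           // custom trigger
    .process(new MyProcessWindowFunction())     // process func
    .map(new SomeOtherOpsMap());                // "some other ops"
```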


                                    When I remove the windowing directly
                                    after the `toRetractStream`, the
                                    records make it to the "some other
                                    ops" stage, but with the windowing,
                                    those operations are sometimes not
                                    sent any data. I can also get data
                                    sent to the downstream operators by
                                    putting in a no-op map before the
                                    window and placing some breakpoints
                                    in there to manually slow down
                                    processing.


                                    The logs don't seem to indicate
                                    anything went wrong and generally
                                    look like:

4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to FINISHED.
4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) [FINISHED]
4820 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
                                    ...
4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched from RUNNING to FINISHED.
4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
                                    ...
                                    rest of the shutdown
                                    ...
                                    Program execution finished
                                    Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
                                    Job Runtime: 783 ms


                                    Is there something I'm missing in my
                                    setup? Could it be my custom window
                                    trigger? Bug? I'm stumped.


                                    Thanks,
                                    Austin












