The output "interval" of the aforementioned query is still governed by the
last windowing operation. But when the query starts, you might have to wait a
long time, because the first watermark from the second stream will take a
while to be generated.

The interval join will clean up state that it doesn't need anymore by looking
at its "internal clock", whose value is always the minimum of the latest
watermarks received from the two input streams. So once this clock moves to
time 10, every cached event (from either of the two input streams) with event
time < 10 will be cleared from the operator state.
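
To make the rule above concrete, here is a minimal sketch in plain Java. It is
a simplified model and not Flink's actual operator: the class name
IntervalJoinClockSketch and all of its methods are made up for illustration,
and it only models the "clock = minimum of the two latest watermarks" rule
described above. A real operator may additionally retain rows according to the
join's time bounds, which is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the cleanup rule; not Flink's real operator.
class IntervalJoinClockSketch {
    // Latest watermark seen on each input; Long.MIN_VALUE means "none received yet".
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;
    // Event times of the rows cached from both inputs.
    private final List<Long> cachedEventTimes = new ArrayList<>();

    void cache(long eventTime) {
        cachedEventTimes.add(eventTime);
    }

    // The internal clock is the minimum of the two latest watermarks.
    long clock() {
        return Math.min(leftWatermark, rightWatermark);
    }

    void onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
        cleanUp();
    }

    void onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
        cleanUp();
    }

    // Drop every cached row whose event time is strictly below the clock.
    private void cleanUp() {
        long now = clock();
        cachedEventTimes.removeIf(eventTime -> eventTime < now);
    }

    List<Long> cached() {
        return new ArrayList<>(cachedEventTimes);
    }

    public static void main(String[] args) {
        IntervalJoinClockSketch join = new IntervalJoinClockSketch();
        join.cache(5);
        join.cache(9);
        join.cache(12);

        // Only one input has a watermark, so the clock has not moved yet.
        join.onLeftWatermark(20);
        System.out.println(join.cached()); // prints [5, 9, 12]

        // Now the clock is min(20, 10) = 10 and rows with event time < 10 go away.
        join.onRightWatermark(10);
        System.out.println(join.cached()); // prints [12]
    }
}
```

Note how nothing is evicted until both inputs have produced a watermark: the
slower stream holds the clock back, which is also why the first results can
take a while to show up.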

Also, please note that when we talk about time in this specific query, we're
talking about event time and not the natural flow of time in the "real
world" (which in Flink is called processing time).


> Hi Francesco,
> Thank you so much for your reply. This was really helpful. In reply to
> your tips:
> *> As described here
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation>,
> we have deprecated the syntax `GROUP BY WINDOW`, you should use windowing
> TVF instead
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#windowing-tvfs>*
> Yes, we are trying to move towards windowing TVFs as well. Some of our
> existing jobs still use Group Window Aggregation and hence we are still
> using it.
> *> You can directly use Window joins
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/>
>  as
> well for your query, as they're meant exactly to cover your use case*
> Thanks. Looks like it is used along with Windowing TVFs though. But I will
> try to explore this.
> *> Any particular reason you're creating the input tables from DataStream
> instead of creating them directly from Table API using either CREATE
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
>  or
> TableDescriptor?*
> We are creating a File Source which can read parquet files from a remote
> GCS(Google Cloud Storage) bucket. We had evaluated this
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/parquet/#:~:text=CREATE%20TABLE%20user_behavior%20(%0A%20%20user_id%20BIGINT%2C%0A%20%20item_id%20BIGINT%2C%0A%20%20category_id%20BIGINT%2C%0A%20%20behavior%20STRING%2C%0A%20%20ts%20TIMESTAMP(3)%2C%0A%20%20dt%20STRING%0A)%20PARTITIONED%20BY%20(dt)%20WITH%20(%0A%20%27connector%27%20%3D%20%27filesystem%27%2C%0A%20%27path%27%20%3D%20%27/tmp/user_behavior%27%2C%0A%20%27format%27%20%3D%20%27parquet%27%0A)>
>  to
> create a table but we faced the following challenges :
>    - We plan to use this parquet source to create a Hybrid Source later.
>    Hence, we had to use a File Source.
>    - A call to GCS returns files in lexicographic order. We wanted a high
>    level deterministic order in which files are picked for reading and hence
>    we resorted to using a File Source with a custom Split Assigner to assign
>    the files to the source readers in some order.
>    - Creating the table
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/parquet/#:~:text=CREATE%20TABLE%20user_behavior%20(%0A%20%20user_id%20BIGINT%2C%0A%20%20item_id%20BIGINT%2C%0A%20%20category_id%20BIGINT%2C%0A%20%20behavior%20STRING%2C%0A%20%20ts%20TIMESTAMP(3)%2C%0A%20%20dt%20STRING%0A)%20PARTITIONED%20BY%20(dt)%20WITH%20(%0A%20%27connector%27%20%3D%20%27filesystem%27%2C%0A%20%27path%27%20%3D%20%27/tmp/user_behavior%27%2C%0A%20%27format%27%20%3D%20%27parquet%27%0A)>
>    requires specifying the column names and data types. However, in our case
>    we use the Protobuf schema to read the schema for a parquet file. Also,
>    some values in the parquet file need some custom type conversion ( int64 ->
>    timestamp, for example).
> I had a question regarding this point you mentioned:
> *> In other words, it won't drop the content of kafkaTable immediately,
> but after both streams are at "the same point in time" (defined by the
> watermarks of both streams).*
> Does this mean that the output of the join will be flushed to the sink at
> the period defined by the minimum watermark, that is, 60 minutes in the
> above case?
> Also, I read here
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#:~:text=Since%20time%20attributes%20are%20quasi%2Dmonotonic%20increasing%2C%20Flink%20can%20remove%20old%20values%20from%20its%20state%20without%20affecting%20the%20correctness%20of%20the%20result.>
>  that
> Flink will remove old data from its state in case of interval joins. Does
> this mean that data present in both the tables will be removed after the
> minimum watermark delay (60 minutes in this case)?
>> Hi,
>> So my understanding of your query is that you want to do a join first,
>> and then group by a 60 minutes distance and aggregate them. Please correct
>> me if I'm wrong.
>> First of all, the query you've posted is incorrect and should fail, as
>> its plan is invalid because it's using a regular join. Regular joins cannot
>> be concatenated with other "time operations" like a group by window, as
>> they don't produce any watermark.
>> My suggestion for your query is to use an interval join
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#interval-joins>
>> first, and then a group window. For example:
>> SELECT TUMBLE_START(file_time, INTERVAL '60' MINUTE) AS event_time,
>>        MAX(TIMESTAMPDIFF(MINUTE, file_time, kafka_time))
>> FROM (
>>   SELECT fileTable.id AS id,
>>          fileTable.event_time AS file_time,
>>          kafkaTable.event_time AS kafka_time
>>   FROM fileTable, kafkaTable
>>   WHERE fileTable.id = kafkaTable.id
>>     AND fileTable.event_time BETWEEN kafkaTable.event_time - INTERVAL '1' HOUR
>>                                  AND kafkaTable.event_time
>> )
>> GROUP BY id, TUMBLE(file_time, INTERVAL '60' MINUTE)
>> This produces the correct result, as the interval join will produce the
>> cartesian product of the events at a maximum distance of 1 hour between
>> them, and at runtime it will emit the minimum watermark of the two
>> inputs. In other words, it won't drop the content of kafkaTable
>> immediately, but after both streams are at "the same point in time"
>> (defined by the watermarks of both streams).
>> After the cartesian product is emitted from the interval join, the group
>> by will be executed.
>> I also have some tips:
>> * As described here
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation>,
>> we have deprecated the syntax `GROUP BY WINDOW`, you should use windowing
>> TVF instead
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#windowing-tvfs>
>> * You can directly use Window joins
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/>
>> as well for your query, as they're meant exactly to cover your use case
>> * Any particular reason you're creating the input tables from DataStream
>> instead of creating them directly from Table API using either CREATE
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
>> or TableDescriptor?
>>> Hello,
>>> We are creating two data streams in our Flink application. Both of them
>>> are then converted into Tables. The first data stream has a watermark
>>> delay of 24 hours while the second stream has a watermark delay of 60
>>> minutes. The watermark strategy used is BoundedOutOfOrderness, and it uses
>>> a particular event_time field present within the records themselves to
>>> assign watermarks.
>>> For example,
>>> DataStream<Row> fileStream = env.fromSource(
>>>                     fileSource,
>>>                     getWatermarkStrategy(86400000), // custom function, watermark of 24 hours in ms
>>>                     "fileSource");
>>> Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
>>> tableEnv.createTemporaryView("fileTable", firstTable);
>>> DataStream<Row> kafkaStream = env.fromSource(
>>>                     kafkaSource,
>>>                     getWatermarkStrategy(3600000), // custom function, watermark of 60 minutes in ms
>>>                     "kafkaSource");
>>> Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
>>> tableEnv.createTemporaryView("kafkaTable", secondTable);
>>> Now we want to write a continuous SQL query to join
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/dynamic_tables/#continuous-queries>
>>>  firstTable and secondTable with a TumbleWindow of 60 minutes
>>> "SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS event_time, " +
>>>                     "MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time)) " +
>>>                     "FROM fileTable, kafkaTable " +
>>>                     "where fileTable.id = kafkaTable.id " +
>>>                     "group by TUMBLE(fileTable.rowtime, INTERVAL '60' MINUTE)"
>>> What we want to know is whether join or aggregation queries will work
>>> correctly between the two tables. Is it the case that the contents of
>>> kafkaTable will be purged immediately after 60 minutes and hence a
>>> join/aggregation might not give correct results?
>>> Will there be data loss if tables with different watermark delays are
>>> joined?
