Hi,

So my understanding of your query is that you want to perform a join first,
and then group the results into 60-minute windows and aggregate them. Please
correct me if I'm wrong.

First of all, the query you've posted is incorrect and should fail, as its
plan is invalid because it uses a regular join. Regular joins cannot be
chained with other time-based operations like a group window, as they
don't propagate any watermark.

My suggestion for your query is to use an interval join
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#interval-joins>
first, and then a group window. For example:

SELECT
  TUMBLE_START(file_time, INTERVAL '60' MINUTE) AS event_time,
  MAX(TIMESTAMPDIFF(MINUTE, file_time, kafka_time))
FROM (
  SELECT
    fileTable.id AS id,
    fileTable.event_time AS file_time,
    kafkaTable.event_time AS kafka_time
  FROM fileTable, kafkaTable
  WHERE fileTable.id = kafkaTable.id
    AND fileTable.event_time BETWEEN kafkaTable.event_time - INTERVAL '1' HOUR
                                 AND kafkaTable.event_time
)
GROUP BY id, TUMBLE(file_time, INTERVAL '60' MINUTE)

This produces the correct result: the interval join emits the cartesian
product of the events that lie at most 1 hour apart, and at runtime the join
operator emits the minimum of the watermarks of its two inputs. In other
words, it won't drop the content of kafkaTable immediately, but only after
both streams have reached "the same point in time" (as defined by the
watermarks of both streams).
After the interval join has emitted the cartesian product, the group by is
executed.

I also have some tips:

* As described here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation>,
the group window syntax (`GROUP BY TUMBLE(...)`) is deprecated; you should use
windowing TVFs instead
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#windowing-tvfs>
(see the first sketch after this list)
* You can also use Window joins
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/>
directly for your query, as they're meant exactly to cover your use case (see
the second sketch after this list)
* Is there any particular reason you're creating the input tables from
DataStream instead of creating them directly with the Table API, using either
CREATE TABLE
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
or TableDescriptor? (see the third sketch after this list)
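
For the first tip, here is a minimal sketch of the same aggregation written
with a windowing TVF. It assumes the interval join above is registered as a
view named `joined` with columns (id, file_time, kafka_time), where file_time
is still a time attribute (the view name is hypothetical):

-- same aggregation as above, but with the TUMBLE windowing TVF
SELECT
  window_start AS event_time,
  MAX(TIMESTAMPDIFF(MINUTE, file_time, kafka_time))
FROM TABLE(
  TUMBLE(TABLE joined, DESCRIPTOR(file_time), INTERVAL '60' MINUTES))
GROUP BY id, window_start, window_end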
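
For the second tip, a rough sketch of the window join variant, assuming both
fileTable and kafkaTable expose an id column and an event_time time attribute.
Note the semantics differ slightly from the interval join: two events only
match if they fall into the same 60-minute window, not whenever they're less
than 60 minutes apart.

-- window both inputs first, then join on key and window
SELECT
  L.window_start AS event_time,
  MAX(TIMESTAMPDIFF(MINUTE, L.event_time, R.event_time))
FROM (
  SELECT * FROM TABLE(
    TUMBLE(TABLE fileTable, DESCRIPTOR(event_time), INTERVAL '60' MINUTES))
) L
JOIN (
  SELECT * FROM TABLE(
    TUMBLE(TABLE kafkaTable, DESCRIPTOR(event_time), INTERVAL '60' MINUTES))
) R
ON L.id = R.id
   AND L.window_start = R.window_start
   AND L.window_end = R.window_end
GROUP BY L.id, L.window_start, L.window_end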
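
And for the last question, a hypothetical sketch of how the Kafka-backed table
could be declared directly with CREATE TABLE, including the 60-minute
watermark delay; the schema and connector options are placeholders based on
your snippet:

CREATE TABLE kafkaTable (
  id STRING,
  event_time TIMESTAMP(3),
  -- 60 minutes bounded out-of-orderness, same as getWatermarkStrategy(3600000)
  WATERMARK FOR event_time AS event_time - INTERVAL '60' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = '...',                         -- placeholder
  'properties.bootstrap.servers' = '...',  -- placeholder
  'format' = 'json'                        -- placeholder
);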

Hope it helps,
FG


On Mon, Feb 14, 2022 at 8:39 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> We are creating two data streams in our Flink application. Both of them
> are then formed into two Tables. The first data stream has a watermark
> delay of 24 hours while the second stream has a watermark delay of 60
> minutes. The watermark used is of BoundedOutOfOrderness strategy and uses a
> particular event_time field present within the records themselves to
> assign watermarks.
>
> For example,
>
> DataStream<Row> fileStream = env.fromSource(
>                     fileSource,
>                     getWatermarkStrategy(86400000), // custom function,
> watermark of 24 hours in ms
>                     "fileSource");
> Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
> tableEnv.createTemporaryView("fileTable", firstTable);
>
> DataStream<Row> kafkaStream = env.fromSource(
>                     kafkaSource,
>                     getWatermarkStrategy(3600000), // custom function,
> watermark of 60 minutes in ms
>                     "kafkaSource");
> Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
> tableEnv.createTemporaryView("kafkaTable", secondTable);
>
> Now we want to write a continuous SQL query to join
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/dynamic_tables/#continuous-queries>
>  firstTable and secondTable with a TumbleWindow of 60 minutes:
>
> "SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS
> event_time,
> MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time))," +
>                     "FROM fileTable, kafkaTable " +
>                     "where fileTable.id = kafkaTable.id " +
>                     "group by TUMBLE(fileTable.rowtime, INTERVAL '60'
> MINUTE)"
>
> What we want to know is, will join or aggregation queries work correctly
> between the two tables? Is it the case that the contents of kafkaTable
> will be purged immediately after 60 minutes, and hence a join/aggregation
> might not give correct results?
> Will there be data loss if tables with different watermark delays are
> joined?
>
> --
> *Regards,*
> *Meghajit*
>
