Hi Francesco,

Thank you so much for your reply. This was really helpful. In reply to your tips:

> As described here <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation>, we have deprecated the syntax `GROUP BY WINDOW`, you should use windowing TVF instead <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#windowing-tvfs>

Yes, we are trying to move towards windowing TVFs as well. Some of our existing jobs still use Group Window Aggregation and hence we are still using it.

> You can directly use Window joins <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/> as well for your query, as they're meant exactly to cover your use case

Thanks. It looks like window joins are used along with windowing TVFs, though. But I will try to explore this.

> Any particular reason you're creating the input tables from DataStream instead of creating them directly from Table API using either CREATE TABLE <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table> or TableDescriptor?

We are creating a File Source which can read Parquet files from a remote GCS (Google Cloud Storage) bucket. We had evaluated this <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/parquet/#:~:text=CREATE%20TABLE%20user_behavior%20(%0A%20%20user_id%20BIGINT%2C%0A%20%20item_id%20BIGINT%2C%0A%20%20category_id%20BIGINT%2C%0A%20%20behavior%20STRING%2C%0A%20%20ts%20TIMESTAMP(3)%2C%0A%20%20dt%20STRING%0A)%20PARTITIONED%20BY%20(dt)%20WITH%20(%0A%20%27connector%27%20%3D%20%27filesystem%27%2C%0A%20%27path%27%20%3D%20%27/tmp/user_behavior%27%2C%0A%20%27format%27%20%3D%20%27parquet%27%0A)> to create a table, but we faced the following challenges:

- We plan to use this Parquet source to create a Hybrid Source later. Hence, we had to use a File Source.
- A call to GCS returns files in lexicographic order. We wanted a high-level deterministic order in which files are picked for reading, and hence we resorted to using a File Source with a custom Split Assigner to assign the files to the source readers in some order.
- Creating the table <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/parquet/#:~:text=CREATE%20TABLE%20user_behavior%20(%0A%20%20user_id%20BIGINT%2C%0A%20%20item_id%20BIGINT%2C%0A%20%20category_id%20BIGINT%2C%0A%20%20behavior%20STRING%2C%0A%20%20ts%20TIMESTAMP(3)%2C%0A%20%20dt%20STRING%0A)%20PARTITIONED%20BY%20(dt)%20WITH%20(%0A%20%27connector%27%20%3D%20%27filesystem%27%2C%0A%20%27path%27%20%3D%20%27/tmp/user_behavior%27%2C%0A%20%27format%27%20%3D%20%27parquet%27%0A)> requires specifying the column names and data types. However, in our case we use the Protobuf schema to read the schema for a Parquet file. Also, some values in the Parquet file need custom type conversion (int64 -> timestamp, for example).

I had a question regarding this point you mentioned:

> In other words, it won't drop the content of kafkaTable immediately, but after both streams are at "the same point in time" (defined by the watermarks of both streams).

Does it mean that the output of the join will be flushed to the sink at the period defined by the minimum watermark? That is, 60 minutes in the above case?

Also, I read here <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#:~:text=Since%20time%20attributes%20are%20quasi%2Dmonotonic%20increasing%2C%20Flink%20can%20remove%20old%20values%20from%20its%20state%20without%20affecting%20the%20correctness%20of%20the%20result.> that Flink will remove old data from its state in case of interval joins. Does this mean that data present in both tables will be removed after the minimum watermark delay (60 minutes in this case)?
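
To make the question concrete, here is a minimal sketch in plain Java (no Flink dependency) of the minimum-watermark rule as I understand it from your description. The class and method names are hypothetical, and the assumption that both sources have seen events up to the same timestamp is mine, only for illustration:

```java
// Hypothetical illustration only; this is not Flink code.
class MinWatermarkSketch {

    // Bounded-out-of-orderness watermark: highest event time seen so far
    // minus the allowed delay. (Flink's built-in strategy additionally
    // subtracts 1 ms, which is omitted here for readability.)
    static long watermark(long maxEventTimeMs, long delayMs) {
        return maxEventTimeMs - delayMs;
    }

    // A two-input operator such as a join advances its event-time clock
    // to the lower of its two input watermarks.
    static long combinedWatermark(long leftWatermarkMs, long rightWatermarkMs) {
        return Math.min(leftWatermarkMs, rightWatermarkMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // Assumption: both sources have seen events up to t = 48h.
        long maxEventTime = 48 * hour;

        long fileWm = watermark(maxEventTime, 24 * hour); // 24-hour delay
        long kafkaWm = watermark(maxEventTime, hour);     // 60-minute delay
        long joinWm = combinedWatermark(fileWm, kafkaWm);

        System.out.println("fileTable watermark (h):  " + fileWm / hour);
        System.out.println("kafkaTable watermark (h): " + kafkaWm / hour);
        System.out.println("join watermark (h):       " + joinWm / hour);
    }
}
```

With these assumed values the three printed figures are 24, 47 and 24, i.e. the join would follow the slower fileTable watermark (24-hour delay) rather than the 60-minute one. Please correct me if this is not how it works.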

Regards,
Meghajit

On Mon, Feb 14, 2022 at 8:13 PM Francesco Guardiani <france...@ververica.com> wrote:

> Hi,
>
> So my understanding of your query is that you want to do a join first, and
> then group by a 60 minutes distance and aggregate them. Please correct me
> if I'm wrong.
>
> First of all, the query you've posted is incorrect and should fail, as its
> plan is invalid because it's using a regular join. Regular joins cannot be
> concatenated with other "time operations" like a group by window, as they
> don't produce any watermark.
>
> My suggestion for your query is to use an interval join
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#interval-joins>
> first, and then a group window. For example:
>
> SELECT TUMBLE_START(file_time, INTERVAL '60' MINUTE) AS event_time,
>   MAX(TIMESTAMPDIFF(MINUTE, file_time, kafka_time))
> FROM (
>   SELECT fileTable.id AS id, fileTable.event_time AS file_time,
>     kafkaTable.event_time AS kafka_time
>   FROM fileTable, kafkaTable
>   WHERE fileTable.id = kafkaTable.id AND fileTable.event_time BETWEEN
>     kafkaTable.event_time - INTERVAL '1' HOUR AND kafkaTable.event_time
> )
> GROUP BY id, TUMBLE(file_time, INTERVAL '60' MINUTE)
>
> This produces the correct result, as the interval join will produce the
> cartesian product of the events at a maximum distance of 1 hour between
> them, and at runtime they'll emit the minimum watermark between the two
> inputs. In other words, it won't drop the content of kafkaTable
> immediately, but after both streams are at "the same point in time"
> (defined by the watermarks of both streams).
> After the cartesian product is emitted from the interval join, the group
> by will be executed.
>
> I also have some tips:
>
> * As described here
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation>,
> we have deprecated the syntax `GROUP BY WINDOW`, you should use windowing
> TVF instead
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#windowing-tvfs>
> * You can directly use Window joins
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/>
> as well for your query, as they're meant exactly to cover your use case
> * Any particular reason you're creating the input tables from DataStream
> instead of creating them directly from Table API using either CREATE
> TABLE
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
> or TableDescriptor?
>
> Hope it helps,
> FG
>
>
> On Mon, Feb 14, 2022 at 8:39 AM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> We are creating two data streams in our Flink application. Both of them
>> are then converted into Tables. The first data stream has a watermark
>> delay of 24 hours while the second stream has a watermark delay of 60
>> minutes. The watermark strategy used is BoundedOutOfOrderness, and it uses
>> a particular event_time field present within the records themselves to
>> assign watermarks.
>>
>> For example,
>>
>> DataStream<Row> fileStream = env.fromSource(
>>     fileSource,
>>     getWatermarkStrategy(86400000), // custom function, watermark of 24 hours in ms
>>     "fileSource");
>> Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
>> tableEnv.createTemporaryView("fileTable", firstTable);
>>
>> DataStream<Row> kafkaStream = env.fromSource(
>>     kafkaSource,
>>     getWatermarkStrategy(3600000), // custom function, watermark of 60 minutes in ms
>>     "kafkaSource");
>> Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
>> tableEnv.createTemporaryView("kafkaTable", secondTable);
>>
>> Now we want to write a continuous SQL query to join
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/dynamic_tables/#continuous-queries>
>> firstTable and secondTable with a TumbleWindow of 60 minutes
>>
>> "SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS event_time, " +
>>     "MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time)) " +
>>     "FROM fileTable, kafkaTable " +
>>     "where fileTable.id = kafkaTable.id " +
>>     "group by TUMBLE(fileTable.rowtime, INTERVAL '60' MINUTE)"
>>
>> What we want to know is: will join or aggregation queries work
>> correctly between the two tables? Is it the case that the contents of
>> kafkaTable will be purged immediately after 60 minutes and hence a
>> join/aggregation might not give correct results?
>> Will there be data loss if tables with different watermark delays are
>> joined?
>>
>> --
>> *Regards,*
>> *Meghajit*
>>
>

--
*Regards,*
*Meghajit*