V1ncentzzZ commented on a change in pull request #14373: URL: https://github.com/apache/flink/pull/14373#discussion_r542135018
########## File path: docs/dev/table/streaming/dynamic_tables.md ########## @@ -28,161 +28,285 @@ This page describes how relational concepts elegantly translate to streaming, al * This will be replaced by the TOC {:toc} -Relational Queries on Data Streams ----------------------------------- +Relational Queries on Unbounded Data +------------------------------------ -The following table compares traditional relational algebra and stream processing for input data, execution, and output results. +First, let's try to understand the challenges processing unbounded data presents as compared to bounded data. +In Flink, unbounded data is represented as data stream while bounded data is represented as batch table. <table class="table table-bordered"> - <tr> - <th>Relational Algebra / SQL</th> - <th>Stream Processing</th> - </tr> - <tr> - <td>Relations (or tables) are bounded (multi-)sets of tuples.</td> - <td>A stream is an infinite sequences of tuples.</td> - </tr> - <tr> - <td>A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data.</td> - <td>A streaming query cannot access all data when it is started and has to "wait" for data to be streamed in.</td> - </tr> - <tr> - <td>A batch query terminates after it produced a fixed sized result.</td> - <td>A streaming query continuously updates its result based on the received records and never completes.</td> - </tr> + <tr> + <th>Relational Algebra / SQL</th> + <th>Stream Processing</th> + </tr> + <tr> + <td>Relations (or tables) are bounded (multi-)sets of tuples.</td> + <td>A stream is an infinite sequences of tuples.</td> + </tr> + <tr> + <td>A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data.</td> + <td>A streaming query cannot access all data when it is started and has to "wait" for data to be streamed in.</td> + </tr> + <tr> + <td>A batch query terminates after it produced a fixed sized result.</td> + <td>A streaming query continuously updates its result based on the received records and never completes.</td> + </tr> </table> -Despite these differences, relational queries and SQL provide a powerful toolset for processing streams. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the query result such that the query does not need to be evaluated when it is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes obsolete when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update a materialized view as soon as its base tables are updated. +Despite these challenges, processing streams with relational queries and SQL is not impossible. We take inspirationg from the *Materialized Views* feature of the relational database systems. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. However, a common challenge for Materialized Views is to prevent the cache from serving outdated results when the base tables of its definition query are modified. Databases use *Eager View Maintenance* technique to update a materialized view as soon as its base tables are updated. -The connection between eager view maintenance and SQL queries on streams becomes evident if we consider the following: +We can draw the following analogy between Eager view maintenance and Streaming SQL queries: -- A database table results from a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*. -- A materialized view is defined as a SQL query. To update the view, queries must continuously process the changelog streams of the view's base relations. -- The materialized view is the result of the streaming SQL query. +- A table in most of the databases as well as Flink is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*. +- The result of Streaming SQL query is equivalent to a Materialized View. +- The SQL query continuously processes the changelog streams of the base tables to update the result stream. -We introduce the following concept of *Dynamic tables* in the next section with these points in mind. +Flink Table API defines these materialized view as *Dynamic tables*. Let's take a look at them in the next section. Dynamic Tables & Continuous Queries --------------------------------------- -*Dynamic tables* are the core concept of Flink's Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. But just like static batch tables, systems can execute queries over dynamic tables. Querying dynamic tables yields a *Continuous Query*. A continuous query never terminates and produces dynamic results - another dynamic table. The query continuously updates its (dynamic) result table to reflect changes on its (dynamic) input tables. Essentially, a continuous query on a dynamic table is very similar to a query that defines a materialized view. +*Dynamic tables* are the core concept of Flink's Table API and SQL to support queries on streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. However, they can be queried like static batch tables. A query on Dynamic Tables never terminates and continuously updates its result to reflect the changes on its input tables. These queries are known as *Continuous Queries*. If Dynamic Table is equivalent to a Materialized View then continuous queries are equivalent to the SQL query defined on the base table. -It is important to note that a continuous query output is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables. +It is important to note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables. -The following figure visualizes the relationship of streams, dynamic tables, and continuous queries: +The following figure clearly visualizes the relationship bwteeen streams, dynamic tables, and continuous queries: <center> -<img alt="Dynamic tables" src="{% link /fig/table-streaming/stream-query-stream.png %}" width="80%"> +<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/stream-query-stream.png" width="80%"> </center> -1. A stream is converted into a dynamic table. -1. A continuous query is evaluated on the dynamic table yielding a new dynamic table. -1. The resulting dynamic table is converted back into a stream. - -**Note:** Dynamic tables are foremost a logical concept. Dynamic tables are not necessarily (fully) materialized during query execution. -In the following, we will explain the concepts of dynamic tables and continuous queries with a stream of click events that have the following schema: +1. A stream is converted into a dynamic table. +2. A continuous query is evaluated on the dynamic table yielding a new dynamic table. +3. The resulting dynamic table is converted back into a stream. -{% highlight sql %} -CREATE TABLE clicks ( - user VARCHAR, // the name of the user - url VARCHAR, // the URL that was accessed by the user - cTime TIMESTAMP(3) // the time when the URL was accessed -) WITH (...); -{% endhighlight %} +**Note:** Dynamic tables are foremost a logical concept. Dynamic tables may not necessarily (fully) materialize during query execution. -Defining a Table on a Stream ----------------------------- +### Define a Dynamic Table -Processing streams with a relational query require converting it into a `Table`. Conceptually, each record of the stream is interpreted as an `INSERT` modification on the resulting table. We are building a table from an `INSERT`-only changelog stream. +In order to process a stream with a relational query, it has to be converted into a `Table`. Conceptually, each record of the stream is interpreted as an `INSERT` modification on the resulting table. Essentially, we are building a table from an `INSERT`-only changelog stream. The following figure visualizes how the stream of click event (left-hand side) is converted into a table (right-hand side). The resulting table is continuously growing as more records of the click stream are inserted. <center> -<img alt="Append mode" src="{% link /fig/table-streaming/append-mode.png %}" width="60%"> +<img alt="Append mode" src="{{ site.baseurl }}/fig/table-streaming/append-mode.png" width="60%"> </center> -**Note:** A table defined on a stream is internally not materialized. -### Continuous Queries ----------------------- +All the examples in the next few sections use the `clicks` table that has the following schema: + +{% highlight plain %} +[ + user: VARCHAR, // the name of the user + cTime: TIMESTAMP, // the time when the URL was accessed + url: VARCHAR // the URL that was accessed by the user +] +{% endhighlight %} + +**Note:** A table which is defined on a stream is not materialized internally. -A continuous query is evaluated on a dynamic table and produces a new dynamic table as a result. In contrast to a batch query, a continuous query never terminates and updates its result table according to its input tables' updates. At any point in time, a continuous query is semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables. +## Continuous Queries -In the following, we show two example queries on a `clicks` table defined on the stream of click events. +As mentioned in the introduction, Continuous query is a query that never terminates and keeps updating its result table according to the changes on its input tables. + +In the following we show two example queries on `clicks` table. The first query is a simple `GROUP-BY COUNT` aggregation query. It groups the `clicks` table on the `user` field and counts the number of visited URLs. The following figure shows how the query is evaluated over time as the `clicks` table is updated with additional rows. <center> -<img alt="Continuous Non-Windowed Query" src="{% link /fig/table-streaming/query-groupBy-cnt.png %}" width="90%"> +<img alt="Continuous Non-Windowed Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-cnt.png" width="90%"> </center> -When the query starts, the `clicks` table (left-hand side) is empty. The query computes the result table when the first row is inserted. After the first row `[Mary, ./home]` arrives, the result table (right-hand side, top) consists of a single row `[Mary, 1]`. When the second row `[Bob, ./cart]` is inserted into the `clicks` table, the query updates the result table and inserts a new row `[Bob, 1]`. The third row, `[Mary, ./prod?id=1]` yields an update of an already computed result row such that `[Mary, 1]` is updated to `[Mary, 2]`. Finally, the query inserts a third row `[Liz, 1]` into the result table, when the fourth row is appended to the `clicks` table. +When the query is started, the `clicks` table (left-hand side) is empty. The query starts to compute the result table, when the first row is inserted into the `clicks` table. The following updates occur in the result table after each new row in input table: + +1. When the first row `[Mary, ./home]` is inserted, a new row `[Mary, 1]` is added to the result table. +2. On second row `[Bob, ./cart]`, the query inserts a new row `[Bob, 1]` in result. +3. The third row `[Mary, ./prod?id=1]` results in the update of the first row to `[Mary, 2]`. +4. The forth row `[Liz, ./home]` in the input adds a new row `[Liz, 1]` to the result. -The second query is similar to the first one but groups the `clicks` table in addition to the `user` attribute also on an [hourly tumbling window]({% link dev/table/sql/queries.md %}#group-windows) before it counts the number of URLs (time-based computations such as windows are based on special [time attributes](time_attributes.html) are discussed later). Again, the figure shows the input and output at different points in time to visualize the changing nature of dynamic tables. +The second query is similar to the first one but groups the `clicks` table in addition to the `user` attribute also on an [hourly tumbling window]({{ site.baseurl }}/dev/table/sql/queries.html#group-windows) before it counts the number of URLs (time-based computations such as windows are based on special [time attributes](time_attributes.html), which are discussed later.). This implies the updates for a particular hour are processed together while the updates for the next hour are treated as an entirely new set of input +Again, the figure shows the input and output at different points in time to visualize the changing nature of dynamic tables. <center> -<img alt="Continuous Group-Window Query" src="{% link /fig/table-streaming/query-groupBy-window-cnt.png %}" width="100%"> +<img alt="Continuous Group-Window Query" src="{{ site.baseurl }}/fig/table-streaming/query-groupBy-window-cnt.png" width="100%"> </center> -As before, the input table `clicks` is shown on the left. The query continuously computes results every hour and updates the result table. The clicks table contains four rows with timestamps (`cTime`) between `12:00:00` and `12:59:59`. The query computes two results rows from this input (one for each `user`) and appends them to the result table. For the next window between `13:00:00` and `13:59:59`, the `clicks` table contains three rows, which results in another two rows being appended to the result table. The result table is updated as more rows are appended to `clicks` over time. +As before, the input table `clicks` is shown on the left. The query continuously computes results every hour and updates the result table. The clicks table contains four rows with timestamps (`cTime`) between the first hour window `12:00:00` and `12:59:59`. The query computes two results rows in this window (one for each `user`) and appends them to the result table. For the next window between `13:00:00` and `13:59:59`, the `clicks` table contains three rows, which results in another two new rows in the result table. The result table is updated, as more rows are appended to `clicks` over time. The important point is once the window has passed in time, the data for that window is never updated. -### Update and Append Queries - -Although the two example queries appear to be quite similar (both compute a grouped count aggregate), they differ in one crucial aspect: +Although the two example queries appear to be quite similar (both compute a grouped count aggregate), they differ in one important aspect: - The first query updates previously emitted results, i.e., the changelog stream that defines the result table contains `INSERT` and `UPDATE` changes. -- The second query only appends to the result table, i.e., the result table's changelog stream only consists of `INSERT` changes. +- The second query only appends to the result table, i.e., the changelog stream of the result table only consists of `INSERT` changes. Whether a query produces an append-only table or an updated table has some implications: -- Queries that make update changes usually have to maintain more state (see the following section). +- Queries that produce update changes usually have to maintain more state (see the following section). - The conversion of an append-only table into a stream is different from the conversion of an updated table (see the [Table to Stream Conversion](#table-to-stream-conversion) section). -### Query Restrictions - -Many, but not all, semantically valid queries can be evaluated as continuous queries on streams. Some queries are too expensive to compute, either due to the size of state they need to maintain or because computing updates is too expensive. - -- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows to update them. For instance, the first example query needs to store the URL count for each user to increase the count and send out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and might eventually cause the query to fail. - -{% highlight sql %} -SELECT user, COUNT(url) -FROM clicks -GROUP BY user; -{% endhighlight %} - -- **Computing Updates:** Some queries require to recompute and update a large fraction of the emitted result rows even if only a single input record is added or updated. Such queries are not well suited to be executed as continuous queries. An example is the following query that computes a `RANK` for each user based on the time of the last click. As soon as the `clicks` table receives a new row, the user's `lastAction` is updated and a new rank computed. However, since two rows cannot have the same rank, all lower ranked rows also need to be updated. - -{% highlight sql %} -SELECT user, RANK() OVER (ORDER BY lastAction) -FROM ( - SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user -); -{% endhighlight %} - -The [Query Configuration](query_configuration.html) page discusses parameters to control the execution of continuous queries. Some parameters can be used to trade the size of the maintained state for result accuracy. - Table to Stream Conversion -------------------------- A dynamic table can be continuously modified by `INSERT`, `UPDATE`, and `DELETE` changes just like a regular database table. It might be a table with a single row, which is constantly updated, an insert-only table without `UPDATE` and `DELETE` modifications, or anything in between. When converting a dynamic table into a stream or writing it to an external system, these changes need to be encoded. Flink's Table API and SQL support three ways to encode the changes of a dynamic table: -* **Append-only stream:** A dynamic table that is only modified by `INSERT` changes can be converted into a stream by emitting the inserted rows. +* **Append-only stream:** A dynamic table that is only modified by `INSERT` changes can be converted into a stream just by emitting the inserted rows. -* **Retract stream:** A retract stream is a stream with two types of messages, *add messages* and *retract messages*. A dynamic table is converted into a retract stream by encoding an `INSERT` change as add message, a `DELETE` change as a retract message, and an `UPDATE` change as a retract message for the updated (previous) row, and an additional message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream. +* **Retract stream:** A retract stream is a stream with two types of messages: + + * **Add messages** - These messages represent a new row being added to the table. `INSERT` statements are encoded as Add messages. + * **Retract messages**. The messages represents a row being deleted from the table. `DELETE` statement are encoded as Delete messages. + +`UPDATE` statements are encoded as a Retract message for the updated (previous) row and an Add message for the updating (new) row. + + The following figure visualizes the conversion of a dynamic table into a retract stream. <center> -<img alt="Dynamic tables" src="{% link /fig/table-streaming/undo-redo-mode.png %}" width="85%"> +<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/undo-redo-mode.png" width="85%"> </center> <br><br> -* **Upsert stream:** An upsert stream is a stream with two types of messages, *upsert messages* and *delete messages*. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with a unique key is transformed into a stream by encoding `INSERT` and `UPDATE` changes as upsert messages and `DELETE` changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute to apply messages correctly. The main difference to a retract stream is that `UPDATE` changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream. +* **Upsert stream:** An upsert stream is a stream with two types of messages - + + * **Upsert messages** - These messsages encode `INSERT` and `UPDATE` changes. Encoding an `UPDATE` message is not possible unless the table has a (possibly composite) unique key. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. + + * **Delete messages** - `DELETE` changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. + + The main difference between and upstert stream and a retract stream is that `UPDATE` changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream. <center> -<img alt="Dynamic tables" src="{% link /fig/table-streaming/redo-mode.png %}" width="85%"> +<img alt="Dynamic tables" src="{{ site.baseurl }}/fig/table-streaming/redo-mode.png" width="85%"> </center> <br><br> -The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts]({% link dev/table/common.md %}#convert-a-table-into-a-datastream) page. Please note that only append and retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](../sourceSinks.html#define-a-tablesink) page. +The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts]({{ site.baseurl }}/dev/table/common.html#convert-a-table-into-a-datastream) page. Please note that only Append and Retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](../sourceSinks.html#define-a-tablesink) page. + +Challenges in Unbounded Data Processing +---------------------------------------- + +Most of the semantically valid queries can be evaluated as continuous queries on streams. However, there are some queries that are too expensive to compute, either due to the size of state that they need to maintain or because computing updates is too expensive. + +- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows in order to be able to update them. For instance, the first example query needs to store the URL count for each user to be able to increase the count and sent out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and might eventually cause the query to fail. + + {% highlight sql %} + SELECT user, COUNT(url) Review comment: The `user` is a keyword in flink sql. We should surround them with backticks. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org