sjwiesman commented on a change in pull request #12592:
URL: https://github.com/apache/flink/pull/12592#discussion_r442479931
##########
File path: docs/try-flink/table_api.md
##########
@@ -31,482 +31,290 @@ The Table API in Flink is commonly used to ease the definition of data analytics
 
 ## What Will You Be Building?
 
-In this tutorial, you will learn how to build a continuous ETL pipeline for tracking financial transactions by account over time.
-You will start by building your report as a nightly batch job, and then migrate to a streaming pipeline.
+In this tutorial, you will learn how to build a real-time dashboard to track financial transactions by account.
+The pipeline will read data from Kafka and write the results to MySQL, where they are visualized via Grafana.
 
 ## Prerequisites
 
-This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
-It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.
+This walk-through assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you come from a different programming language.
+It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.
 
 ## Help, I’m Stuck!
 
 If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
-In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) consistently ranks as one of the most active of any Apache project and is a great way to get help quickly.
 
 ## How To Follow Along
 
 If you want to follow along, you will require a computer with:
 
 * Java 8 or 11
 * Maven
+* Docker
 
-A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless site.is_stable %}
-    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-</div>
-
-{% unless site.is_stable %}
+{% if site.version contains "SNAPSHOT" %}
 <p style="border-radius: 5px; padding: 5px" class="bg-danger">
-  <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a>
-  If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For example:
-{% highlight bash %}
-<settings>
-  <activeProfiles>
-    <activeProfile>apache</activeProfile>
-  </activeProfiles>
-  <profiles>
-    <profile>
-      <id>apache</id>
-      <repositories>
-        <repository>
-          <id>apache-snapshots</id>
-          <url>https://repository.apache.org/content/repositories/snapshots/</url>
-        </repository>
-      </repositories>
-    </profile>
-  </profiles>
-</settings>
-{% endhighlight %}
+  <b>
+  NOTE: The Apache Flink Docker images used for this playground are only available for
+  released versions of Apache Flink.
+  </b><br>
+  Since you are currently looking at the latest SNAPSHOT
+  version of the documentation, all version references below will not work.
+  Please switch the documentation to the latest released version via the release picker which you
+  find on the left side below the menu.
 </p>
-{% endunless %}
+{% endif %}
 
-You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
-Maven will create a project with all the dependencies to complete this tutorial.
-After importing the project into your editor, you will see a file with the following code which you can run directly inside your IDE.
+The required configuration files are available in the [flink-playgrounds](https://github.com/apache/flink-playgrounds) repository.
+Once downloaded, open the project `flink-playground/table-walkthrough` in your IDE and navigate to the file `SpendReport`.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+    "    account_id BIGINT,\n" +
+    "    amount BIGINT,\n" +
+    "    transaction_time TIMESTAMP(3),\n" +
+    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+    ") WITH (\n" +
+    "    'connector' = 'kafka',\n" +
+    "    'topic' = 'transactions',\n" +
+    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+    "    'format' = 'csv'\n" +
+    ")");
+
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts TIMESTAMP(3),\n" +
+    "    amount BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "    'connector' = 'jdbc',\n" +
+    "    'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "    'table-name' = 'spend_report',\n" +
+    "    'driver' = 'com.mysql.jdbc.Driver',\n" +
+    "    'username' = 'sql-demo',\n" +
+    "    'password' = 'demo-sql'\n" +
+    ")");
+
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
-
-env.execute("Spend Report");
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
-
-val truncateDateToHour = new TruncateDateToHour
-
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
-
-env.execute("Spend Report")
-{% endhighlight %}
-</div>
-</div>
 
 ## Breaking Down The Code
 
 #### The Execution Environment
 
-The first two lines set up your `ExecutionEnvironment`.
-The execution environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
-This walkthrough begins with the batch environment since you are building a periodic batch report.
-It is then wrapped in a `BatchTableEnvironment` to have full access to the Table API.
+The first two lines set up your `TableEnvironment`.
+The table environment is how you can set properties for your Job, specify whether you are writing a batch or a streaming application, and create your sources.
+This walkthrough creates a standard table environment that uses the streaming runtime.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-{% endhighlight %}
-</div>
-</div>
-
 #### Registering Tables
 
-Next, tables are registered in the execution environment that you can use to connect to external systems for reading and writing both batch and streaming data.
-A table source provides access to data stored in external systems; such as a database, a key-value store, a message queue, or a file system.
+Next, tables are registered in the environment that you can use to connect to external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems, such as a database, a key-value store, a message queue, or a file system.
 A table sink emits a table to an external storage system.
 Depending on the type of source and sink, they support different formats such as CSV, JSON, Avro, or Parquet.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+{% highlight sql %}
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+    "    account_id BIGINT,\n" +
+    "    amount BIGINT,\n" +
+    "    transaction_time TIMESTAMP(3),\n" +
+    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
+    ") WITH (\n" +
+    "    'connector' = 'kafka',\n" +
+    "    'topic' = 'transactions',\n" +
+    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+    "    'format' = 'csv'\n" +
+    ")");
 {% endhighlight %}
-</div>
-</div>
 
 Two tables are registered; a transaction input table, and a spend report output table.
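(Editor's aside, not part of the proposed change: to make the DDL above a little more concrete, a table registered this way can be queried like any other Table API table. The snippet below is a minimal, hypothetical sketch; it assumes the `tEnv` and the column names from the `CREATE TABLE transactions` statement shown above and uses Flink's Java expression DSL, rather than code taken from the walkthrough itself.)

{% highlight java %}
// Hypothetical sketch only: query the Kafka-backed table registered by the DDL above.
// Assumes the `tEnv` from the snippet above and the columns declared in the DDL.
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;

Table transactions = tEnv.from("transactions");

// Keep only larger transactions and project the declared columns.
Table largeTransactions = transactions
    .filter($("amount").isGreaterOrEqual(500))
    .select($("account_id"), $("amount"), $("transaction_time"));
{% endhighlight %}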
 
-The transactions (`transactions`) table lets us read credit card transactions, which contain account ID's (`accountId`), timestamps (`timestamp`), and US$ amounts (`amount`).
-In this tutorial, the table is backed by data generated in memory to avoid any dependencies on external systems.
-In practice, the `BoundedTransactionTableSource` may be backed by a filesystem, a database, or any other static source.
-The spend report (`spend_report`) table logs each row with log level **INFO**, instead of writing to persistent storage, so you can easily see your results.
-
-#### Registering A UDF
-
-Along with the tables, a [user-defined function]({{ site.baseurl }}/dev/table/functions/udfs.html) is registered for working with timestamps.
-This function takes a timestamp and rounds it down to the nearest hour.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+The transactions (`transactions`) table lets us read credit card transactions, which contain account IDs (`account_id`), timestamps (`transaction_time`), and US$ amounts (`amount`).
+The table is a logical view over a Kafka topic called `transactions` containing CSV data.
+
+{% highlight sql %}
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts TIMESTAMP(3),\n" +
+    "    amount BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "    'connector' = 'jdbc',\n" +
+    "    'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "    'table-name' = 'spend_report',\n" +
+    "    'driver' = 'com.mysql.jdbc.Driver',\n" +
+    "    'username' = 'sql-demo',\n" +
+    "    'password' = 'demo-sql'\n" +
+    ")");
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val truncateDateToHour = new TruncateDateToHour
-{% endhighlight %}
-</div>
-</div>
+The second table, `spend_report`, stores the final results of the aggregation.
+Its underlying storage is a table in a MySQL database.
 
 #### The Query
 
 With the environment configured and tables registered, you are ready to build your first application.
-From the `TableEnvironment` you can `scan` an input table to read its rows and then write those results into an output table using `insertInto`.
+From the `TableEnvironment` you can use `from` to read the rows of an input table and then write those results into an output table using `executeInsert`.
+The `report` function is where you will implement your business logic.
+It is currently unimplemented.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
-{% endhighlight %}
-</div>
-</div>
-
-Initially, the Job reads all transactions and logs them out with log level **INFO**.
-
-#### Execute
+## Testing
 
-Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
-You call `ExecutionEnvironment#execute` to begin the execution of your Job by giving it a name.
+The project contains a secondary testing class `SpendReportTest` that validates the logic of the report.
+It creates a table environment in batch mode.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-env.execute("Spend Report");
+EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.execute("Spend Report")
-{% endhighlight %}
-</div>
-</div>
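(Editor's aside, not part of the proposed change: for readers curious what such a test might look like, here is a minimal, hypothetical sketch. It assumes the batch-mode `tEnv` created above, the `report(Table)` method from `SpendReport`, and in-memory rows built with `fromValues`; the actual `SpendReportTest` in flink-playgrounds may differ, and the concrete assertions depend on how `report` is implemented.)

{% highlight java %}
// Hypothetical test sketch: feed a small in-memory table through report() and
// collect the rows for assertions. Column names and types mirror the transactions DDL.
import static org.apache.flink.table.api.Expressions.row;

import java.time.LocalDateTime;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

Table transactions = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("account_id", DataTypes.BIGINT()),
        DataTypes.FIELD("amount", DataTypes.BIGINT()),
        DataTypes.FIELD("transaction_time", DataTypes.TIMESTAMP(3))),
    row(1L, 100L, LocalDateTime.parse("2020-01-01T01:15:00")),
    row(1L, 200L, LocalDateTime.parse("2020-01-01T01:45:00")));

try (CloseableIterator<Row> rows = report(transactions).execute().collect()) {
    // Assert on the collected rows here, e.g. total spend per account and hour.
    rows.forEachRemaining(System.out::println);
}
{% endhighlight %}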
 
+One of Flink's unique properties is that it provides consistent semantics across batch and streaming.
+This means you can develop and test applications in batch mode on static datasets, and deploy to production as streaming applications!
 
 ## Attempt One
 
 Now with the skeleton of a Job set-up, you are ready to add some business logic.
 The goal is to build a report that shows the total spend for each account across each hour of the day.
+This means the timestamp column needs to be rounded down from millisecond to hour granularity.
+
 Just like a SQL query, Flink can select the required fields and group by your keys.


Review comment:
   yep.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org