File path: docs/try-flink/
@@ -31,482 +31,290 @@ The Table API in Flink is commonly used to ease the 
definition of data analytics
 ## What Will You Be Building? 
-In this tutorial, you will learn how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
-You will start by building your report as a nightly batch job, and then 
migrate to a streaming pipeline.
+In this tutorial, you will learn how to build a real-time dashboard to track 
financial transactions by account.
+The pipeline will read data from Kafka and write the results to MySQL, 
where they are visualized via Grafana.
 ## Prerequisites
-This walkthrough assumes that you have some familiarity with Java or Scala, 
but you should be able to follow along even if you are coming from a different 
programming language.
-It also assumes that you are familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+This walk-through assumes that you have some familiarity with Java or Scala, 
but you should be able to follow along even if you come from a different 
programming language.
+It also assumes that you are familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses.
 ## Help, I’m Stuck! 
 If you get stuck, check out the [community support 
-In particular, Apache Flink's [user mailing 
list]( is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+In particular, Apache Flink's [user mailing 
list]( consistently ranks 
as one of the most active of any Apache project and is a great way to get help 
quickly.
 ## How To Follow Along
 If you want to follow along, you will require a computer with: 
 * Java 8 or 11
 * Maven 
+* Docker
-A provided Flink Maven Archetype will create a skeleton project with all the 
necessary dependencies quickly:
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless 
site.is_stable %}
 \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless 
site.is_stable %}
 \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-{% unless site.is_stable %}
+{% if site.version contains "SNAPSHOT" %}
 <p style="border-radius: 5px; padding: 5px" class="bg-danger">
-    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the command line. For details about 
this change, please refer to <a 
 official document</a>
-    If you wish to use the snapshot repository, you need to add a repository 
entry to your settings.xml. For example:
-{% highlight bash %}
-  <activeProfiles>
-    <activeProfile>apache</activeProfile>
-  </activeProfiles>
-  <profiles>
-    <profile>
-      <id>apache</id>
-      <repositories>
-        <repository>
-          <id>apache-snapshots</id>
-        </repository>
-      </repositories>
-    </profile>
-  </profiles>
-{% endhighlight %}
+  <b>
+  NOTE: The Apache Flink Docker images used for this playground are only 
available for
+  released versions of Apache Flink.
+  </b><br>
+  Since you are currently looking at the latest SNAPSHOT
+  version of the documentation, all version references below will not work.
+  Please switch the documentation to the latest released version via the 
release picker which you
+  find on the left side below the menu.
-{% endunless %}
+{% endif %}
-You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
-Maven will create a project with all the dependencies to complete this 
-After importing the project into your editor, you will see a file with the 
following code which you can run directly inside your IDE.
+The required configuration files are available in the 
[flink-playgrounds]( repository.
+Once downloaded, open the project `flink-playground/table-walkthrough` in your 
IDE and navigate to the file `SpendReport`. 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+    "    account_id  BIGINT,\n" +
+    "    amount      BIGINT,\n" +
+    "    transaction_time TIMESTAMP(3),\n" +
+    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
+    ") WITH (\n" +
+    "    'connector' = 'kafka',\n" +
+    "    'topic'     = 'transactions',\n" +
+    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+    "    'format'    = 'csv'\n" +
+    ")");
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT,\n" +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "   'connector'  = 'jdbc',\n" +
+    "   'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "   'table-name' = 'spend_report',\n" +
+    "   'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "   'username'   = 'sql-demo',\n" +
+    "   'password'   = 'demo-sql'\n" +
+    ")");
+Table transactions = tEnv.from("transactions");
-    .scan("transactions")
-    .insertInto("spend_report");
-env.execute("Spend Report");
 {% endhighlight %}
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
-val truncateDateToHour = new TruncateDateToHour
-    .scan("transactions")
-    .insertInto("spend_report")
-env.execute("Spend Report")
-{% endhighlight %}
 ## Breaking Down The Code
 #### The Execution Environment
-The first two lines set up your `ExecutionEnvironment`.
-The execution environment is how you can set properties for your Job, specify 
whether you are writing a batch or a streaming application, and create your 
-This walkthrough begins with the batch environment since you are building a 
periodic batch report.
-It is then wrapped in a `BatchTableEnvironment` to have full access to the 
Table API.
+The first two lines set up your `TableEnvironment`.
+The table environment is how you can set properties for your Job, specify 
whether you are writing a batch or a streaming application, and create your 
+This walkthrough creates a standard table environment that uses the streaming 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
 {% endhighlight %}
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-{% endhighlight %}
 #### Registering Tables
-Next, tables are registered in the execution environment that you can use to 
connect to external systems for reading and writing both batch and streaming 
-A table source provides access to data stored in external systems; such as a 
database, a key-value store, a message queue, or a file system.
+Next, tables are registered in the environment that you can use to connect to 
external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems, such as a 
database, a key-value store, a message queue, or a file system.
 A table sink emits a table to an external storage system.
 Depending on the type of source and sink, they support different formats such 
as CSV, JSON, Avro, or Parquet.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-{% endhighlight %}
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+{% highlight java %}
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+     "    account_id  BIGINT,\n" +
+     "    amount      BIGINT,\n" +
+     "    transaction_time TIMESTAMP(3),\n" +
+     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
+     ") WITH (\n" +
+     "    'connector' = 'kafka',\n" +
+     "    'topic'     = 'transactions',\n" +
+     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+     "    'format'    = 'csv'\n" +
+     ")");
 {% endhighlight %}
 Two tables are registered: a transaction input table and a spend report 
output table.
-The transactions (`transactions`) table lets us read credit card transactions, 
which contain account ID's (`accountId`), timestamps (`timestamp`), and US$ 
amounts (`amount`).
-In this tutorial, the table is backed by data generated in memory to avoid any 
dependencies on external systems.
-In practice, the `BoundedTransactionTableSource` may be backed by a 
filesystem, a database, or any other static source.
-The spend report (`spend_report`) table logs each row with log level **INFO**, 
instead of writing to persistent storage, so you can easily see your results.
-#### Registering A UDF
-Along with the tables, a [user-defined function]({{ site.baseurl 
}}/dev/table/functions/udfs.html) is registered for working with timestamps.
-This function takes a timestamp and rounds it down to the nearest hour.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+The transactions (`transactions`) table lets us read credit card transactions, 
which contain account IDs (`account_id`), timestamps (`transaction_time`), and 
US$ amounts (`amount`).
+The table is a logical view over a Kafka topic called `transactions` 
containing CSV data.
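
To make the mapping from CSV records to table columns concrete, here is a minimal plain-Java sketch of how one record might line up with `account_id`, `amount`, and `transaction_time`. It does not use Flink. The class `TransactionCsv`, the holder `Transaction`, and the `yyyy-MM-dd HH:mm:ss` timestamp layout are assumptions made for illustration; the real topic may encode timestamps differently.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TransactionCsv {
    // Assumed timestamp layout; not taken from the playground configuration.
    private static final DateTimeFormatter TS =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Hypothetical holder mirroring the three columns of the transactions table. */
    public static final class Transaction {
        public final long accountId;
        public final long amount;
        public final LocalDateTime transactionTime;

        public Transaction(long accountId, long amount, LocalDateTime transactionTime) {
            this.accountId = accountId;
            this.amount = amount;
            this.transactionTime = transactionTime;
        }
    }

    /** Splits one CSV record into the three expected fields, in column order. */
    public static Transaction parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields but got " + fields.length);
        }
        return new Transaction(
            Long.parseLong(fields[0].trim()),
            Long.parseLong(fields[1].trim()),
            LocalDateTime.parse(fields[2].trim(), TS));
    }

    public static void main(String[] args) {
        Transaction t = parse("1,188,2020-01-01 09:15:00");
        System.out.println(t.accountId + " " + t.amount + " " + t.transactionTime);
    }
}
```

In the actual pipeline this decoding is handled by the `csv` format declared in the `WITH` clause, so no such parser needs to be written by hand.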
+{% highlight sql %}
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT,\n" +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "    'connector'  = 'jdbc',\n" +
+    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "    'table-name' = 'spend_report',\n" +
+    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "    'username'   = 'sql-demo',\n" +
+    "    'password'   = 'demo-sql'\n" +
+    ")");
 {% endhighlight %}
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val truncateDateToHour = new TruncateDateToHour
-{% endhighlight %}
+The second table, `spend_report`, stores the final results of the aggregation.
+Its underlying storage is a table in a MySQL database.
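
Because `spend_report` declares `PRIMARY KEY (account_id, log_ts)`, a later result for the same account and hour is expected to replace the earlier row rather than add a second one. The following plain-Java sketch illustrates that upsert behaviour with an in-memory map. `UpsertSketch` and its methods are hypothetical names for illustration only and do not reflect how the JDBC connector is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UpsertSketch {
    // One entry per (account_id, log_ts) pair, standing in for the MySQL table.
    private final Map<String, Long> rows = new LinkedHashMap<>();

    /** Inserts a row, or overwrites the amount if the key already exists. */
    public void upsert(long accountId, String logTs, long amount) {
        rows.put(accountId + "|" + logTs, amount);
    }

    /** Returns the stored amount for a key, or null if no row exists. */
    public Long amount(long accountId, String logTs) {
        return rows.get(accountId + "|" + logTs);
    }

    public int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        UpsertSketch sink = new UpsertSketch();
        sink.upsert(1L, "2020-01-01 09:00:00", 100L);
        sink.upsert(1L, "2020-01-01 09:00:00", 250L); // same key: replaces the first row
        sink.upsert(2L, "2020-01-01 09:00:00", 40L);
        System.out.println(sink.size() + " rows, account 1 total = "
            + sink.amount(1L, "2020-01-01 09:00:00"));
    }
}
```

This matters for a continuously updating dashboard: as new transactions arrive for an hour that is still open, the stored total for that hour is refreshed in place.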
 #### The Query
 With the environment configured and tables registered, you are ready to build 
your first application.
-From the `TableEnvironment` you can `scan` an input table to read its rows and 
then write those results into an output table using `insertInto`.
+From the `TableEnvironment` you can use `from` to read the rows of an input 
table and then write those results into an output table using `executeInsert`.
+The `report` function is where you will implement your business logic.
+It is currently unimplemented.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-    .scan("transactions")
-    .insertInto("spend_report");
+Table transactions = tEnv.from("transactions");
 {% endhighlight %}
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-    .scan("transactions")
-    .insertInto("spend_report")
-{% endhighlight %}
-Initially, the Job reads all transactions and logs them out with log level 
-#### Execute
+## Testing 
-Flink applications are built lazily and shipped to the cluster for execution 
only once fully formed.
-You call `ExecutionEnvironment#execute` to begin the execution of your Job by 
giving it a name.
+The project contains a secondary testing class `SpendReportTest` that 
validates the logic of the report.
+It creates a table environment in batch mode. 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-env.execute("Spend Report");
+EnvironmentSettings settings = 
+TableEnvironment tEnv = TableEnvironment.create(settings); 
 {% endhighlight %}
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.execute("Spend Report")
-{% endhighlight %}
+One of Flink's unique properties is that it provides consistent semantics 
across batch and streaming.
+This means you can develop and test applications in batch mode on static 
datasets, and deploy to production as streaming applications!
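
As a rough analogy for what consistent semantics means here, the plain-Java sketch below computes per-account totals twice: once over the complete list in a single pass, and once by folding in one row at a time as a stream would deliver them. It does not use Flink, and `BatchVsIncremental` and `Tx` are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchVsIncremental {
    /** Hypothetical minimal transaction: account and amount only. */
    public static final class Tx {
        public final long accountId;
        public final long amount;

        public Tx(long accountId, long amount) {
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    /** "Batch" style: the whole dataset is available and grouped at once. */
    public static Map<Long, Long> batchTotals(List<Tx> rows) {
        return rows.stream().collect(
            Collectors.groupingBy(t -> t.accountId, Collectors.summingLong(t -> t.amount)));
    }

    /** "Streaming" style: each row updates a running total as it arrives. */
    public static Map<Long, Long> incrementalTotals(List<Tx> rows) {
        Map<Long, Long> totals = new HashMap<>();
        for (Tx t : rows) {
            totals.merge(t.accountId, t.amount, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Tx> rows = Arrays.asList(new Tx(1, 10), new Tx(2, 5), new Tx(1, 25));
        System.out.println(batchTotals(rows).equals(incrementalTotals(rows)));
    }
}
```

Once all rows have been seen, both approaches agree, which is why logic validated on a static dataset can be trusted to produce the same final results when run continuously.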
 ## Attempt One
 Now that the skeleton of a Job is set up, you are ready to add some business 
 The goal is to build a report that shows the total spend for each account 
across each hour of the day.
+This means the timestamp column needs to be rounded down from millisecond to 
hour granularity. 
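
The rounding step on its own can be sketched in plain Java with `java.time`, independent of Flink. `HourFloor` and `floorToHour` are hypothetical names used only to show what rounding down to the hour means for a single timestamp.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class HourFloor {
    /** Drops minutes, seconds, and fractional seconds, keeping date and hour. */
    public static LocalDateTime floorToHour(LocalDateTime timestamp) {
        return timestamp.truncatedTo(ChronoUnit.HOURS);
    }

    public static void main(String[] args) {
        // 12:34:56.789 on 1 January 2020, expressed with nanosecond precision.
        LocalDateTime ts = LocalDateTime.of(2020, 1, 1, 12, 34, 56, 789_000_000);
        System.out.println(floorToHour(ts));
    }
}
```

Every transaction that falls within the same hour maps to the same floored value, which is what allows that value to serve as a grouping key alongside the account ID.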
 Just like a SQL query, Flink can select the required fields and group by your 

