sjwiesman commented on a change in pull request #12592:
URL: https://github.com/apache/flink/pull/12592#discussion_r442179046



##########
File path: docs/try-flink/table_api.md
##########
@@ -31,482 +31,290 @@ The Table API in Flink is commonly used to ease the 
definition of data analytics
 
 ## What Will You Be Building? 
 
-In this tutorial, you will learn how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
-You will start by building your report as a nightly batch job, and then 
migrate to a streaming pipeline.
+In this tutorial, you will learn how to build a real-time dashboard to track 
financial transactions by account.
+The pipeline will read data from Kafka and write the results to MySQL 
visualized via Grafana.
 
 ## Prerequisites
 
-This walkthrough assumes that you have some familiarity with Java or Scala, 
but you should be able to follow along even if you are coming from a different 
programming language.
-It also assumes that you are familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+This walk-through assumes that you have some familiarity with Java or Scala, 
but you should be able to follow along even if you come from a different 
programming language.
+It also assumes that you are familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses.
 
 ## Help, I’m Stuck! 
 
 If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
-In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) consistently ranks 
as one of the most active of any Apache project and a great way to get help 
quickly. 
 
 ## How To Follow Along
 
 If you want to follow along, you will require a computer with: 
 
 * Java 8 or 11
 * Maven 
+* Docker
 
-A provided Flink Maven Archetype will create a skeleton project with all the 
necessary dependencies quickly:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-java \{% unless 
site.is_stable %}
-    
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-table-scala \{% unless 
site.is_stable %}
-    
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
-    -DarchetypeVersion={{ site.version }} \
-    -DgroupId=spend-report \
-    -DartifactId=spend-report \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-{% endhighlight %}
-</div>
-</div>
-
-{% unless site.is_stable %}
+{% if site.version contains "SNAPSHOT" %}
 <p style="border-radius: 5px; padding: 5px" class="bg-danger">
-    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the command line. For details about 
this change, please refer to <a 
href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html";>Maven
 official document</a>
-    If you wish to use the snapshot repository, you need to add a repository 
entry to your settings.xml. For example:
-{% highlight bash %}
-<settings>
-  <activeProfiles>
-    <activeProfile>apache</activeProfile>
-  </activeProfiles>
-  <profiles>
-    <profile>
-      <id>apache</id>
-      <repositories>
-        <repository>
-          <id>apache-snapshots</id>
-          
<url>https://repository.apache.org/content/repositories/snapshots/</url>
-        </repository>
-      </repositories>
-    </profile>
-  </profiles>
-</settings>
-{% endhighlight %}
+  <b>
+  NOTE: The Apache Flink Docker images used for this playground are only 
available for
+  released versions of Apache Flink.
+  </b><br>
+  Since you are currently looking at the latest SNAPSHOT
+  version of the documentation, all version references below will not work.
+  Please switch the documentation to the latest released version via the 
release picker which you
+  find on the left side below the menu.
 </p>
-{% endunless %}
+{% endif %}
 
-You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
-Maven will create a project with all the dependencies to complete this 
tutorial.
-After importing the project into your editor, you will see a file with the 
following code which you can run directly inside your IDE.
+The required configuration files are available in the 
[flink-playgrounds](https://github.com/apache/flink-playgrounds) repository.
+Once downloaded, open the project `flink-playground/table-walkthrough` in your 
IDE and navigate to the file `SpendReport`. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+    "    account_id  BIGINT,\n" +
+    "    amount      BIGINT,\n" +
+    "    transaction_time TIMESTAMP(3),\n" +
+    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
SECOND\n" +
+    ") WITH (\n" +
+    "    'connector' = 'kafka',\n" +
+    "    'topic'     = 'transactions',\n" +
+    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+    "    'format'    = 'csv'\n" +
+    ")");
+
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "   'connector'  = 'jdbc',\n" +
+    "   'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "   'table-name' = 'spend_report',\n" +
+    "   'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "   'username'   = 'sql-demo',\n" +
+    "   'password'   = 'demo-sql'\n" +
+    ")");
+
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
-
-env.execute("Spend Report");
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
-
-val truncateDateToHour = new TruncateDateToHour
-
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
-
-env.execute("Spend Report")
-{% endhighlight %}
-</div>
-</div>
 
 ## Breaking Down The Code
 
 #### The Execution Environment
 
-The first two lines set up your `ExecutionEnvironment`.
-The execution environment is how you can set properties for your Job, specify 
whether you are writing a batch or a streaming application, and create your 
sources.
-This walkthrough begins with the batch environment since you are building a 
periodic batch report.
-It is then wrapped in a `BatchTableEnvironment` to have full access to the 
Table API.
+The first two lines set up your `TableEnvironment`.
+The table environment is how you can set properties for your Job, specify 
whether you are writing a batch or a streaming application, and create your 
sources.
+This walkthrough creates a standard table environment that uses the streaming 
runtime.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env  = ExecutionEnvironment.getExecutionEnvironment
-val tEnv = BatchTableEnvironment.create(env)
-{% endhighlight %}
-</div>
-</div>
-
 
 #### Registering Tables
 
-Next, tables are registered in the execution environment that you can use to 
connect to external systems for reading and writing both batch and streaming 
data.
-A table source provides access to data stored in external systems; such as a 
database, a key-value store, a message queue, or a file system.
+Next, tables are registered in the environment that you can use to connect to 
external systems for reading and writing both batch and streaming data.
+A table source provides access to data stored in external systems, such as a 
database, a key-value store, a message queue, or a file system.
 A table sink emits a table to an external storage system.
 Depending on the type of source and sink, they support different formats such 
as CSV, JSON, Avro, or Parquet.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
-tEnv.registerTableSink("spend_report", new SpendReportTableSink());
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new BoundedTransactionTableSource)
-tEnv.registerTableSink("spend_report", new SpendReportTableSink)
+{% highlight sql %}
+tEnv.executeSql("CREATE TABLE transactions (\n" +
+     "    account_id  BIGINT,\n" +
+     "    amount      BIGINT,\n" +
+     "    transaction_time TIMESTAMP(3),\n" +
+     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
SECOND\n" +
+     ") WITH (\n" +
+     "    'connector' = 'kafka',\n" +
+     "    'topic'     = 'transactions',\n" +
+     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
+     "    'format'    = 'csv'\n" +
+     ")");
 {% endhighlight %}
-</div>
-</div>
 
 Two tables are registered; a transaction input table, and a spend report 
output table.
-The transactions (`transactions`) table lets us read credit card transactions, 
which contain account ID's (`accountId`), timestamps (`timestamp`), and US$ 
amounts (`amount`).
-In this tutorial, the table is backed by data generated in memory to avoid any 
dependencies on external systems.
-In practice, the `BoundedTransactionTableSource` may be backed by a 
filesystem, a database, or any other static source.
-The spend report (`spend_report`) table logs each row with log level **INFO**, 
instead of writing to persistent storage, so you can easily see your results.
-
-#### Registering A UDF
-
-Along with the tables, a [user-defined function]({{ site.baseurl 
}}/dev/table/functions/udfs.html) is registered for working with timestamps.
-This function takes a timestamp and rounds it down to the nearest hour.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+The transactions (`transactions`) table lets us read credit card transactions, 
which contain account ID's (`account_id`), timestamps (`transaction_time`), and 
US$ amounts (`amount`).
+The table is a logical view over a Kafka topic called `transactions` 
containing CSV data.
+
+{% highlight sql %}
+tEnv.executeSql("CREATE TABLE spend_report (\n" +
+    "    account_id BIGINT,\n" +
+    "    log_ts     TIMESTAMP(3),\n" +
+    "    amount     BIGINT\n," +
+    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
+    ") WITH (\n" +
+    "    'connector'  = 'jdbc',\n" +
+    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
+    "    'table-name' = 'spend_report',\n" +
+    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
+    "    'username'   = 'sql-demo',\n" +
+    "    'password'   = 'demo-sql'\n" +
+    ")");
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val truncateDateToHour = new TruncateDateToHour
-{% endhighlight %}
-</div>
-</div>
+The second table, `spend_report`, stores the final results of the aggregation.
+Its underlying storage is a table in a MySql database.
 
 #### The Query
 
 With the environment configured and tables registered, you are ready to build 
your first application.
-From the `TableEnvironment` you can `scan` an input table to read its rows and 
then write those results into an output table using `insertInto`.
+From the `TableEnvironment` you can read `from` an input table to read its 
rows and then write those results into an output table using `executeInsert`.
+The `report` function is where you will implement your business logic.
+It is currently unimplemented.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
+Table transactions = tEnv.from("transactions");
+report(transactions).executeInsert("spend_report");
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report")
-{% endhighlight %}
-</div>
-</div>
-
-Initially, the Job reads all transactions and logs them out with log level 
**INFO**.
-
-#### Execute
+## Testing 
 
-Flink applications are built lazily and shipped to the cluster for execution 
only once fully formed.
-You call `ExecutionEnvironment#execute` to begin the execution of your Job by 
giving it a name.
+The project contains a secondary testing class `SpendReportTest` that 
validates the logic of the report.
+It creates a table environment in batch mode. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-env.execute("Spend Report");
+EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings); 
 {% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.execute("Spend Report")
-{% endhighlight %}
-</div>
-</div>
+One of Flink's unique properties is that it provides consistent semantics 
across batch and streaming.
+This means you can develop and test applications in batch mode on static 
datasets, and deploy to production as streaming applications!
 
 ## Attempt One
 
 Now with the skeleton of a Job set-up, you are ready to add some business 
logic.
 The goal is to build a report that shows the total spend for each account 
across each hour of the day.
+This means the timestamp column needs be be rounded down from millisecond to 
hour granularity. 
+
 Just like a SQL query, Flink can select the required fields and group by your 
keys.
-Because the timestamp field has millisecond granularity, you can use the UDF 
to round it down to the nearest hour.
-Finally, select all the fields, summing the total spend per account-hour pair 
with the built-in `sum` [aggregate function]({{ site.baseurl 
}}/dev/table/functions/systemFunctions.html#aggregate-functions).
+These features, allong with [built-in functions]({% link 
dev/table/functions/systemFunctions.md %}) like `floor` and `sum`, you can 
write this report.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
-    .groupBy("accountId, timestamp")
-    .select("accountId, timestamp, amount.sum as total")
-    .insertInto("spend_report");
+public static Table report(Table rows) {
+    return rows.select(
+            $("account_id"),
+            $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
+            $("amount"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .select('accountId, truncateDateToHour('timestamp) as 'timestamp, 'amount)
-    .groupBy('accountId, 'timestamp)
-    .select('accountId, 'timestamp, 'amount.sum as 'total)
-    .insertInto("spend_report")
+
+## User Defined Functions
+
+Flink contains a limited number of built-in functions, and sometimes you need 
to extend it with a [user-defined function]({% link dev/table/functions/udfs.md 
%}).
+If `floor` wasn't predefined, you could implement it yourself. 
+
+{% highlight java %}
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class MyFloor extends ScalarFunction {
+
+    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
+        @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {
+
+        return timestamp.truncatedTo(ChronoUnit.HOURS);
+    }
+}
 {% endhighlight %}
-</div>
-</div>
 
-This query consumes all records from the `transactions` table, calculates the 
report, and outputs the results in an efficient, scalable manner.
+And then quickly integrate it in your application.
 
-{% highlight raw %}
-# Query 1 output showing account id, timestamp, and amount
-
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
-> 1, 2019-01-01 02:00:00.0, $859.35
-> 2, 2019-01-01 02:00:00.0, $458.40
-> 1, 2019-01-01 03:00:00.0, $330.85
-> 2, 2019-01-01 03:00:00.0, $730.02
-> 1, 2019-01-01 04:00:00.0, $585.16
-> 2, 2019-01-01 04:00:00.0, $760.76
+{% highlight java %}
+public static Table report(Table rows) {
+    return rows.select(
+            $("account_id"),
+            call(MyFloor.class, $("transaction_time")).as("log_ts"),
+            $("amount"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
 
+This query consumes all records from the `transactions` table, calculates the 
report, and outputs the results in an efficient, scalable manner.
+Running the test with this implementation will pass. 
+
 ## Adding Windows
 
 Grouping data based on time is a typical operation in data processing, 
especially when working with infinite streams.
-A grouping based on time is called a [window]({{ site.baseurl 
}}/dev/stream/operators/windows.html) and Flink offers flexible windowing 
semantics.
+A grouping based on time is called a [window]({% link 
dev/stream/operators/windows.md %}) and Flink offers flexible windowing 
semantics.
 The most basic type of window is called a `Tumble` window, which has a fixed 
size and whose buckets do not overlap.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-tEnv
-    .scan("transactions")
-    .window(Tumble.over("1.hour").on("timestamp").as("w"))
-    .groupBy("accountId, w")
-    .select("accountId, w.start as timestamp, amount.sum")
-    .insertInto("spend_report");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv
-    .scan("transactions")
-    .window(Tumble over 1.hour on 'timestamp as 'w)
-    .groupBy('accountId, 'w)
-    .select('accountId, 'w.start as 'timestamp, 'amount.sum)
-    .insertInto("spend_report")
+public static Table report(Table rows) {
+    return 
rows.window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
+        .groupBy($("account_id"), $("log_ts"))
+        .select(
+            $("account_id"),
+            $("log_ts").start().as("log_ts"),
+            $("amount").sum().as("amount"));
+}
 {% endhighlight %}
-</div>
-</div>
 
 This defines your application as using one hour tumbling windows based on the 
timestamp column.
 So a row with timestamp `2019-06-01 01:23:47` is put in the `2019-06-01 
01:00:00` window.
 
+
 Aggregations based on time are unique because time, as opposed to other 
attributes, generally moves forward in a continuous streaming application.
+Unlike `floor` and your UDF, window functions are 
[intrinsics](https://en.wikipedia.org/wiki/Intrinsic_function), which allows 
the runtime to apply additional optimizations.
 In a batch context, windows offer a convenient API for grouping records by a 
timestamp attribute.
 
-Running the updated query will produce identical results as before.
-
-{% highlight raw %}
-# Query 2 output showing account id, timestamp, and amount
-
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
-> 1, 2019-01-01 02:00:00.0, $859.35
-> 2, 2019-01-01 02:00:00.0, $458.40
-> 1, 2019-01-01 03:00:00.0, $330.85
-> 2, 2019-01-01 03:00:00.0, $730.02
-> 1, 2019-01-01 04:00:00.0, $585.16
-> 2, 2019-01-01 04:00:00.0, $760.76
-{% endhighlight %}
+Running the test with this implementation will also pass. 
 
 ## Once More, With Streaming!
 
-Because Flink's Table API offers consistent syntax and semantics for both 
batch and streaming, migrating from one to the other requires just two steps.
-
-The first step is to replace the batch `ExecutionEnvironment` with its 
streaming counterpart, `StreamExecutionEnvironment`, which creates a continuous 
streaming Job.
-It includes stream-specific configurations, such as the time characteristic, 
which when set to [event time]({{ site.baseurl }}/dev/event_time.html) 
guarantees consistent results even when faced with out-of-order events or a Job 
failure.
-This is what will be used by your `Tumble` window when grouping records.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-val tEnv = StreamTableEnvironment.create(env)
-{% endhighlight %}
-</div>
-</div>
-
-The second step is to migrate from a bounded data source to an infinite data 
source.
-The project comes with an `UnboundedTransactionTableSource` that continuously 
creates transaction events in real-time.
-Similar to the `BoundedTransactionTableSource` this table is backed by data 
generated in memory to avoid any dependencies on external systems.
-In practice, this table might read from a streaming source such as Apache 
Kafka, AWS Kinesis, or Pravega.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-tEnv.registerTableSource("transactions", new 
UnboundedTransactionTableSource());
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-tEnv.registerTableSource("transactions", new UnboundedTransactionTableSource)
-{% endhighlight %}
-</div>
-</div>
-
 And that's it, a fully functional, stateful, distributed streaming application!
-The query continuously consumes the stream of transactions, computes the 
hourly spendings, and emits results as soon as they are ready.
+The query continuously consumes the stream of transactions from Kafka, 
computes the hourly spendings, and emits results as soon as they are ready.
 Since the input is unbounded, the query keeps running until it is manually 
stopped.
 And because the Job uses time window-based aggregations, Flink can perform 
specific optimizations such as state clean up when the framework knows that no 
more records will arrive for a particular window.
 
-{% highlight raw %}
-# Query 3 output showing account id, timestamp, and amount
-
-# These rows are calculated continuously over the hour 
-# and output immediately at the end of the hour
-> 1, 2019-01-01 00:00:00.0, $567.87
-> 2, 2019-01-01 00:00:00.0, $726.23
+The table playground is fully dockerized and runnable locally as streaming 
application.
+The environment contains a Kafka topic, a continuous data generator, MySql, 
and Grafana. 
 
-# Flink begins computing these rows as soon as 
-# as the first record for the window arrives
-> 1, 2019-01-01 01:00:00.0, $686.87
-> 2, 2019-01-01 01:00:00.0, $810.06
+From within the `table-walkthrough` folder start the docker-compose script.
 
+{% highlight bash %}
+$ docker-compose build
+$ docker-compose up -d
 {% endhighlight %}
 
-## Final Application
+You can see information on the running job via the [Flink 
console](http://localhost:8082/).
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-package spendreport;
-
-import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
-import org.apache.flink.walkthrough.common.table.TransactionTableSource;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Tumble;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-public class SpendReport {
+![Flink Console]({% link /fig/spend-report-console.png %}){:height="400px" 
width="800px"}
 
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+Explore the results from inside MySQL.
 
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
-        tEnv.registerTableSource("transactions", new 
UnboundedTransactionTableSource());
-        tEnv.registerTableSink("spend_report", new SpendReportTableSink());
+{% highlight bash %}
+$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
 
-        tEnv
-            .scan("transactions")
-            .window(Tumble.over("1.hour").on("timestamp").as("w"))
-            .groupBy("accountId, w")
-            .select("accountId, w.start as timestamp, amount.sum")
-            .insertInto("spend_report");
+mysql> use sql-demo;
+Database changed
 
-        env.execute("Spend Report");
-    }
-}
+mysql> select count(*) from spend_report;
++----------+
+| count(*) |
++----------+
+|      110 |
++----------+
 {% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-package spendreport
-
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;
-import org.apache.flink.walkthrough.common.table._
-
-object SpendReport {
 
-    def main(args: Array[String]): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
-        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+Finally, go to 
[Grafana](http://localhost:3000/d/FOe0PbmGk/walkthrough?viewPanel=2&orgId=1&refresh=5s)
 to see the fully visualized result!

Review comment:
       Yes, I ran docker compose several times it is stable 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to