alibahadirzeybek commented on a change in pull request #17760:
URL: https://github.com/apache/flink/pull/17760#discussion_r748401826



##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * The input is a plain text file with lines separated by newline characters.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided,  the program is run with default data from [[WordCountData]].
  *
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
  *
- *  - write a simple Flink Streaming program,
- *  - use tuple data types,
- *  - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
  *
+ * This example shows how to:
+ *
+ *  - Write a simple Flink DataStream program
+ *  - Use tuple data types
+ *  - Write and use a user-defined function
  */
 object WordCount {
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
 
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // make parameters available in the web interface
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while a BATCH job would only produce one final result at the end. The final
+    // result will be the same if interpreted correctly, but getting there can be different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH  if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
+
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
 
     // get input data
-    val text =
-    // read the text file from given input path
-    if (params.has("input")) {
-      env.readTextFile(params.get("input"))
-    } else {
-      println("Executing WordCount example with default inputs data set.")
-      println("Use --input to specify file input.")
-      // get default test text data
-      env.fromElements(WordCountData.WORDS: _*)
+    val text = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, 
input:_*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), 
"file-input")
+      case None =>
+        env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
     }
 
-    val counts: DataStream[(String, Int)] = text
-      // split up the lines in pairs (2-tuples) containing: (word,1)
-      .flatMap(_.toLowerCase.split("\\W+"))
-      .filter(_.nonEmpty)
-      .map((_, 1))
-      // group by the tuple field "0" and sum up tuple field "1"
-      .keyBy(_._1)
-      .sum(1)
-
-    // emit result
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output 
path.")
-      counts.print()
+    val counts =
+      // The text lines read from the source are split into words
+      // using a user-defined function. The tokenizer, implemented below,
+      // will output each words as a (2-tuple) containing (word, 1)

Review comment:
```suggestion
      // will output each word as a (2-tuple) containing (word, 1)
```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to