alpreu commented on a change in pull request #17760: URL: https://github.com/apache/flink/pull/17760#discussion_r747442080
########## File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala ########## @@ -18,74 +18,170 @@ package org.apache.flink.streaming.scala.examples.wordcount -import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.common.RuntimeExecutionMode +import org.apache.flink.api.common.eventtime.WatermarkStrategy +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.serialization.SimpleStringEncoder +import org.apache.flink.configuration.MemorySize +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.connector.file.src.FileSource +import org.apache.flink.connector.file.src.reader.TextLineFormat +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.examples.wordcount.util.WordCountData +import org.apache.flink.streaming.scala.examples.wordcount.util.CLI +import org.apache.flink.util.Collector + +import java.time.Duration /** - * Implements the "WordCount" program that computes a simple word occurrence - * histogram over text files in a streaming fashion. + * Implements the "WordCount" program that computes a simple word occurrence histogram over text + * files. This Job can be executed in both streaming and batch execution modes. * * The input is a plain text file with lines separated by newline characters. * * Usage: - * {{{ - * WordCount --input <path> --output <path> - * }}} * - * If no parameters are provided, the program is run with default data from - * {@link WordCountData}. + * {{{ --input <path> }}} A list of input files and / or directories to read. + * If no inputs are provided, the program is run with default data from [[WordCountData]]. 
* - * This example shows how to: + * {{{--discovery-interval <duration> }}} Turns the file reader + * into a continuous source that will monitor the provided input directories + * every interval and read any new files. * - * - write a simple Flink Streaming program, - * - use tuple data types, - * - write and use transformation functions. + * {{{--output <path> }}}The output directory where the Job will + * write the results. If no output path is provided, the Job will print the results + * to `stdout` * + * This example shows how to: + * + * - Write a simple Flink DataStream program + * - Use tuple data types + * - Write and use a user-defined function */ object WordCount { - def main(args: Array[String]) { + // ************************************************************************* + // PROGRAM + // ************************************************************************* - // Checking input parameters - val params = ParameterTool.fromArgs(args) + def main(args: Array[String]): Unit = { + val params = CLI.fromArgs(args) - // set up the execution environment + // Create the execution environment. This is the main entrypoint + // to building a Flink application. val env = StreamExecutionEnvironment.getExecutionEnvironment - // make parameters available in the web interface + // Apache Flink’s unified approach to stream and batch processing means that a DataStream + // application executed over bounded input will produce the same final results regardless + // of the configured execution mode. It is important to note what final means here: a job + // executing in STREAMING mode might produce incremental updates (think upserts in + // a database) while a BATCH job would only produce one final result at the end. The final + // result will be the same if interpreted correctly, but getting there can be different. + // + // The “classic” execution behavior of the DataStream API is called STREAMING execution + // mode. 
Applications should use streaming execution for unbounded jobs that require + // continuous incremental processing and are expected to stay online indefinitely. + // + // By enabling BATCH execution, we allow Flink to apply additional optimizations that we + // can only do when we know that our input is bounded. For example, different + // join/aggregation strategies can be used, in addition to a different shuffle + // implementation that allows more efficient task scheduling and failure recovery behavior. + // + // By setting the runtime mode to AUTOMATIC, Flink will choose the BATCH if all sources Review comment: ```suggestion // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources ``` ########## File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala ########## @@ -18,74 +18,170 @@ package org.apache.flink.streaming.scala.examples.wordcount -import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.common.RuntimeExecutionMode +import org.apache.flink.api.common.eventtime.WatermarkStrategy +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.serialization.SimpleStringEncoder +import org.apache.flink.configuration.MemorySize +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.connector.file.src.FileSource +import org.apache.flink.connector.file.src.reader.TextLineFormat +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.examples.wordcount.util.WordCountData +import org.apache.flink.streaming.scala.examples.wordcount.util.CLI +import org.apache.flink.util.Collector + +import java.time.Duration /** - * Implements the "WordCount" program that computes a simple word occurrence - * histogram over text files in a streaming fashion. 
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text + * files. This Job can be executed in both streaming and batch execution modes. * * The input is a plain text file with lines separated by newline characters. * * Usage: - * {{{ - * WordCount --input <path> --output <path> - * }}} * - * If no parameters are provided, the program is run with default data from - * {@link WordCountData}. + * {{{ --input <path> }}} A list of input files and / or directories to read. + * If no inputs are provided, the program is run with default data from [[WordCountData]]. * - * This example shows how to: + * {{{--discovery-interval <duration> }}} Turns the file reader + * into a continuous source that will monitor the provided input directories + * every interval and read any new files. * - * - write a simple Flink Streaming program, - * - use tuple data types, - * - write and use transformation functions. + * {{{--output <path> }}}The output directory where the Job will + * write the results. If no output path is provided, the Job will print the results + * to `stdout` * + * This example shows how to: + * + * - Write a simple Flink DataStream program + * - Use tuple data types + * - Write and use a user-defined function */ object WordCount { - def main(args: Array[String]) { + // ************************************************************************* + // PROGRAM + // ************************************************************************* - // Checking input parameters - val params = ParameterTool.fromArgs(args) + def main(args: Array[String]): Unit = { + val params = CLI.fromArgs(args) - // set up the execution environment + // Create the execution environment. This is the main entrypoint + // to building a Flink application. 
val env = StreamExecutionEnvironment.getExecutionEnvironment - // make parameters available in the web interface + // Apache Flink’s unified approach to stream and batch processing means that a DataStream + // application executed over bounded input will produce the same final results regardless + // of the configured execution mode. It is important to note what final means here: a job + // executing in STREAMING mode might produce incremental updates (think upserts in + // a database) while a BATCH job would only produce one final result at the end. The final + // result will be the same if interpreted correctly, but getting there can be different. + // + // The “classic” execution behavior of the DataStream API is called STREAMING execution + // mode. Applications should use streaming execution for unbounded jobs that require + // continuous incremental processing and are expected to stay online indefinitely. + // + // By enabling BATCH execution, we allow Flink to apply additional optimizations that we + // can only do when we know that our input is bounded. For example, different + // join/aggregation strategies can be used, in addition to a different shuffle + // implementation that allows more efficient task scheduling and failure recovery behavior. + // + // By setting the runtime mode to AUTOMATIC, Flink will choose the BATCH if all sources + // are bounded and otherwise STREAMING. + // Apache Flink’s unified approach to stream and batch processing means that a DataStream + // application executed over bounded input will produce the same final results regardless + // of the configured execution mode. It is important to note what final means here: a job + // executing in STREAMING mode might produce incremental updates (think upserts in + // a database) while a BATCH job would only produce one final result at the end. The final + // result will be the same if interpreted correctly, but getting there can be different. 
+ // + // The “classic” execution behavior of the DataStream API is called STREAMING execution + // mode. Applications should use streaming execution for unbounded jobs that require + // continuous incremental processing and are expected to stay online indefinitely. + // + // By enabling BATCH execution, we allow Flink to apply additional optimizations that we + // can only do when we know that our input is bounded. For example, different + // join/aggregation strategies can be used, in addition to a different shuffle + // implementation that allows more efficient task scheduling and failure recovery behavior. + // + // By setting the runtime mode to AUTOMATIC, Flink will choose the BATCH if all sources + // are bounded and otherwise STREAMING. Review comment: Duplicate comment ########## File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java ########## @@ -50,51 +69,92 @@ // ************************************************************************* public static void main(String[] args) throws Exception { + final CLI params = CLI.fromArgs(args); - // Checking input parameters - final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); - - // set up the execution environment + // Create the execution environment. This is the main entrypoint + // to building a Flink application. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // make parameters available in the web interface + // Apache Flink’s unified approach to stream and batch processing means that a DataStream + // application executed over bounded input will produce the same final results regardless + // of the configured execution mode. It is important to note what final means here: a job + // executing in STREAMING mode might produce incremental updates (think upserts in + // a database) while a BATCH job would only produce one final result at the end. 
The final + // result will be the same if interpreted correctly, but getting there can be different. + // + // The “classic” execution behavior of the DataStream API is called STREAMING execution + // mode. Applications should use streaming execution for unbounded jobs that require + // continuous incremental processing and are expected to stay online indefinitely. + // + // By enabling BATCH execution, we allow Flink to apply additional optimizations that we + // can only do when we know that our input is bounded. For example, different + // join/aggregation strategies can be used, in addition to a different shuffle + // implementation that allows more efficient task scheduling and failure recovery behavior. + // + // By setting the runtime mode to AUTOMATIC, Flink will choose the BATCH if all sources Review comment: ```suggestion // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org.