GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/664
[FLINK-1525][FEEDBACK] Introduction of a small input parameter parsing utility

Hi, last week I was running a bunch of Flink Streaming jobs on a cluster. One of the jobs had 8 arguments which I changed in different iterations of the program. I ended up passing arguments like

```
16 1 8 3 10k hdp22-w-1.c.internal:6667,hdp22-w-0.c.internal:6667,hdp22-m.c.internal:6667 10000
```

It's obvious that this is not easily maintainable. In addition to this experience, I got similar feedback from at least two other Flink users. Therefore, I sat down and implemented a simple class which allows users to work with input parameters in a hassle-free manner.

The tool is called **ParameterUtil**. It can be initialized from:
- regular command line arguments (`-` and `--`): `ParameterUtil.fromArgs(new String[]{"--berlin"});`
- `.properties` files: `ParameterUtil.fromPropertiesFile(propertiesFile);`
- system properties (`-D` arguments to the JVM): `ParameterUtil.fromSystemProperties();`

I'm also planning to provide an initializer which accepts the same arguments as Hadoop's GenericOptionsParser: https://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html (our users are just too used to Hadoop's tooling).

For accessing arguments, it has methods like `parameter.getRequired("input")`, `parameter.get("output", "myDefaultValue")`, `parameter.getLong("expectedCount", -1L)` and so on.

Also, I added a method to export the parameters to Flink's `Configuration` class:

```
Configuration config = parameter.getConfiguration();
config.getLong("expectedCount", -1L);
```

This allows users to pass the input arguments to operators in the APIs:

```
text.flatMap(new Tokenizer()).withParameters(conf)
```

The `ParameterUtil` itself is `Serializable`, so it can be passed into user functions (for example to the `Tokenizer`).

Also, I extended the `ExecutionConfig` to allow passing a `UserConfig` with custom stuff inside it. The `ParameterUtil` implements the `UserConfig` interface, so users can do the following:

```java
public static void main(String[] args) throws Exception {
    ParameterUtil pt = ParameterUtil.fromArgs(args);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setUserConfig(pt);
    // .... regular flink stuff ....
}
```

Inside a (rich) user function, users can access the command line arguments:

```java
text.flatMap(new Tokenizer()).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        ExecutionConfig.UserConfig uc = getRuntimeContext().getExecutionConfig().getUserConfig();
        ParameterUtil pt = (ParameterUtil) uc;
        float norm = pt.getFloat("normalization", 0.15f);
    }
})
```

The `UserConfig` also allows exporting key/value pairs to the web interface. Running WordCount:

```
/bin/flink run ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar --input /home/robert/incubator-flink/build-target/README.txt --output /tmp/wo
```

will lead to the following result:

![paramutil](https://cloud.githubusercontent.com/assets/89049/7550566/14ea36c2-f667-11e4-9a81-ee6a017527b0.png)

Before I add this to all the examples, I would like to get some feedback on the API choices I made (I don't want to change all the examples afterwards ;) ).
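To show the `withParameters(conf)` path end to end, here is a minimal sketch (not part of this pull request) of a rich function that reads a value on the receiving side. It assumes the DataSet API's `withParameters(...)` hands the `Configuration` built by `parameter.getConfiguration()` to the function's `open()` method; the class name and the key `"expectedCount"` are only examples:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch only: a tokenizer that picks up a value passed in via withParameters(...).
public final class ConfiguredTokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    // value read in open(), available to flatMap() below
    private long expectedCount;

    @Override
    public void open(Configuration parameters) throws Exception {
        // the Configuration created by parameter.getConfiguration() arrives here
        this.expectedCount = parameters.getLong("expectedCount", -1L);
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
```

The wiring would then look like `text.flatMap(new ConfiguredTokenizer()).withParameters(parameter.getConfiguration())`, analogous to the `withParameters(conf)` line above.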
WordCount currently looks like this:

```java
public static void main(String[] args) throws Exception {
    ParameterUtil pt = ParameterUtil.fromArgs(args);

    boolean fileOutput = pt.getNumberOfParameters() == 2;
    String textPath = null;
    String outputPath = null;
    if (fileOutput) {
        textPath = pt.getRequired("input");
        outputPath = pt.getRequired("output");
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setUserConfig(pt);

    // create initial DataSet, containing the text lines.
    DataSet<String> text;
    if (fileOutput) {
        text = env.readTextFile(textPath);
    } else {
        // get default test text data
        text = WordCountData.getDefaultTextLineDataSet(env);
    }

    DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink1525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/664.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #664

----

commit 95a55eb7c0acabebf9ac0decfb669f3da0b514b1
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-07T20:49:33Z

    Initial draft

commit e801dcd5e6f43139f748d20468129126e409c4f4
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-07T20:51:27Z

    wip

commit 7c92b11c0c02faa25c645a5c74c00e83c73b7492
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T09:58:29Z

    wip

commit edf6aef4368d7589d87d515572334bbc9c4f7a99
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T14:29:58Z

    integrated into web frontend

commit 8bdc8f092bfa6de2e26c36d0dfb976a3e9d59c89
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T16:33:34Z

    wip

commit c339a5e98dd57fbb6753f9db10c15db3737e02c8
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T11:41:20Z

    travis, give me some feedback

commit dd1f5029fa0efb792bdab94f54bca3c61a9d0f32
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T11:45:57Z

    starting to rework the examples

commit 6f03b1ad054a6346996f3b148e09e0b3101588d7
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T13:53:05Z

    wip