[ https://issues.apache.org/jira/browse/FLINK-1525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14536566#comment-14536566 ]
ASF GitHub Bot commented on FLINK-1525:
---------------------------------------

GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/664

[FLINK-1525][FEEDBACK] Introduction of a small input parameter parsing utility

Hi,
last week I was running a bunch of Flink Streaming jobs on a cluster. One of the jobs had 8 arguments which I changed in different iterations of the program. I ended up passing arguments like

```
16 1 8 3 10k hdp22-w-1.c.internal:6667,hdp22-w-0.c.internal:6667,hdp22-m.c.internal:6667 10000
```

It's obvious that this is not easily maintainable. In addition to this experience, I got similar feedback from at least two other Flink users. Therefore, I sat down and implemented a simple class which allows users to work with input parameters in a hassle-free manner.

The tool is called **ParameterUtil**. It can be initialized from:
- regular command line arguments (`-` and `--`): `ParameterUtil.fromArgs(new String[]{"--berlin"});`
- `.properties` files: `ParameterUtil.fromPropertiesFile(propertiesFile);`
- system properties (`-D` arguments to the JVM): `ParameterUtil.fromSystemProperties();`

I'm also planning to provide an initializer which accepts the same arguments as Hadoop's GenericOptionsParser: https://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html (our users are just too used to Hadoop's tooling).

For accessing arguments, it has methods like `parameter.getRequired("input")`, `parameter.get("output", "myDefaultValue")`, `parameter.getLong("expectedCount", -1L)`, and so on.
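To make the `fromArgs` convention concrete, here is a minimal, self-contained sketch of how a `--key value` tokenizer along these lines could work. This is illustrative only: `ArgsSketch` is a hypothetical stand-in, not the actual `ParameterUtil` from the pull request, which additionally offers typed getters, required-parameter checks, and the other initializers listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a "--key value" argument tokenizer,
// illustrating the convention ParameterUtil.fromArgs() supports.
public class ArgsSketch {
    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            // strip the "--" or "-" prefix from the key
            String arg = args[i];
            String key = arg.startsWith("--") ? arg.substring(2)
                       : arg.startsWith("-")  ? arg.substring(1)
                       : arg;
            // a key followed by another "-"-prefixed token (or by nothing)
            // is treated as a boolean flag, e.g. "--berlin"
            if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                params.put(key, args[i + 1]);
                i += 2;
            } else {
                params.put(key, "true");
                i += 1;
            }
        }
        return params;
    }

    public static void main(String[] unused) {
        Map<String, String> p =
            fromArgs(new String[]{"--input", "/tmp/in", "--berlin"});
        System.out.println(p.get("input"));   // /tmp/in
        System.out.println(p.get("berlin"));  // true
    }
}
```

(A real implementation would also need to decide how to handle negative-number values, which this sketch would misread as flags.)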
Also, I added a method to export the parameters to Flink's `Configuration` class:

```java
Configuration config = parameter.getConfiguration();
config.getLong("expectedCount", -1L);
```

This allows users to pass the input arguments to operators in the APIs:

```java
text.flatMap(new Tokenizer()).withParameters(conf)
```

The `ParameterUtil` itself is Serializable, so it can be passed into user functions (for example to the `Tokenizer`).

Also, I extended the `ExecutionConfig` to allow passing a `UserConfig` with custom stuff inside it. The `ParameterUtil` implements the `UserConfig` interface, so users can do the following:

```java
public static void main(String[] args) throws Exception {
    ParameterUtil pt = ParameterUtil.fromArgs(args);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setUserConfig(pt);
    // ... regular Flink stuff ...
}
```

Inside a (rich) user function, users can access the command line arguments:

```java
text.flatMap(new Tokenizer()).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        ExecutionConfig.UserConfig uc = getRuntimeContext().getExecutionConfig().getUserConfig();
        ParameterUtil pt = (ParameterUtil) uc;
        float norm = pt.getFloat("normalization", 0.15f);
    }
})
```

The `UserConfig` allows exporting key/value pairs to the web interface. Running WordCount:

```
/bin/flink run ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar --input /home/robert/incubator-flink/build-target/README.txt --output /tmp/wo
```

will lead to the following result: (screenshot of the exported parameters in the web interface)

Before I now go and add this to all examples, I would like to get some feedback on the API choices I made (I don't want to change all examples afterwards ;) ).
Wordcount currently looks like this:

```java
public static void main(String[] args) throws Exception {
    ParameterUtil pt = ParameterUtil.fromArgs(args);
    boolean fileOutput = pt.getNumberOfParameters() == 2;
    String textPath = null;
    String outputPath = null;
    if (fileOutput) {
        textPath = pt.getRequired("input");
        outputPath = pt.getRequired("output");
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setUserConfig(pt);

    // create initial DataSet, containing the text lines.
    DataSet<String> text;
    if (fileOutput) {
        text = env.readTextFile(textPath);
    } else {
        // get default test text data
        text = WordCountData.getDefaultTextLineDataSet(env);
    }

    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word, 1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink1525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/664.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #664

----

commit 95a55eb7c0acabebf9ac0decfb669f3da0b514b1
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-07T20:49:33Z

    Initial draft

commit e801dcd5e6f43139f748d20468129126e409c4f4
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-07T20:51:27Z

    wip

commit 7c92b11c0c02faa25c645a5c74c00e83c73b7492
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-08T09:58:29Z

    wip

commit edf6aef4368d7589d87d515572334bbc9c4f7a99
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-08T14:29:58Z

    integrated into web frontend

commit 8bdc8f092bfa6de2e26c36d0dfb976a3e9d59c89
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-08T16:33:34Z

    wip

commit c339a5e98dd57fbb6753f9db10c15db3737e02c8
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-09T11:41:20Z

    travis, give me some feedback

commit dd1f5029fa0efb792bdab94f54bca3c61a9d0f32
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-09T11:45:57Z

    starting to rework the examples

commit 6f03b1ad054a6346996f3b148e09e0b3101588d7
Author: Robert Metzger <rmetz...@apache.org>
Date: 2015-05-09T13:53:05Z

    wip

----

> Provide utils to pass -D parameters to UDFs
> -------------------------------------------
>
> Key: FLINK-1525
> URL: https://issues.apache.org/jira/browse/FLINK-1525
> Project: Flink
> Issue Type: Improvement
> Components: flink-contrib
> Reporter: Robert Metzger
> Labels: starter
>
> Hadoop users are used to setting job configuration through "-D" on the command line.
> Right now, Flink users have to manually parse command line arguments and pass them to the methods.
> It would be nice to provide a standard args parser which takes care of such stuff.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)