[ 
https://issues.apache.org/jira/browse/FLINK-1525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14536566#comment-14536566
 ] 

ASF GitHub Bot commented on FLINK-1525:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/664

    [FLINK-1525][FEEDBACK] Introduction of a small input parameter parsing 
utility

    Hi,
    last week I was running a bunch of Flink Streaming jobs on a cluster. One of the jobs had eight arguments that I changed across different iterations of the program.
    I ended up passing arguments like
    ```
    16 1 8 3 10k 
hdp22-w-1.c.internal:6667,hdp22-w-0.c.internal:6667,hdp22-m.c.internal:6667 
10000
    ```
    It's obvious that this is not easily maintainable.
    In addition to this experience, I got similar feedback from at least two 
other Flink users.
    
    Therefore, I sat down and implemented a simple class which allows users to 
work with input parameters in a hassle-free manner.
    The tool is called **ParameterUtil**. It can be initialized from:
    - regular command line arguments (`-` and `--`): 
`ParameterUtil.fromArgs(new String[]{"--berlin"});`
    - `.properties` files: `ParameterUtil.fromPropertiesFile(propertiesFile);`
    - system properties (`-D` arguments to the JVM): `ParameterUtil.fromSystemProperties();`
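    The `ParameterUtil` implementation itself is not included in this comment. As a rough, hypothetical sketch of what `fromArgs`-style parsing involves (the class and method names only mirror the PR's API; the implementation below is my own illustration, not the PR's code):
    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only -- not the PR's actual ParameterUtil code.
    public class ArgsSketch {
        private final Map<String, String> data;

        private ArgsSketch(Map<String, String> data) {
            this.data = data;
        }

        // Accepts "--key value", "-key value", and bare flags like "--berlin".
        // (Limitation of this sketch: a negative number used as a value would
        // be misread as the next option.)
        public static ArgsSketch fromArgs(String[] args) {
            Map<String, String> map = new HashMap<>();
            int i = 0;
            while (i < args.length) {
                String key = args[i].replaceFirst("^--?", "");
                if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                    map.put(key, args[i + 1]); // "--key value" pair
                    i += 2;
                } else {
                    map.put(key, "");          // bare flag
                    i += 1;
                }
            }
            return new ArgsSketch(map);
        }

        public String get(String key, String defaultValue) {
            String v = data.get(key);
            return v == null ? defaultValue : v;
        }
    }
    ```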
    
    I'm also planning to provide an initializer which accepts the same 
arguments as Hadoop's GenericOptionsParser: 
https://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html
 (our users are just too used to Hadoop's tooling)
    
    For accessing arguments, it has methods like `parameter.getRequired("input")`, `parameter.get("output", "myDefaultValue")`, `parameter.getLong("expectedCount", -1L)`, and so on.
    
    Also, I added a method to export the parameters to Flink's `Configuration` 
class:
    ```
    Configuration config = parameter.getConfiguration();
    config.getLong("expectedCount", -1L);
    ```
    This allows users to pass the input arguments to operators in the APIs:
    ```
    text.flatMap(new Tokenizer()).withParameters(conf)
    ```
    
    The `ParameterUtil` itself is Serializable, so it can be passed into user 
functions (for example to the `Tokenizer`).
    Also, I extended the `ExecutionConfig` to allow passing a `UserConfig` with 
custom stuff inside it.
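    Because the holder is Serializable, it can be shipped along with user functions to the workers. A minimal, Flink-independent round-trip sketch of what that requires (plain Java serialization; the class below is my own illustration):
    ```java
    import java.io.*;
    import java.util.HashMap;

    // Illustrates why Serializable matters: the parameter holder must survive
    // a serialize/deserialize round trip when shipped inside a user function.
    public class SerializableParams implements Serializable {
        private static final long serialVersionUID = 1L;
        private final HashMap<String, String> data = new HashMap<>();

        public void set(String k, String v) { data.put(k, v); }
        public String get(String k) { return data.get(k); }

        public static SerializableParams roundTrip(SerializableParams p) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(p);
            oos.close();
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
            return (SerializableParams) in.readObject();
        }
    }
    ```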
    
    The `ParameterUtil` implements the `UserConfig` interface, so users can do the following:
    
    ```java
    public static void main(String[] args) throws Exception {
      ParameterUtil pt = ParameterUtil.fromArgs(args);
      final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
      env.getConfig().setUserConfig(pt);
      // ... regular Flink program ...
    }
    ```
    Inside a (rich) user function, users can access the command line arguments:
    ```java
    text.flatMap(new Tokenizer()).flatMap(new 
RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
                ExecutionConfig.UserConfig uc = 
getRuntimeContext().getExecutionConfig().getUserConfig();
                ParameterUtil pt = (ParameterUtil) uc;
                float norm = pt.getFloat("normalization", 0.15f);
        }
    })
    ```
    
    The `UserConfig` also allows exporting key/value pairs to the web interface. Running WordCount:
    ```
    ./bin/flink run ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar --input /home/robert/incubator-flink/build-target/README.txt --output /tmp/wo
    ```
    will lead to the following result:
    
    
![paramutil](https://cloud.githubusercontent.com/assets/89049/7550566/14ea36c2-f667-11e4-9a81-ee6a017527b0.png)
    
    
    Before I go ahead and add this to all the examples, I would like to get some feedback on the API choices I made (I don't want to change all the examples afterwards ;) ).
    WordCount currently looks like this:
    ```java
    public static void main(String[] args) throws Exception {
        ParameterUtil pt = ParameterUtil.fromArgs(args);
        boolean fileOutput = pt.getNumberOfParameters() == 2;
        String textPath = null;
        String outputPath = null;
        if(fileOutput) {
                textPath = pt.getRequired("input");
                outputPath = pt.getRequired("output");
        }
        
        // set up the execution environment
        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setUserConfig(pt);
        
        // create initial DataSet, containing the text lines.
        DataSet<String> text;
        if(fileOutput) {
                text = env.readTextFile(textPath);
        } else {
                // get default test text data
                text = WordCountData.getDefaultTextLineDataSet(env);
        }
        
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        // ... (result emission and env.execute() omitted in this excerpt)
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink1525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/664.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #664
    
----
commit 95a55eb7c0acabebf9ac0decfb669f3da0b514b1
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-07T20:49:33Z

    Initial draft

commit e801dcd5e6f43139f748d20468129126e409c4f4
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-07T20:51:27Z

    wip

commit 7c92b11c0c02faa25c645a5c74c00e83c73b7492
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T09:58:29Z

    wip

commit edf6aef4368d7589d87d515572334bbc9c4f7a99
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T14:29:58Z

    integrated into web frontend

commit 8bdc8f092bfa6de2e26c36d0dfb976a3e9d59c89
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T16:33:34Z

    wip

commit c339a5e98dd57fbb6753f9db10c15db3737e02c8
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T11:41:20Z

    travis, give me some feedback

commit dd1f5029fa0efb792bdab94f54bca3c61a9d0f32
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T11:45:57Z

    starting to rework the examples

commit 6f03b1ad054a6346996f3b148e09e0b3101588d7
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T13:53:05Z

    wip

----


> Provide utils to pass -D parameters to UDFs 
> --------------------------------------------
>
>                 Key: FLINK-1525
>                 URL: https://issues.apache.org/jira/browse/FLINK-1525
>             Project: Flink
>          Issue Type: Improvement
>          Components: flink-contrib
>            Reporter: Robert Metzger
>              Labels: starter
>
> Hadoop users are used to setting job configuration through "-D" on the 
> command line.
> Right now, Flink users have to manually parse command line arguments and pass 
> them to the methods.
> It would be nice to provide a standard argument parser which takes care of 
> such things.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
