[ 
https://issues.apache.org/jira/browse/FLINK-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069967#comment-16069967
 ] 

ASF GitHub Bot commented on FLINK-6058:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3921
  
    I think we can get by without changing `ClusterClient` and 
`ContextEnvironment` by only reading the parallelism from the global config in 
`CliFrontend` when trying to read the user parallelism from the parameters, 
i.e. in `run()` and `info()`. In the example of `run()` we could change this 
code:
    ```
    int userParallelism = options.getParallelism();
    LOG.debug("User parallelism is set to {}", userParallelism);
    if (client.getMaxSlots() != -1 && userParallelism == -1) {
        logAndSysout("Using the parallelism provided by the remote cluster ("
                + client.getMaxSlots() + "). "
                + "To use another parallelism, set it at the ./bin/flink 
client.");
        userParallelism = client.getMaxSlots();
    }
    return executeProgram(program, client, userParallelism);
    ```
    to this
    ```
    int parallelism = options.getParallelism();
    LOG.debug("User parallelism is set to {}", parallelism);
    if (client.getMaxSlots() != -1 && parallelism == -1) {
        logAndSysout("Using the parallelism provided by the remote cluster ("
                + client.getMaxSlots() + "). "
                + "To use another parallelism, set it at the ./bin/flink 
client.");
        parallelism = client.getMaxSlots();
    } else if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
        parallelism = GlobalConfiguration.loadConfiguration().getInteger(
                ConfigConstants.DEFAULT_PARALLELISM_KEY,
                ConfigConstants.DEFAULT_PARALLELISM);
    }
    
    return executeProgram(program, client, parallelism);
    ```
    with this change `StreamContextEnvironment` would simply need this:
    ```
    if (ctx.getParallelism() > 0) {
        setParallelism(ctx.getParallelism());
    }
    ```
    because the environment will have the default parallelism set (in 
`ContextEnvironmentFactory` 
https://github.com/apache/flink/blob/c793ea41d88fe84fa97d825728ad95f35e27ef82/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java#L52-L52)
    
    What do you think?



> Don't read DEFAULT_PARALLELISM from GlobalConfiguration
> -------------------------------------------------------
>
>                 Key: FLINK-6058
>                 URL: https://issues.apache.org/jira/browse/FLINK-6058
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Fang Yong
>
> In the constructor of {{StreamContextEnvironment}} we read the 
> {{DEFAULT_PARALLELISM}} from the {{GlobalConfiguration}}. This assumes that 
> the environment variables are correctly set and can lead to problems. We 
> should read the default parallelism in the client and set it in the 
> {{ContextEnvironment}} that it creates. This can then be read by the 
> {{StreamContextEnvironment}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to