Hi Robert,

Thank you for your enthusiastic answer.


I understand the current problem and look forward to a good solution and 
optimization from the community. I will continue to follow the changes in the 
community.


Best,
Jason




On 06/17/2021 15:45, Robert Metzger <rmetz...@apache.org> wrote:
Hi Jason,


I hope you don't mind that I brought the conversation back to the user@ mailing 
list, so that others can benefit from the information as well.


Thanks a lot for sharing your use case. I personally believe that Flink should 
support invocations like "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob 
flink-stream-sql-ddl-1.0.0.jar ./config.json". There is no fundamental reason 
why this cannot be supported.


The Javadoc about tableEnv.getConfig() mentions that the config is only about 
the "runtime behavior":
https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L1151

... but I see how this is not clearly defined.


As a short-term fix, I've proposed to clarify in the configuration table which 
options are cluster vs job configurations: 
https://issues.apache.org/jira/browse/FLINK-22257.


But in the long term, we certainly need to improve the user experience.




On Wed, Jun 16, 2021 at 3:31 PM Jason Lee <jasonlee1...@163.com> wrote:

Dear Robert,


For tasks running on a cluster, some parameter configurations are global, but 
others need to be customized per task, such as some of the TaskManager memory 
settings. Tasks with different state sizes need different values for these 
parameters, so they should not be configured in flink-conf.yaml. With the 
current Flink, however, these parameters cannot be configured through the 
StreamExecutionEnvironment, and some of them do not take effect when set 
through the StreamTableEnvironment.


At the same time, the Configuration being immutable after the job has started 
is correct, but I think it should also be possible to specify some global 
parameters in the StreamExecutionEnvironment. Some checkpoint parameters are 
also global, yet they can be set through 
"StreamExecutionEnvironment.getCheckpointConfig()", so why can't the 
TaskManager memory parameters be set in the same way? To me, setting global 
parameters via "flink run -yD" should be equivalent to setting them through 
the StreamExecutionEnvironment, but I am not sure whether I understand this 
correctly.
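
To make the contrast concrete, here is a minimal sketch in Java (the class name 
and the values are only illustrative, not our actual code):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigScopeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-level setting: this takes effect, because checkpointing is
        // configured per job through the CheckpointConfig.
        env.getCheckpointConfig().setCheckpointInterval(60_000L);

        // A cluster-level setting such as taskmanager.memory.managed.fraction
        // cannot be changed here: the TaskManager processes are already sized
        // when the containers start, before this client code runs.
        // It has to be passed at submission time instead, e.g.
        //   flink run -m yarn-cluster -yD taskmanager.memory.managed.fraction=0.6 ...

        // ... define the pipeline and call env.execute() here
    }
}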


I agree with you. I think the configuration section of the official 
documentation should specify which parameters are best configured in 
flink-conf.yaml, which can be modified through the StreamExecutionEnvironment, 
and which can only be changed in other ways. I think that would make the 
documentation clearer for users.


Best,
Jason


On 06/16/2021 21:04, Jason Lee <jasonlee1...@163.com> wrote:
Dear Robert,


Thanks for your answer.


Our Flink SQL tasks are deployed in per-job mode.


We provide our users with a platform for developing Flink SQL tasks. We write 
the user's SQL code and configuration parameters into a config.json file, and 
we have implemented a Flink jar job that actually executes the user's SQL. The 
job is started from the command line; for example, this is how we launch a 
Flink SQL task: "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob 
flink-stream-sql-ddl-1.0.0.jar ./config.json". To make personalized 
configuration easy for users, we want to apply their configuration parameters 
in the execution environment of the FlinkStreamSQLDDLJob class we have 
implemented, for example the "taskmanager.memory.managed.fraction" parameter. 
Currently, however, these parameters cannot be configured through the Flink 
execution environment because they do not take effect there; they can only be 
set via flink run -yD.
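
Just to make the current workaround concrete (the fraction value is only an 
example), the submission then looks something like "flink run -m yarn-cluster 
-yD taskmanager.memory.managed.fraction=0.6 xxx.FlinkStreamSQLDDLJob 
flink-stream-sql-ddl-1.0.0.jar ./config.json", so the per-task memory tuning 
has to live in the launch command instead of in our job code.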


I think the Configuration section of the official documentation should state 
which parameters cannot be set through 
"StreamTableEnvironment.getConfig().getConfiguration().set()" and can only be 
set via flink run -yD or in flink-conf.yaml. If the documentation does not 
explain this, users will not realize that setting some of these parameters 
through "StreamTableEnvironment.getConfig().getConfiguration().set()" simply 
has no effect. To make personalized configuration easier for users, I think 
these instructions should appear in the Configuration section of the official 
documentation.


Best,
Jason




On 06/16/2021 18:37, Robert Metzger <rmetz...@apache.org> wrote:
Hi Jason,


How are you deploying your Flink SQL tasks? (Are you using per-job/application 
clusters, or a session cluster?)

I agree that the configuration management is not optimal in Flink. By default, 
I would recommend assuming that all configuration parameters are cluster 
settings, which require a cluster restart. Very few options (mostly those 
listed in the "Execution" section) are job settings, which can be set for each 
job.


Would it help if the table of configuration options in the documentation 
tagged each option with a "Cluster" or "Job" type?
Secondly, the API should probably only expose an immutable Configuration 
object if the configuration is effectively immutable. I believe the option to 
set configuration on the (Stream)(Table)Environment is mostly there for local 
execution of Flink.




2. I agree, the docs are incomplete here (probably another symptom of the fact 
that the whole configuration management in Flink is not optimal). I'll see 
what I can do to improve the situation.


3. Except for local execution (everything runs in one JVM), I don't think we'll 
add support for this anytime soon. Some of the cluster configuration parameters 
just have to be global (like memory management), as they apply to all jobs 
executed on a cluster.




This ticket could be related to your problems: 
https://issues.apache.org/jira/browse/FLINK-21065


Let us know how you are deploying your Flink jobs; this will shed some more 
light on the discussion!


Best,
Robert




On Wed, Jun 16, 2021 at 4:27 AM Jason Lee <jasonlee1...@163.com> wrote:

Hi everyone,


While researching and using Flink recently, I found the official documentation 
on how to configure parameters confusing, and parameters set in some ways do 
not take effect. The main issues are as follows:


We usually use a DDL jar package to execute Flink SQL tasks, but we found that 
some parameters set via 
StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value) do 
not take effect. For example, taskmanager.memory.managed.fraction has no 
effect when set this way (the note in TableConfig in the source code says: 
"Because options are read at different point in time when performing 
operations, it is recommended to set configuration options early after 
instantiating a table environment."). In addition, 
StreamExecutionEnvironment.getConfiguration() is protected, so some parameters 
cannot be set through the API at all. I feel this is not reasonable, because 
sometimes we want to configure different parameters for different tasks via 
Configuration.setXXX(key, value) in the API, instead of only configuring them 
through flink run -yD or flink-conf.yaml.
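
As a small illustration of the inconsistency, here is a sketch (Java, blink 
planner Table API; the option values are arbitrary and only meant to show the 
two kinds of options):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Options read by the planner when the job is compiled do take effect here:
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.enabled", "true");
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.allow-latency", "1 s");

        // A process-level option like this one is silently ignored, because the
        // TaskManager memory is already fixed before the job's main() runs:
        tableEnv.getConfig().getConfiguration()
                .setString("taskmanager.memory.managed.fraction", "0.6");

        // ... execute the user's SQL statements here
    }
}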


The Configuration section of the official documentation describes each 
parameter and its default value, but it says nothing about how each parameter 
can actually be set. I think this is not friendly enough for users, especially 
those who want to personalize some parameters, and I feel the setting methods 
should be described in the official documentation.


In summary, for normal tasks we can use the default configuration, but for 
tasks that require personalized configuration, especially Flink SQL tasks, I 
have a few suggestions on the use of configuration:


1. Regarding the API: for parameters configured via 
StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value), it 
should be explicitly documented which of them do not take effect when set this 
way. Otherwise users will configure them, see no effect, and be confused.


2. In the official documentation, I think it is necessary to add instructions 
on how each parameter can be configured: for example, whether it can only be 
set in flink-conf.yaml, whether it can also be passed on the command line via 
flink run -yD, or whether there are other ways to configure it.


3. Regarding StreamExecutionEnvironment.getConfiguration() being protected: 
will the community change this in later versions? Is there any effective way 
for users to set such parameters through the API and have them take effect, 
for example the taskmanager.memory.managed.fraction parameter?


Regarding the issues above, and why setting these parameters does not take 
effect: maybe I have not described them clearly enough, or perhaps I have not 
understood the problem correctly, but I hope to get a reply from the community 
and discuss this further.


Best,
Jason


