dianfu commented on code in PR #27247:
URL: https://github.com/apache/flink/pull/27247#discussion_r2544126991
##########
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##########
@@ -261,6 +261,28 @@ public class PythonOptions {
+ "Note that currently it still doesn't
support to execute Python user-defined functions in `thread` mode in all
places. "
+ "It will fall back to `process` mode in
these cases.");
+ public static final ConfigOption<String> PYTHON_DEFAULT_LOGGING_LEVEL =
+ ConfigOptions.key("python.default.logging.level")
+ .stringType()
+ .defaultValue("INFO")
+ .withDescription(
+ "Controls the default log level of python loggers
without a log level override.");
Review Comment:
What about update it as following:
```
Controls the default log level of python loggers, available values: OFF,
ERROR, WARN, INFO, DEBUG, TRACE.
```
##########
flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py:
##########
@@ -181,3 +187,46 @@ def _start_sdk_worker_main(self, start_worker_request:
beam_fn_api_pb2.StartWork
self._parse_param_lock.release()
if fn_log_handler:
fn_log_handler.close()
+
+ def _get_log_level_from_options_dict(self, config) -> int:
+ """Get log level from options dict's entry
`default_sdk_harness_log_level`.
+ If not specified, default log level is logging.INFO.
+ """
+ dict_level = config.get_string('python.default.logging.level', 'INFO')
+ log_level = dict_level
+ if log_level.isdecimal():
+ log_level = int(log_level)
+ else:
+ # labeled log level
+ log_level = getattr(logging, log_level, None)
+ if not isinstance(log_level, int):
+ # unknown log level.
+ _LOGGER.error("Unknown log level %s. Use default value INFO.",
dict_level)
+ log_level = logging.INFO
+
+ return log_level
+
+ def _set_log_level_overrides(self, config) -> None:
Review Comment:
could be declared as static
##########
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##########
@@ -261,6 +261,28 @@ public class PythonOptions {
+ "Note that currently it still doesn't
support to execute Python user-defined functions in `thread` mode in all
places. "
+ "It will fall back to `process` mode in
these cases.");
+ public static final ConfigOption<String> PYTHON_DEFAULT_LOGGING_LEVEL =
+ ConfigOptions.key("python.default.logging.level")
Review Comment:
What about rename it as `python.logging.level.default` to keep the prefix
the same as `python.logging.level.overrides`?
##########
flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py:
##########
@@ -181,3 +187,46 @@ def _start_sdk_worker_main(self, start_worker_request:
beam_fn_api_pb2.StartWork
self._parse_param_lock.release()
if fn_log_handler:
fn_log_handler.close()
+
+ def _get_log_level_from_options_dict(self, config) -> int:
Review Comment:
could be declared as static
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]