dianfu commented on code in PR #27247:
URL: https://github.com/apache/flink/pull/27247#discussion_r2544126991


##########
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##########
@@ -261,6 +261,28 @@ public class PythonOptions {
                                     + "Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. "
                                     + "It will fall back to `process` mode in these cases.");
 
+    public static final ConfigOption<String> PYTHON_DEFAULT_LOGGING_LEVEL =
+            ConfigOptions.key("python.default.logging.level")
+                    .stringType()
+                    .defaultValue("INFO")
+                    .withDescription(
+                            "Controls the default log level of python loggers without a log level override.");

Review Comment:
   What about updating it as follows:
   ```
   Controls the default log level of python loggers, available values: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
   ```
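
One point worth checking against the suggested list: Python's standard `logging` module defines no `OFF` or `TRACE` attribute, so a `getattr(logging, name, None)` lookup like the one in this PR would treat both as unknown and fall back to INFO. A minimal sketch of an explicit name table follows. The mapping itself is an assumption and is not taken from the PR: `OFF` is placed one step above `logging.CRITICAL`, `TRACE` is given the arbitrary value 5 (below `logging.DEBUG`), and `resolve_level` is a hypothetical helper name.

```python
import logging

# Assumed mapping, not taken from the PR. OFF and TRACE have no counterpart
# in the standard logging module, so their numeric values are illustrative.
_NAME_TO_LEVEL = {
    "OFF": logging.CRITICAL + 1,  # above every standard level
    "ERROR": logging.ERROR,
    "WARN": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "TRACE": 5,                   # arbitrary value below DEBUG
}


def resolve_level(name: str, default: int = logging.INFO) -> int:
    """Map a documented level name to a numeric level (hypothetical helper)."""
    return _NAME_TO_LEVEL.get(name.strip().upper(), default)
```

With a table like this, every value named in the option description resolves to a number, and anything else still falls back to the default.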



##########
flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py:
##########
@@ -181,3 +187,46 @@ def _start_sdk_worker_main(self, start_worker_request: beam_fn_api_pb2.StartWork
             self._parse_param_lock.release()
             if fn_log_handler:
                 fn_log_handler.close()
+
+    def _get_log_level_from_options_dict(self, config) -> int:
+        """Get log level from options dict's entry `default_sdk_harness_log_level`.
+        If not specified, default log level is logging.INFO.
+        """
+        dict_level = config.get_string('python.default.logging.level', 'INFO')
+        log_level = dict_level
+        if log_level.isdecimal():
+            log_level = int(log_level)
+        else:
+            # labeled log level
+            log_level = getattr(logging, log_level, None)
+            if not isinstance(log_level, int):
+                # unknown log level.
+                _LOGGER.error("Unknown log level %s. Use default value INFO.", dict_level)
+                log_level = logging.INFO
+
+        return log_level
+
+    def _set_log_level_overrides(self, config) -> None:

Review Comment:
   This could be declared as a static method.
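
For illustration, a minimal sketch of what the static variant might look like, using `_get_log_level_from_options_dict` from the hunk above since its full body is visible there. `WorkerPoolServicerSketch` and `DictConfig` are hypothetical stand-ins, not classes from the PR, and `config` is only assumed to expose `get_string(key, default)` as it does in the diff.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class WorkerPoolServicerSketch:
    """Hypothetical stand-in for the servicer class touched by the PR."""

    @staticmethod
    def _get_log_level_from_options_dict(config) -> int:
        # No instance state is used, so `self` can be dropped.
        dict_level = config.get_string('python.default.logging.level', 'INFO')
        if dict_level.isdecimal():
            # Numeric levels such as "15" are used as-is.
            return int(dict_level)
        # Named levels are looked up on the logging module.
        log_level = getattr(logging, dict_level, None)
        if not isinstance(log_level, int):
            _LOGGER.error("Unknown log level %s. Use default value INFO.", dict_level)
            return logging.INFO
        return log_level


class DictConfig:
    """Hypothetical dict-backed config stub, for illustration only."""

    def __init__(self, values):
        self._values = values

    def get_string(self, key, default):
        return self._values.get(key, default)
```

The method can then be called on the class directly, for example `WorkerPoolServicerSketch._get_log_level_from_options_dict(DictConfig({}))`, and existing `self.` call sites keep working unchanged.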



##########
flink-python/src/main/java/org/apache/flink/python/PythonOptions.java:
##########
@@ -261,6 +261,28 @@ public class PythonOptions {
                                     + "Note that currently it still doesn't support to execute Python user-defined functions in `thread` mode in all places. "
                                     + "It will fall back to `process` mode in these cases.");
 
+    public static final ConfigOption<String> PYTHON_DEFAULT_LOGGING_LEVEL =
+            ConfigOptions.key("python.default.logging.level")

Review Comment:
   What about renaming it to `python.logging.level.default` to keep the prefix the same as `python.logging.level.overrides`?



##########
flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py:
##########
@@ -181,3 +187,46 @@ def _start_sdk_worker_main(self, start_worker_request: beam_fn_api_pb2.StartWork
             self._parse_param_lock.release()
             if fn_log_handler:
                 fn_log_handler.close()
+
+    def _get_log_level_from_options_dict(self, config) -> int:

Review Comment:
   This could be declared as a static method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
