dianfu commented on a change in pull request #16732: URL: https://github.com/apache/flink/pull/16732#discussion_r686742153
########## File path: docs/content.zh/docs/dev/python/debugging.md ########## @@ -51,6 +51,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl ``` ## 调试Python UDFs + +### 本地调试 + +你可以直接在 PyCharm 调试你的 Python 函数。 Review comment: ```suggestion 你可以直接在 PyCharm 等 IDE 调试你的 Python 函数。 ``` ########## File path: docs/content/docs/dev/python/debugging.md ########## @@ -52,6 +52,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl ``` ## Debugging Python UDFs + +### Local Debug + +You can debug your python functions directly in PyCharm. Review comment: ```suggestion You can debug your python functions directly in IDEs such as PyCharm. ``` ########## File path: docs/content.zh/docs/dev/python/debugging.md ########## @@ -51,6 +51,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl ``` ## 调试Python UDFs + +### 本地调试 + +你可以直接在 PyCharm 调试你的 Python 函数。 + +**注意:** 当前,如果你使用了配置 `python-archives`,并且作业的并发度是大于`1`的,你只能够使用[远程调试](#远程调试)的方式。 Review comment: ```suggestion **注意:** 当前,如果你使用了配置 `python-archives`,并且作业的并发度是大于`1`的,只能够使用[远程调试](#远程调试)的方式。 ``` ########## File path: flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py ########## @@ -0,0 +1,159 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import atexit +import functools +import logging +import os +import sys +import threading +import traceback + +import grpc +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import ProfilingOptions +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.portability.api import endpoints_pb2 +from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest +from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub +from apache_beam.runners.worker import sdk_worker_main +from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler +from apache_beam.runners.worker.sdk_worker import SdkHarness +from apache_beam.utils import thread_pool_executor, profiler +from google.protobuf import json_format + +from pyflink.fn_execution.beam import beam_sdk_worker_main # noqa # pylint: disable=unused-import + +_LOGGER = logging.getLogger(__name__) + + +class BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + """ + Worker pool entry point. + + The worker pool exposes an RPC service that is used in MiniCluster to start and stop the Python + SDK workers. + + The worker pool uses child thread for parallelism + """ + + def __init__(self): + self._worker_server = None + self._parse_param_lock = threading.Lock() + + def start(self): + worker_server = grpc.server( + thread_pool_executor.shared_unbounded_instance()) + worker_address = 'localhost:%s' % worker_server.add_insecure_port('[::]:0') + beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(self, worker_server) + worker_server.start() + _LOGGER.info('Listening for workers at %s', worker_address) + + self._worker_server = worker_server + atexit.register(functools.partial(worker_server.stop, 1)) + return worker_address + + def StartWorker(self, + start_worker_request: beam_fn_api_pb2.StartWorkerRequest, + unused_context): + try: + worker_thread = threading.Thread( + name='run_worker_%s' % start_worker_request.worker_id, + target=functools.partial(self._start_sdk_worker_main, start_worker_request)) + worker_thread.daemon = True + worker_thread.start() + return beam_fn_api_pb2.StartWorkerResponse() + except Exception: + return beam_fn_api_pb2.StartWorkerResponse(error=traceback.format_exc()) + + def StopWorker(self, + stop_worker_request: beam_fn_api_pb2.StopWorkerRequest, + unused_context): + pass + + def _start_sdk_worker_main(self, start_worker_request: beam_fn_api_pb2.StartWorkerRequest): + params = start_worker_request.params + self._parse_param_lock.acquire() + if 'PYTHONPATH' in params: + python_path_list = params['PYTHONPATH'].split(':') + python_path_list.reverse() + for path in python_path_list: + sys.path.insert(0, path) + if '_PYTHON_WORKING_DIR' in params: + os.chdir(params['_PYTHON_WORKING_DIR']) + os.environ.update(params) + self._parse_param_lock.release() + + # read job information from provision stub + metadata = [("worker_id", start_worker_request.worker_id)] + provision_endpoint = start_worker_request.provision_endpoint.url + with grpc.insecure_channel(provision_endpoint) as channel: + client = ProvisionServiceStub(channel=channel) + info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info + options = json_format.MessageToJson(info.pipeline_options) + logging_endpoint = info.logging_endpoint.url + control_endpoint = info.control_endpoint.url + + try: + logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor(url=logging_endpoint) + + # Send all logs to the runner. + fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor) + logging.getLogger().setLevel(logging.ERROR) + logging.getLogger().addHandler(fn_log_handler) + except Exception: + _LOGGER.error( + "Failed to set up logging handler, continuing without.", + exc_info=True) + fn_log_handler = None + + sdk_pipeline_options = sdk_worker_main._parse_pipeline_options(options) + + semi_persistent_directory = start_worker_request.params['SEMI_PERSISTENT_DIRECTORY'] + + _worker_id = start_worker_request.worker_id + + try: + sdk_worker_main._load_main_session(semi_persistent_directory) Review comment: Should we remove this? ########## File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java ########## @@ -134,6 +134,12 @@ public void onProcessingTime(InternalTimer<Row, Object> timer) throws Exception @Override public PythonFunctionRunner createPythonFunctionRunner() throws Exception { + if (getConfig().getConfig().containsKey("loopback.server.address")) { Review comment: This is a little tricky. Is it possible to avoid this? ########## File path: docs/content/docs/dev/python/debugging.md ########## @@ -52,6 +52,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl ``` ## Debugging Python UDFs + +### Local Debug + +You can debug your python functions directly in PyCharm. + +**Note:** Currently, if your job take use of config `python-archives` and the parallelism of the job is bigger than `1`, you can only use [remote debug](#remote-debug) mode. Review comment: ```suggestion **Note:** Currently, if you use `python-archives` in the job and the parallelism of the job is greater than `1`, you can only use [remote debug](#remote-debug) mode. ``` ########## File path: flink-python/pyflink/datastream/stream_execution_environment.py ########## @@ -920,6 +922,30 @@ def _from_collection(self, elements: List[Any], def _generate_stream_graph(self, clear_transformations: bool = False, job_name: str = None) \ -> JavaObject: + # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster + j_configuration = get_j_env_configuration(self._j_stream_execution_environment) + if not self._remote_mode and is_local_deployment(j_configuration): + from pyflink.common import Configuration + from pyflink.fn_execution.beam.beam_worker_pool_service import \ + BeamFnLoopbackWorkerPoolServicer + + jvm = get_gateway().jvm + env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \ + .getEnvironmentConfig(self._j_stream_execution_environment) + parallelism = self.get_parallelism() + if parallelism > 1 and env_config.containsKey(jvm.PythonOptions.PYTHON_ARCHIVES.key()): + import logging + logging.warning("Currently in MiniCluster mode, if you use the API " Review comment: What about paragraph it as following: ``` Lookback mode is disabled as python archives are used and the parallelism of the job is greater than 1. The Python user-defined functions will be executed in an independent Python process. ``` ########## File path: flink-python/pyflink/table/table_environment.py ########## @@ -1743,6 +1744,26 @@ def _before_execute(self): classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key() self._add_jars_to_j_env_config(jars_key) self._add_jars_to_j_env_config(classpaths_key) + # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster + if not self._remote_mode and \ + is_local_deployment(get_j_env_configuration(self._get_j_env())): + j_config = self.get_config().get_configuration() Review comment: It may happens that the parallelism is configured in flink-conf.yaml -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org