bgeng777 commented on code in PR #27438:
URL: https://github.com/apache/flink/pull/27438#discussion_r2756894889


##########
flink-python/pyflink/fn_execution/table/async_function/operations.py:
##########
@@ -0,0 +1,222 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import asyncio
+
+from pyflink.datastream import async_retry_strategies
+from pyflink.fn_execution.datastream.operations import Operation, AsyncOperationMixin
+from pyflink.fn_execution.datastream.process.async_function.operation import Emitter, \
+    AsyncFunctionRunner, ResultHandler, RetryableResultHandler
+from pyflink.fn_execution.datastream.process.async_function.queue import OrderedStreamElementQueue
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
+from pyflink.fn_execution.utils import operation_utils
+from pyflink.table import FunctionContext
+
+ASYNC_SCALAR_FUNCTION_URN = "flink:transform:async_scalar_function:v1"
+
+
+class AsyncScalarFunctionOperation(Operation, AsyncOperationMixin):
+    """
+    Operation for executing Python async scalar functions.
+
+    This operation implements true asynchronous execution by leveraging the async
+    infrastructure from DataStream API's AsyncOperation:
+    - AsyncFunctionRunner: Manages asyncio event loop in a separate thread
+    - Queue: Maintains in-flight async operations with configurable capacity
+    - Emitter: Collects and emits results asynchronously
+    - Non-blocking: Multiple async operations can be in-flight simultaneously
+
+    This provides high performance for I/O-bound async operations compared to
+    synchronous blocking execution.
+    """
+
+    def __init__(self, serialized_fn):
+        if serialized_fn.metric_enabled:
+            self.base_metric_group = GenericMetricGroup(None, None)
+        else:
+            self.base_metric_group = None
+
+        self._capacity = serialized_fn.async_options.max_concurrent_operations
+        self._timeout = serialized_fn.async_options.timeout_ms / 1000.0
+        self._retry_enabled = serialized_fn.async_options.retry_enabled
+        self._max_attempts = serialized_fn.async_options.retry_max_attempts
+        self._retry_delay = serialized_fn.async_options.retry_delay_ms / 1000.0
+
+        scalar_function, variable_dict, self.user_defined_funcs = \
+            operation_utils.extract_user_defined_function(
+                serialized_fn.udfs[0], one_arg_optimization=False)
+
+        # Create the eval function
+        self._eval_func = eval('lambda value: %s' % scalar_function, variable_dict)
+
+        # Create ordered queue to maintain result order
+        self._queue = OrderedStreamElementQueue(self._capacity, self._raise_exception_if_exists)
+
+        # Async execution components
+        self._async_function_runner = None
+        self._emitter = None
+        self._exception = None
+        self._output_processor = None
+
+        # Job parameters
+        self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}

Review Comment:
   Why doesn't the `job_parameters` field start with `_` as well?
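
   As background for this thread: the pattern the class docstring describes (an asyncio event loop owned by a dedicated runner thread, plus an ordered queue so results are emitted in submission order) can be sketched outside Flink as follows. All names here are illustrative, not PyFlink API:

   ```python
   import asyncio
   import queue
   import threading

   class MiniAsyncRunner:
       """Owns an asyncio event loop on a dedicated daemon thread."""

       def __init__(self):
           self._loop = asyncio.new_event_loop()
           self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
           self._thread.start()

       def submit(self, coro):
           # Schedule the coroutine on the runner's loop without blocking the caller.
           return asyncio.run_coroutine_threadsafe(coro, self._loop)

       def stop(self):
           self._loop.call_soon_threadsafe(self._loop.stop)
           self._thread.join()
           self._loop.close()

   async def work(x):
       # Later submissions finish sooner, to force out-of-order completion.
       await asyncio.sleep(0.01 * (5 - x))
       return x * 10

   if __name__ == '__main__':
       runner = MiniAsyncRunner()
       in_flight = queue.Queue()  # ordered queue of in-flight futures
       for i in range(5):
           in_flight.put(runner.submit(work(i)))
       while not in_flight.empty():
           # Consume in submission order, even though completion order differs.
           print(in_flight.get().result())
       runner.stop()
   ```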



##########
flink-python/pyflink/table/tests/test_async_scalar_function.py:
##########
@@ -0,0 +1,285 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import asyncio
+import uuid
+
+from pyflink.table import DataTypes
+from pyflink.table.udf import AsyncScalarFunction, udf, FunctionContext
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, run_with_config
+
+
+def generate_random_table_name():
+    return "Table{0}".format(str(uuid.uuid1()).replace("-", "_"))
+
+
+class AsyncScalarFunctionTests(PyFlinkStreamTableTestCase):
+    """
+    Integration tests for Python Async Scalar Function.
+    """
+
+    def test_basic_async_scalar_function(self):
+
+        class AsyncFunctionWithLifecycle(AsyncScalarFunction):
+            def open(self, function_context: FunctionContext):
+                self.prefix = "opened_"
+
+            async def eval(self, value):
+                await asyncio.sleep(0.001)
+                return self.prefix + value
+
+            def close(self):
+                pass
+
+        async_func = udf(
+            AsyncFunctionWithLifecycle(),
+            input_types=[DataTypes.STRING()],
+            result_type=DataTypes.STRING()
+        )
+
+        sink_table = generate_random_table_name()
+        self.t_env.execute_sql(f"""
+            CREATE TABLE {sink_table}(a STRING, b STRING)
+            WITH ('connector'='test-sink')
+        """)
+
+        t = self.t_env.from_elements([("test1",), ("test2",)], ['a'])
+        t.select(t.a, async_func(t.a).alias('b')).execute_insert(sink_table).wait()
+
+        actual = source_sink_utils.results()
+        self.assert_equals(actual, [
+            "+I[test1, opened_test1]",
+            "+I[test2, opened_test2]"
+        ])
+
+    def test_raise_exception_in_async_eval(self):
+        """Test async scalar function that raises exception during 
evaluation."""
+
+        class ExceptionAsyncFunction(AsyncScalarFunction):
+            async def eval(self, value: str) -> str:
+                raise ValueError("Test exception in async eval")
+
+        async_func = udf(
+            ExceptionAsyncFunction(),
+            input_types=[DataTypes.STRING()],
+            result_type=DataTypes.STRING()
+        )
+
+        sink_table = generate_random_table_name()
+        self.t_env.execute_sql(f"""
+            CREATE TABLE {sink_table}(a STRING, b STRING)
+            WITH ('connector'='test-sink')
+        """)
+
+        t = self.t_env.from_elements([("test1",)], ['a'])
+
+        with self.assertRaises(Exception) as context:
+            t.select(t.a, async_func(t.a).alias('b')).execute_insert(sink_table).wait()
+
+        # Verify exception message is propagated
+        self.assertIn("Test exception in async eval", str(context.exception))
+
+    def test_async_function_with_retry_logic(self):
+        """Test async scalar function with custom retry logic."""

Review Comment:
   How do we enable retry for this test? I don't see any retry configs being set here.
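
   Since `operations.py` reads `retry_enabled`, `retry_max_attempts`, and `retry_delay_ms` from `serialized_fn.async_options`, and imports `async_retry_strategies`, I would have expected the test to wire a retry policy in somewhere, e.g. something along these lines (the `retry_strategy` keyword and builder name below are guesses for illustration, not confirmed API):

   ```python
   # Hypothetical sketch -- the exact way to enable retry is not visible in
   # this diff; the `retry_strategy` parameter and builder name are assumptions.
   from pyflink.datastream import async_retry_strategies

   retry_func = udf(
       RetryOnFailureAsyncFunction(),        # the async function under test
       input_types=[DataTypes.STRING()],
       result_type=DataTypes.STRING(),
       retry_strategy=async_retry_strategies.FixedDelayRetryStrategyBuilder(
           max_attempts=3, fixed_delay_ms=100).build(),
   )
   ```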



##########
flink-python/pyflink/table/udf.py:
##########
@@ -671,7 +782,7 @@ def udf(f: Union[Callable, ScalarFunction, Type] = None,
                      (default: general)
    :param udf_type: the type of the python function, available value: general, pandas,

Review Comment:
   It looks like `udf_type` is no longer one of the input parameters, so we can remove this doc line.



##########
flink-python/pyflink/fn_execution/table/async_function/operations.py:
##########
@@ -0,0 +1,222 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import asyncio
+
+from pyflink.datastream import async_retry_strategies
+from pyflink.fn_execution.datastream.operations import Operation, AsyncOperationMixin
+from pyflink.fn_execution.datastream.process.async_function.operation import Emitter, \
+    AsyncFunctionRunner, ResultHandler, RetryableResultHandler
+from pyflink.fn_execution.datastream.process.async_function.queue import OrderedStreamElementQueue
+from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
+from pyflink.fn_execution.utils import operation_utils
+from pyflink.table import FunctionContext
+
+ASYNC_SCALAR_FUNCTION_URN = "flink:transform:async_scalar_function:v1"
+
+
+class AsyncScalarFunctionOperation(Operation, AsyncOperationMixin):
+    """
+    Operation for executing Python async scalar functions.
+
+    This operation implements true asynchronous execution by leveraging the async
+    infrastructure from DataStream API's AsyncOperation:
+    - AsyncFunctionRunner: Manages asyncio event loop in a separate thread
+    - Queue: Maintains in-flight async operations with configurable capacity
+    - Emitter: Collects and emits results asynchronously
+    - Non-blocking: Multiple async operations can be in-flight simultaneously
+
+    This provides high performance for I/O-bound async operations compared to
+    synchronous blocking execution.
+    """
+
+    def __init__(self, serialized_fn):
+        if serialized_fn.metric_enabled:
+            self.base_metric_group = GenericMetricGroup(None, None)
+        else:
+            self.base_metric_group = None
+
+        self._capacity = serialized_fn.async_options.max_concurrent_operations
+        self._timeout = serialized_fn.async_options.timeout_ms / 1000.0
+        self._retry_enabled = serialized_fn.async_options.retry_enabled
+        self._max_attempts = serialized_fn.async_options.retry_max_attempts
+        self._retry_delay = serialized_fn.async_options.retry_delay_ms / 1000.0
+
+        scalar_function, variable_dict, self.user_defined_funcs = \
+            operation_utils.extract_user_defined_function(
+                serialized_fn.udfs[0], one_arg_optimization=False)
+
+        # Create the eval function
+        self._eval_func = eval('lambda value: %s' % scalar_function, variable_dict)
+
+        # Create ordered queue to maintain result order
+        self._queue = OrderedStreamElementQueue(self._capacity, self._raise_exception_if_exists)
+
+        # Async execution components
+        self._async_function_runner = None
+        self._emitter = None
+        self._exception = None
+        self._output_processor = None
+
+        # Job parameters
+        self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}
+
+    def set_output_processor(self, output_processor):
+        """Set the output processor for emitting results.
+
+        This method is called by FunctionOperation for AsyncOperationMixin implementations.
+        """
+        self._output_processor = output_processor
+
+    def open(self):
+        # Open user defined functions
+        for user_defined_func in self.user_defined_funcs:
+            if hasattr(user_defined_func, 'open'):
+                user_defined_func.open(
+                    FunctionContext(self.base_metric_group, self.job_parameters))
+
+        # Start emitter thread to collect async results
+        self._emitter = Emitter(self._mark_exception, self._output_processor, self._queue)
+        self._emitter.daemon = True
+        self._emitter.start()
+
+        # Start async function runner with event loop
+        self._async_function_runner = AsyncFunctionRunner()
+        self._async_function_runner.daemon = True
+        self._async_function_runner.start()
+        self._async_function_runner.wait_ready()
+
+    def close(self):
+        # Stop emitter
+        if self._emitter is not None:
+            self._emitter.stop()
+            self._emitter = None
+
+        # Stop async function runner
+        if self._async_function_runner is not None:
+            self._async_function_runner.stop()
+            self._async_function_runner = None
+
+        self._exception = None
+
+        # Close user defined functions
+        for user_defined_func in self.user_defined_funcs:
+            if hasattr(user_defined_func, 'close'):
+                user_defined_func.close()
+
+    def process_element(self, value):
+        """
+        Process an input element asynchronously.
+
+        This is non-blocking - it submits the async operation and returns immediately,
+        allowing multiple operations to be in-flight simultaneously.
+        """
+        self._raise_exception_if_exists()
+
+        entry = self._queue.put(None, 0, 0, value)
+
+        async def execute_async(result_handler):

Review Comment:
   Line 149 directly uses `result_handler` as a variable name. I believe the code is correct, but renaming this input parameter to a different name would make the code more readable.
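
   For example, an illustrative rename only (`entry_result_handler` is an arbitrary choice):

   ```python
   # Current: the coroutine parameter reuses the name `result_handler`,
   # which also appears elsewhere in the method (line 149 of operations.py).
   async def execute_async(result_handler):
       ...

   # Suggested: a distinct parameter name makes the scoping obvious.
   async def execute_async(entry_result_handler):
       ...
   ```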



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
