Hi Flinkers,

I wanted to check whether anyone else has faced this issue before:
When a Python UDF (used inside Flink SQL) raises an exception, its metrics are lost and never reported. I am seeing this in both Flink 1.16.2 and Flink 1.17.1 (Python 3.9). If no exception is raised, the metrics show up as expected. The Flink documentation does not mention that UDFs must not throw exceptions. Is that the expectation, or is this a known issue/bug? Thank you.

=======================
Flink SQL script content:
=======================

CREATE TABLE input_table (
    price DOUBLE
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

CREATE TABLE output_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING ALL);

CREATE FUNCTION myDivide AS 'custom_udf.divide_udf' LANGUAGE PYTHON;

-- Fail scenario: ZeroDivisionError: division by zero
INSERT INTO output_table SELECT myDivide(price, 0) FROM input_table;

=======================
Python UDF content:
=======================

from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes
import logging


class DivideUDF(ScalarFunction):
    def __init__(self):
        self.success_counter = None
        self.fail_counter = None

    def open(self, function_context):
        self.success_counter = function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
        self.fail_counter = function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")

    def eval(self, x, y):
        logging.info('executing custom udf with logging and metric example...')
        try:
            result = x/y
            self.success_counter.inc()
            return result
        except Exception as e:
            self.fail_counter.inc()
            raise e


divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())

=======================
Exception stack trace:
=======================

2023-07-26 18:17:20
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
    ... 15 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
    ... 14 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 580, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 196, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/usr/local/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py", line 101, in process_element
    return self.func(value)
  File "<string>", line 1, in <lambda>
  File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 27, in eval
    raise e
  File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 22, in eval
    result = x/y
ZeroDivisionError: float division by zero

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    ... 3 more