Elkhan Dadashov created FLINK-32728:
---------------------------------------
Summary: Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised
Key: FLINK-32728
URL: https://issues.apache.org/jira/browse/FLINK-32728
Project: Flink
Issue Type: New Feature
Components: API / Python, Table SQL / Runtime
Affects Versions: 1.17.1, 1.16.2
Environment: Flink 1.16.2 and Flink 1.17.1 (Python 3.9)
Reporter: Elkhan Dadashov
When a Python UDF used inside Flink SQL raises an exception, the metrics it has registered are lost and never reported. This happens in both Flink 1.16.2 and Flink 1.17.1 (Python 3.9).
If no exception is raised, the metrics show up as expected.
The Flink documentation does not mention that UDFs must not throw exceptions.
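As a point of reference, the counters do survive when no exception escapes the UDF, so one possible (unverified) workaround is to swallow the error instead of re-raising it. Below is a minimal sketch of such a variant of eval, my own assumption rather than an official recommendation; it counts the failure and returns NULL so the bundle can finish and flush the metric updates:

    def eval(self, x, y):
        # Hypothetical workaround: record the failure but do not re-raise,
        # so the Beam bundle completes and the counter updates get reported.
        try:
            result = x / y
            self.success_counter.inc()
            return result
        except Exception:
            self.fail_counter.inc()
            return None  # emit NULL instead of failing the job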
=======================
FlinkSQL script content:
=======================
CREATE TABLE input_table (
    price DOUBLE
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

CREATE TABLE output_table WITH ('connector' = 'print')
LIKE input_table (EXCLUDING ALL);

CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
LANGUAGE PYTHON;

-- Fail scenario: ZeroDivisionError: division by zero
INSERT INTO output_table (SELECT myDivide(price, 0) FROM input_table);
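For reproduction, the same statements can be submitted from a small PyFlink driver. This is only a sketch of how the script can be run; the file name custom_udf.py comes from the UDF below, while the driver itself (paths, environment setup) is an assumption:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch of a driver that submits the SQL above (assumes custom_udf.py sits
# next to this script and PyFlink is installed in the Python environment).
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.add_python_file("custom_udf.py")

t_env.execute_sql("""
    CREATE TABLE input_table (
        price DOUBLE
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1'
    )
""")
t_env.execute_sql("""
    CREATE TABLE output_table WITH ('connector' = 'print')
    LIKE input_table (EXCLUDING ALL)
""")
t_env.execute_sql(
    "CREATE FUNCTION myDivide AS 'custom_udf.divide_udf' LANGUAGE PYTHON"
)
# Fail scenario: ZeroDivisionError
t_env.execute_sql(
    "INSERT INTO output_table SELECT myDivide(price, 0) FROM input_table"
).wait()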
=======================
Python UDF content:
=======================
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes
import logging
class DivideUDF(ScalarFunction):
    def __init__(self):
        self.success_counter = None
        self.fail_counter = None

    def open(self, function_context):
        self.success_counter = function_context.get_metric_group().counter(
            "flinksql_custom_udf_success_metric")
        self.fail_counter = function_context.get_metric_group().counter(
            "flinksql_custom_udf_fail_metric")

    def eval(self, x, y):
        logging.info('executing custom udf with logging and metric example...')
        try:
            result = x / y
            self.success_counter.inc()
            return result
        except Exception as e:
            self.fail_counter.inc()
            raise e


divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
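For what it's worth, the counter wiring in the UDF can be exercised outside of Flink with a mocked FunctionContext. A minimal local check (my own sketch, using unittest.mock and the DivideUDF class above; no cluster involved):

from unittest import mock

# Verify locally that the fail counter is incremented before the exception
# propagates; the FunctionContext and its metric group are mocked.
ctx = mock.Mock()
ctx.get_metric_group.return_value.counter.side_effect = lambda name: mock.Mock()

udf_instance = DivideUDF()
udf_instance.open(ctx)

try:
    udf_instance.eval(1.0, 0.0)
except ZeroDivisionError:
    pass

assert udf_instance.fail_counter.inc.call_count == 1
assert udf_instance.success_counter.inc.call_count == 0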
=======================
Exception stack trace:
=======================
2023-07-26 18:17:20
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
    ... 15 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
    ... 14 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 580, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 196, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/usr/local/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py", line 101, in process_element
    return self.func(value)
  File "<string>", line 1, in <lambda>
  File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 27, in eval
    raise e
  File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 22, in eval
    result = x/y
ZeroDivisionError: float division by zero
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    ... 3 more