Hi Flinkers,

Wanted to check if anyone else has faced this issue before:

When Python UDF (which is used inside FlinkSQL) raises an exception, then
metrics get lost and not reported.  Facing this issue both in Flink 1.16.2
and FLink 1.17.1 (Python 3.9).
If an exception is not raised, then metrics show up.

It is not mentioned on Flink documentation that UDFs should not throw an
exception. Is this the case?

Or is it a known issue/bug?

Thank you.

=======================
FlinkSQL script content:
=======================

CREATE TABLE input_table (
price DOUBLE
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);

CREATE TABLE output_table WITH ('connector' = 'print')
LIKE input_table (EXCLUDING ALL);

CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
LANGUAGE PYTHON;

-- Fail scenario: ZeroDivisionError: division by zero
INSERT into output_table (select myDivide(value, 0)  from input_table);

=======================
Python UDF content:
=======================

from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

import logging

class DivideUDF(ScalarFunction):
    def __init__(self):
        self.success_counter = None
        self.fail_counter = None
    def open(self, function_context):
        self.success_counter =
function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
        self.fail_counter =
function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")
    def eval(self, x, y):
        logging.info('executing custom udf with logging and metric
example...')
        try:
            result = x/y
            self.success_counter.inc()
            return result
        except Exception as e:
            self.fail_counter.inc()
            raise e

divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())

=======================
Exception stack trace:
=======================

2023-07-26 18:17:20
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting
for BeamPythonFunctionRunner flush}
... 15 more
Caused by: java.lang.RuntimeException: Error while waiting for
BeamPythonFunctionRunner flush
at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
... 14 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for instruction
3: Traceback (most recent call last):
  File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 267, in _execute
    response = task()
  File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 340, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 580, in do_instruction
    return getattr(self, request_type)(
  File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 995, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 196, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File
"/usr/local/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py",
line 101, in process_element
    return self.func(value)
  File "<string>", line 1, in <lambda>
  File
"/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py",
line 27, in eval
    raise e
  File
"/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py",
line 22, in eval
    result = x/y
ZeroDivisionError: float division by zero

 at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
... 3 more

Reply via email to