Jason Kania created FLINK-33486:
-----------------------------------

             Summary: Pulsar Client Send Timeout Terminates TaskManager
                 Key: FLINK-33486
                 URL: https://issues.apache.org/jira/browse/FLINK-33486
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Pulsar
    Affects Versions: 1.17.1
            Reporter: Jason Kania
Currently, when the Pulsar producer times out while attempting to send data, it raises an unhandled TimeoutException. This is not a reasonable way to handle the timeout. The situation should be handled gracefully, either through additional parameters that leave the action to the user's discretion or through a callback mechanism that users can write code against. Unfortunately, right now the timeout terminates the TaskManager, which then leads to other issues. Increasing the timeout period to avoid the issue is not really an option, because the timeout still needs to be handled properly when it does occur.

The exception is as follows:

org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: persistent://public/default/myproducer-partition-0
    at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
    at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer myproducer-f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send message to the topic persistent://public/default/myproducer-partition-0 within given timeout
    at org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) ~[pulsar-client-all-2.11.2.jar:2.11.2]
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[pulsar-client-all-2.11.2.jar:2.11.2]
    at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[pulsar-client-all-2.11.2.jar:2.11.2]
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[pulsar-client-all-2.11.2.jar:2.11.2]
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[pulsar-client-all-2.11.2.jar:2.11.2]
    at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[pulsar-client-all-2.11.2.jar:2.11.2]
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-all-2.11.2.jar:2.11.2]
    ... 1 more

--
This message was sent by Atlassian Jira (v8.20.10#820010)
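As a rough illustration of the callback option suggested in this issue, the following is a minimal, self-contained sketch of how a sink writer could consult a user-supplied handler when a send fails, instead of always rethrowing (as PulsarWriter.throwSendingException does in the trace above). Everything here is an assumption: SendFailureHandler, FailureAction and SketchWriter are hypothetical names, not part of flink-connector-pulsar; java.util.concurrent.TimeoutException stands in for PulsarClientException$TimeoutException; and the send is simulated with CompletableFuture and handled synchronously with join(), whereas the real writer sends asynchronously and reports failures through the task mailbox.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class SendFailureHandlerSketch {

    /** Hypothetical decision returned by a user callback; not an existing connector API. */
    public enum FailureAction { FAIL, DROP }

    /** Hypothetical user callback invoked when a send fails. */
    @FunctionalInterface
    public interface SendFailureHandler<T> {
        FailureAction onSendFailure(T record, Throwable cause);
    }

    /** Simplified stand-in for a sink writer; 'send' plays the role of an async producer send. */
    public static final class SketchWriter<T> {
        private final Function<T, CompletableFuture<Void>> send;
        private final SendFailureHandler<T> handler;
        private final List<T> dropped = new ArrayList<>();

        public SketchWriter(Function<T, CompletableFuture<Void>> send, SendFailureHandler<T> handler) {
            this.send = send;
            this.handler = handler;
        }

        /** Sends one record; on failure the handler decides whether to drop it or fail the task. */
        public void write(T record) {
            try {
                // Blocking here only to keep the sketch simple; a real writer would not block per record.
                send.apply(record).join();
            } catch (CompletionException e) {
                // join() wraps the original failure (e.g. the timeout) in a CompletionException.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                if (handler.onSendFailure(record, cause) == FailureAction.DROP) {
                    dropped.add(record);
                } else {
                    // Equivalent of today's behavior: the failure propagates and fails the task.
                    throw new RuntimeException("Failed to send data", cause);
                }
            }
        }

        public List<T> dropped() {
            return dropped;
        }
    }

    public static void main(String[] args) {
        // Simulated send: records starting with "slow" time out, all others succeed.
        Function<String, CompletableFuture<Void>> send = record ->
                record.startsWith("slow")
                        ? CompletableFuture.failedFuture(new TimeoutException("send timed out"))
                        : CompletableFuture.completedFuture(null);

        // Example policy: drop records whose send timed out, fail on any other error.
        SketchWriter<String> writer = new SketchWriter<>(send,
                (record, cause) -> cause instanceof TimeoutException ? FailureAction.DROP : FailureAction.FAIL);

        writer.write("a");
        writer.write("slow-1");
        writer.write("b");
        System.out.println("dropped=" + writer.dropped()); // prints "dropped=[slow-1]"
    }
}
```

A handler that always returns FAIL reproduces the current behavior, so a default along those lines would keep existing jobs unchanged. Returning DROP trades delivery guarantees for availability, which is why the choice would sit with the user rather than the connector.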