Jason Kania created FLINK-33486:
-----------------------------------

             Summary: Pulsar Client Send Timeout Terminates TaskManager
                 Key: FLINK-33486
                 URL: https://issues.apache.org/jira/browse/FLINK-33486
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Pulsar
    Affects Versions: 1.17.1
            Reporter: Jason Kania


Currently, when the Pulsar Producer times out while attempting to send data, 
it generates a TimeoutException that goes unhandled. This is not a reasonable 
way to deal with the timeout. The situation should be handled gracefully, 
either through additional parameters that leave the choice of action to the 
user or through a callback mechanism in which the user can supply handling 
code. Unfortunately, right now, the timeout terminates the task manager, 
which then leads to other issues.
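
To illustrate the callback idea, here is a minimal sketch using only the JDK. 
Every name in it (SendFailureDemo, SendFailureHandler, Action, Writer) is 
hypothetical and not part of the Flink or Pulsar API, and 
java.util.concurrent.TimeoutException stands in for Pulsar's 
PulsarClientException$TimeoutException. It only shows the shape such a hook 
could take, not how the connector is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class SendFailureDemo {

    /** Hypothetical outcome chosen by the user after a failed send. */
    enum Action { FAIL, SKIP }

    /** Hypothetical user callback; not an existing Flink or Pulsar interface. */
    @FunctionalInterface
    interface SendFailureHandler {
        Action onFailure(String topic, Throwable cause);
    }

    /** Simplified stand-in for a sink writer that consults the handler. */
    static final class Writer {
        private final SendFailureHandler handler;
        private final AtomicInteger skipped = new AtomicInteger();

        Writer(SendFailureHandler handler) {
            this.handler = handler;
        }

        /**
         * Attaches a completion callback to an asynchronous send. The returned
         * future completes normally when the send succeeded or the handler chose
         * SKIP, and exceptionally when the handler chose FAIL.
         */
        CompletableFuture<Void> write(String topic, CompletableFuture<Void> send) {
            return send.<Void>handle((ok, error) -> {
                if (error == null) {
                    return null; // send succeeded
                }
                if (handler.onFailure(topic, error) == Action.SKIP) {
                    skipped.incrementAndGet(); // drop the record, keep the task alive
                    return null;
                }
                throw new IllegalStateException("Failed to send data to Pulsar: " + topic, error);
            });
        }

        int skippedCount() {
            return skipped.get();
        }
    }

    public static void main(String[] args) {
        // User policy: tolerate timeouts, fail on anything else.
        Writer writer = new Writer((topic, cause) ->
                cause instanceof TimeoutException ? Action.SKIP : Action.FAIL);

        // Simulate a send that timed out.
        CompletableFuture<Void> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("send timed out"));

        writer.write("persistent://public/default/myproducer-partition-0", timedOut).join();
        System.out.println("skipped sends: " + writer.skippedCount());
    }
}
```

With a hook like this, whether a timeout skips the record or fails the job 
would be the user's decision rather than a fixed behavior of the writer.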

Increasing the timeout period only makes the issue less likely; it does not 
ensure proper handling in the event that the timeout does occur.

The exception is as follows:

org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: persistent://public/default/myproducer-partition-0
        at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send message to the topic persistent://public/default/myproducer-partition-0 within given timeout
        at org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-all-2.11.2.jar:2.11.2]
        ... 1 more

 



