Great points, Penghui.

> To optimize, I think we can only let the producer connect to the broker
> and the broker should tell the producer the backlog is exceeded.

Looking at the `CommandProducerSuccess` protobuf message, we already have the `producer_ready` boolean field. It was added for the exclusive producer case, and it seems to match this logic exactly, though the client wouldn't know "why" the producer was not ready. I think this field meets our needs because the producer only needs to know that it is connected and should not send messages yet.

> If we have too many producers, try to reconnect to the broker again and
> again. It is also a non-negligible cost.

This is a really important point. The current protocol implementation leads to unnecessary network traffic, and it gets worse for topics with many producers. Note that the lookup part of the protocol also adds load on every broker that serves these requests.

> Looks like we need to fix the client-side to make sure users will not get
> ProducerBlockedQuotaExceededError when creating the producer with
> producer_hold_request backlog policy. I have tested it locally, the behavior
> can be confirmed

Thanks for confirming this case. I think it would make sense to update the behavior of the producer_request_hold feature so that the producer creation future does not complete until the producer is ready to produce, just like the exclusive producer implementation.

Ultimately, there are two cases this feature needs to handle.

1. A new producer connects to a topic that has exceeded its quota. This case is trivial: the broker tells the producer it is connected but cannot send any messages yet (i.e. producer_ready=false), and the client holds the producer's future until the broker sends producer_ready=true.
2. An existing producer gets disconnected because of an exceeded quota. In this case, it is easy enough for the producer to stop sending messages, but because the client application already has a reference to this producer, the application can keep submitting messages until the client's buffer is full, at which point the send either blocks or fails with an exception.
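The two cases above can be modeled with a small, self-contained sketch. `HoldAwareProducer` and its methods are hypothetical and do not exist in the Pulsar client. `onProducerReady` stands in for receiving `CommandProducerSuccess` with the `producer_ready` flag (case 1), or for whatever signal tells an already-connected producer to stop (case 2). The sketch only shows the "fail once the buffer is full" variant; a real client could block instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the two cases; not Pulsar's ProducerImpl.
final class HoldAwareProducer {
    private final CompletableFuture<HoldAwareProducer> createFuture = new CompletableFuture<>();
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxPendingMessages;
    private boolean ready;

    HoldAwareProducer(int maxPendingMessages) {
        this.maxPendingMessages = maxPendingMessages;
    }

    // Case 1: the future handed to the application stays incomplete until the
    // broker reports producer_ready=true.
    CompletableFuture<HoldAwareProducer> createFuture() {
        return createFuture;
    }

    // Assumed hook: called for each CommandProducerSuccess (or a hypothetical
    // "stop sending" / "resume" signal for an already-connected producer).
    void onProducerReady(boolean producerReady) {
        ready = producerReady;
        if (producerReady) {
            createFuture.complete(this);
        }
    }

    // Case 2: while held, sends are only buffered; once the buffer is full the
    // send is rejected (the "exception" flavor rather than blocking).
    boolean trySend(String message) {
        if (pending.size() >= maxPendingMessages) {
            return false;
        }
        pending.addLast(message);
        return true;
    }

    // Returns how many buffered messages would be written to the connection now.
    int flush() {
        if (!ready) {
            return 0;
        }
        int sent = pending.size();
        pending.clear();
        return sent;
    }
}
```

In the real Java client, the buffer size and the block-versus-fail choice are configured with `ProducerBuilder.maxPendingMessages(int)` and `ProducerBuilder.blockIfQueueFull(boolean)`; the sketch simply hard-codes the failing variant.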
I think that would work, but it could be worth extending the client so that an application can discover, via a listener, that the topic's quota is exceeded. Additionally, we could disable the send timeouts when the producer enters this "hold" state so that messages do not time out while they are held.

In case number 2, it probably makes sense to extend the protocol so that the broker sends a protocol message indicating the producer should stop sending messages. This would be more elegant than disconnecting the producer and making it look up the topic again.

Thanks,
Michael

On Fri, Aug 12, 2022 at 5:50 AM PengHui Li <peng...@apache.org> wrote:

> > The producer fails
> > the pending messages when the policy is producer_exception and the
> > producer does nothing when the policy is producer_request_hold
>
> Eventually, it will fail [0] the user's create producer request because of
> the operation timeout [1].
>
> > The primary advantage for this solution is that the broker does not
> > need to hold a producer's messages in memory for some undefined time.
>
> Yes, I agree. And changing the protocol will also affect other clients.
>
> To optimize, I think we can only let the producer connect to the broker
> and the broker should tell the producer the backlog is exceeded.
> The producer can only send one message to test. Only push out more messages
> after the first message gets the response. Just a rough idea, not for now.
> If we have too many producers, try to reconnect to the broker again and
> again.
> It is also a non-negligible cost.
>
> Looks like we need to fix the client-side to make sure users will not get
> ProducerBlockedQuotaExceededError when creating the producer with
> producer_hold_request backlog policy.
> I have tested it locally, the behavior can be confirmed
>
> ```
> org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$TopicBacklogQuotaExceededException: Cannot create producer on topic with backlog quota exceeded","reqId":1841236212888635356, "remote":"localhost/127.0.0.1:64805", "local":"/127.0.0.1:64809"}
>
> at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1052)
> at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
> at org.apache.pulsar.broker.service.BacklogQuotaManagerTest.createProducer(BacklogQuotaManagerTest.java:664)
> at org.apache.pulsar.broker.service.BacklogQuotaManagerTest.testProducerException(BacklogQuotaManagerTest.java:1091)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
> at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
> at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
> at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
> at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
> at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
> at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
> at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> at org.testng.TestRunner.privateRun(TestRunner.java:764)
> at org.testng.TestRunner.run(TestRunner.java:585)
> at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
> at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
> at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
> at org.testng.SuiteRunner.run(SuiteRunner.java:286)
> at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
> at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
> at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
> at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
> at org.testng.TestNG.runSuites(TestNG.java:1069)
> at org.testng.TestNG.run(TestNG.java:1037)
> at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
> at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
> ```
>
> Best,
> Penghui
>
> [0] https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1778-L1786
> [1] https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1773
>
> On Fri, Aug 12, 2022 at 1:36 PM Michael Marshall <mmarsh...@apache.org> wrote:
>
> > > IMO, we should allow the producer to connect the topic.
> >
> > I actually think the current producer_request_hold feature works based
> > on disconnecting a producer and only letting it connect when the topic
> > no longer exceeds its quota.
> >
> > > It looks like we do not support the `producer_request_hold` semantics.
> > > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > > a missing feature.
> >
> > I agree that the only references to the
> > RetentionPolicy.producer_request_hold enum have to do with disallowing
> > producer creation or with disconnecting the producer when the backlog
> > is exceeded [0].
> >
> > However, I think the feature does work if we look closer.
> > The implementation is in the client (at least it is in the Java client).
> > First, note that the only functional difference between
> > producer_exception and producer_request_hold comes here [1] where two
> > different exceptions are sent to the producer. Then, see that the
> > producer handles these exceptions differently [2]. The producer fails
> > the pending messages when the policy is producer_exception and the
> > producer does nothing when the policy is producer_request_hold. In the
> > second case, the producer will attempt to reconnect to the broker and
> > will resend the messages that have been "held".
> >
> > It seems relevant to point out that the backlog quota's state is only
> > changed on a 60 second interval by default (see
> > backlogQuotaCheckIntervalInSeconds) and the default send timeout is 30
> > seconds. Therefore, many sends will likely timeout on the client side
> > before the broker updates the topic's state to "writable" and lets the
> > producer reconnect. To use this feature meaningfully, it might make
> > sense to increase the send timeout.
> >
> > The primary advantage for this solution is that the broker does not
> > need to hold a producer's messages in memory for some undefined time.
> >
> > I just checked, and we do not have this behavior documented in the
> > pulsar binary protocol spec [3]. We should update the spec to indicate
> > how this feature is supposed to work, assuming we keep it this way.
> >
> > Thanks,
> > Michael
> >
> > [0] https://github.com/apache/pulsar/blob/d24d82780fd27a98c6cdbee28d756ee7d419495f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L104-L107
> > [1] https://github.com/apache/pulsar/blob/4c6989c4da6c0b18c9b0196630e03daf437cea68/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1383-L1391
> > [2] https://github.com/apache/pulsar/blob/955dcd10ce28b996811e194c9ad852b06ab30aee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1735-L1753
> > [3] https://pulsar.apache.org/docs/next/developing-binary-protocol/
> >
> > On Thu, Aug 11, 2022 at 6:52 AM Qiang Zhao <mattisonc...@apache.org> wrote:
> >
> > > Hi, Penghui
> > >
> > > I support your opinion.
> > >
> > > It looks like we do not support the `producer_request_hold` semantics.
> > > We just easily use the same behaviour like `producer_exception`. Maybe it's
> > > a missing feature.
> > >
> > > Best,
> > > Mattison
> > >
> > > On 2022/08/11 05:28:25 PengHui Li wrote:
> > > > Hi all,
> > > >
> > > > Pulsar has a backlog quota policy `producer_request_hold` which will hold
> > > > the message publish request. It is very useful for some data sync scenarios.
> > > > It looks like the producer waiting for the consumer to process the data.
> > > >
> > > > But the new producer is not allowed to connect to the topic after reaching
> > > > the max backlog limitation with producer_request_hold policy. The producer will get
> > > > `ProducerBlockedQuotaExceededError` error. This caused inconvenience to
> > > > users, they need to have some retry to logic to try to create the producer
> > > > again until the consumer acked more messages
> > > >
> > > > IMO, we should allow the producer to connect the topic. To keep the
> > > > behavior consistent with the producer is already connected.
> > > >
> > > > WDYT?
> > > >
> > > > Best Regards,
> > > > Penghui
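The client-side difference described in Michael's quoted Aug 12 message (pending messages are failed under producer_exception, but kept and resent after a reconnect under producer_request_hold) can be modeled with the following sketch. `QuotaPolicy` and `QuotaAwareResender` are hypothetical names; the real logic lives in the `ProducerImpl` lines linked in that message and is not reproduced here, and send timeouts are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two backlog quota retention policies.
enum QuotaPolicy { PRODUCER_EXCEPTION, PRODUCER_REQUEST_HOLD }

// Hypothetical model of the client reaction to a quota-exceeded rejection.
final class QuotaAwareResender {
    private final List<String> pending = new ArrayList<>();
    private final List<String> failed = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();
    private boolean reconnectScheduled;

    void send(String message) {
        pending.add(message);
    }

    // The broker rejected the (re)connection because the backlog quota is exceeded.
    void onQuotaExceeded(QuotaPolicy policy) {
        if (policy == QuotaPolicy.PRODUCER_EXCEPTION) {
            // producer_exception: fail every pending message right away.
            failed.addAll(pending);
            pending.clear();
        } else {
            // producer_request_hold: keep the messages and retry the connection later.
            reconnectScheduled = true;
        }
    }

    // A later reconnect succeeds once the broker marks the topic writable again.
    void onReconnected() {
        reconnectScheduled = false;
        delivered.addAll(pending); // resend the "held" messages
        pending.clear();
    }

    List<String> failed() { return failed; }
    List<String> delivered() { return delivered; }
    boolean reconnectScheduled() { return reconnectScheduled; }
}
```

Because the broker only re-evaluates the quota every 60 seconds by default (`backlogQuotaCheckIntervalInSeconds`) while the default send timeout is 30 seconds, held messages can time out before a reconnect like `onReconnected` would run. In the real Java client, `ProducerBuilder.sendTimeout(0, TimeUnit.SECONDS)` disables the send timeout.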