> AUTO_ACK setting means whether the function runtime will ack messages or not.
> ("function runtime" here specifically refers to the JavaInstanceRunnable. If
> the ack happens inside a sink's implemented write method, it's not auto-ack.)

The official documentation describes this setting as: "Whether or not the framework acknowledges messages automatically." From the user's point of view, the sink is also part of the function framework.
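To make the point concrete, here is a small model of the ack sites in the code quoted later in this thread: the check in JavaInstanceRunnable#run() and the PulsarSink at-least-once / effectively-once processors. It is a sketch under stated assumptions. `Guarantee` and `frameworkAckers` are hypothetical names, not Pulsar API. Only the quoted snippets are modelled; other sinks and the PulsarSink at-most-once path are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class AckOwnershipSketch {
    // Hypothetical mirror of the three guarantees named in the thread;
    // not the real protobuf ProcessingGuarantees enum.
    enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

    /**
     * Lists the framework components that ack a record, based only on the
     * snippets quoted in this thread (assumption: nothing else acks).
     */
    static List<String> frameworkAckers(Guarantee guarantee, boolean autoAck) {
        List<String> ackers = new ArrayList<>();
        // JavaInstanceRunnable#run(): acks immediately only for ATMOST_ONCE + autoAck.
        if (guarantee == Guarantee.ATMOST_ONCE && autoAck) {
            ackers.add("JavaInstanceRunnable");
        }
        // PulsarSink at-least-once / effectively-once processors: ack after a
        // successful publish; autoAck is never consulted on this path.
        if (guarantee == Guarantee.ATLEAST_ONCE || guarantee == Guarantee.EFFECTIVELY_ONCE) {
            ackers.add("PulsarSink");
        }
        return ackers;
    }

    public static void main(String[] args) {
        // Print every (guarantee, autoAck) combination and who acks it.
        for (Guarantee g : Guarantee.values()) {
            for (boolean autoAck : new boolean[] {true, false}) {
                System.out.println(g + ", autoAck=" + autoAck + " -> " + frameworkAckers(g, autoAck));
            }
        }
    }
}
```

If the sink counts as part of the framework, the ATLEAST_ONCE and EFFECTIVELY_ONCE rows show a framework ack even with autoAck=false, which is the mismatch with the documented meaning of the setting.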
Thanks,
Baodi Shi

> On May 10, 2022, at 09:14, Baozi <wudixiaolong...@icloud.com.INVALID> wrote:
>
> Thanks for this detailed discussion about processing guarantees and acking.
> These two settings together affect the behavior of a running function.
>
> One thing I want to clarify:
> The AUTO_ACK setting means whether the function runtime will ack messages or
> not. ("Function runtime" here specifically refers to the JavaInstanceRunnable.
> If the ack happens inside a sink's implemented write method, it's not
> auto-ack.)
>
> If AUTO_ACK is TRUE, the JavaInstanceRunnable acks messages.
> If AUTO_ACK is FALSE, acking is done by the Sink implementation.
>
> With this context, let's review your two scenarios:
>
>> 1. If the user sets Guarantees == ATMOST_ONCE and autoAck == false.
> To be precise, the processing semantics is not fixed to ATLEAST_ONCE. It is
> left to the implemented Sink to decide which semantics apply: it can be
> ATMOST_ONCE, ATLEAST_ONCE, and possibly EFFECTIVELY_ONCE.
>
>> 2. If the user thinks that the framework doesn't auto ack when autoAck ==
>> false
> This behavior is actually correct, given the context above.
>
> The really problematic scenario is when the user sets
> ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the
> JavaInstanceRunnable can ack on the user's behalf in these cases, so there
> should be a check that rejects functions submitted with such configs.
>
> On 2022/05/09 09:02:12 Baozi wrote:
>> Hi, guys:
>>
>> I found that the autoAck configuration in the function framework currently
>> affects delivery semantics, which makes it difficult for users to
>> understand. Consider the following two scenarios.
>>
>> 1. If the user understands that the semantics of Guarantees shall prevail
>>
>> If the user sets Guarantees == ATMOST_ONCE and autoAck == false, the
>> processing semantics of the actual Function become ATLEAST_ONCE. As the
>> following code shows, in this scenario the framework does not ack
>> immediately.
>>
>> JavaInstanceRunnable#run():Line273
>> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L271-L276>
>>
>> if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
>>         == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
>>     if (instanceConfig.getFunctionDetails().getAutoAck()) {
>>         // ack automatically only when autoAck == true
>>         currentRecord.ack();
>>     }
>> }
>>
>> 2. If the user thinks that the framework doesn't auto ack when autoAck ==
>> false
>>
>> According to the following code, the framework still acks automatically.
>>
>> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275
>> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L274-L276>
>> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325
>> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L325>
>>
>> public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
>>     msg.sendAsync()
>>             // ack the source record once the output message is published
>>             .thenAccept(messageId -> record.ack())
>>             .exceptionally(getPublishErrorHandler(record, true));
>> }
>>
>> To sum up, users may be confused when configuring Guarantees and autoAck,
>> and cannot tell what behavior to expect.
>>
>> I would like to discuss whether we can remove the autoAck configuration
>> and add a CUSTOM type for Guarantees:
>>
>> switch (processingGuarantees) {
>>     case Guarantees.ATMOST_ONCE: after the framework consumes the message,
>>         it acks immediately
>>     case Guarantees.ATLEAST_ONCE: after processing on the source side,
>>         the framework acks
>>     case Guarantees.EFFECTIVELY_ONCE: after processing on the source side,
>>         the framework acks
>>     case Guarantees.CUSTOM: the function framework performs no ack
>>         operations and provides no semantic guarantees for the user
>> }
>>
>> If you have any ideas, you are welcome to join the discussion. If everyone
>> agrees with this idea, I will open a PIP to drive the implementation.
>>
>> Thanks,
>> Baodi Shi
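The proposed mapping can be sketched in Java as a single policy keyed only on the guarantee, with CUSTOM taking over the role of autoAck=false. This is a minimal sketch under assumptions: `Guarantee`, `AckPoint`, `AckableRecord`, `frameworkAckPoint`, and `process` are hypothetical names, not Pulsar's API. "After processing" is simplified here to "after the user logic returns without throwing".

```java
public class CustomGuaranteeSketch {
    // Hypothetical guarantee set from the proposal; CUSTOM replaces autoAck=false.
    enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE, CUSTOM }

    // When (if ever) the framework itself acks a consumed record.
    enum AckPoint { ON_CONSUME, AFTER_PROCESSING, NEVER }

    // Hypothetical stand-in for a consumed record that can be acked.
    interface AckableRecord { void ack(); }

    static AckPoint frameworkAckPoint(Guarantee g) {
        switch (g) {
            case ATMOST_ONCE:
                return AckPoint.ON_CONSUME;
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                return AckPoint.AFTER_PROCESSING;
            case CUSTOM:
                return AckPoint.NEVER; // user code is fully responsible for acking
            default:
                throw new IllegalArgumentException("unknown guarantee: " + g);
        }
    }

    /** Runs user logic and acks according to the guarantee's ack point. */
    static void process(Guarantee g, AckableRecord record, Runnable userLogic) {
        AckPoint point = frameworkAckPoint(g);
        if (point == AckPoint.ON_CONSUME) {
            record.ack(); // acked before processing: a later failure cannot trigger a retry
        }
        userLogic.run(); // if this throws, AFTER_PROCESSING records stay unacked
        if (point == AckPoint.AFTER_PROCESSING) {
            record.ack();
        }
    }

    public static void main(String[] args) {
        for (Guarantee g : Guarantee.values()) {
            int[] acks = {0};
            process(g, () -> acks[0]++, () -> {});
            System.out.println(g + " -> " + frameworkAckPoint(g) + ", framework acks: " + acks[0]);
        }
    }
}
```

Because the ack policy lives in one enum, the combination the reply above flags as problematic (ATLEAST_ONCE/EFFECTIVELY_ONCE with AUTO_ACK=true) simply cannot be expressed.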