Regarding your question "why AUTO_ACK is designed this way": I think that when it was first implemented, AUTO_ACK was just a convenient way to help users ack messages.
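To make the current interaction easier to talk about, here is a rough sketch of who ends up acking for each combination. It only encodes the two snippets quoted further down in this thread (the ATMOST_ONCE branch in JavaInstanceRunnable#run() and sendOutputMessage() in the PulsarSink processors), and it assumes the output goes through PulsarSink. The class AckSketch, the AckPoint labels and the ackPoint method are made up for illustration; they are not Pulsar APIs.

```java
public class AckSketch {
    /** Mirrors the three guarantee names used in this thread. */
    public enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

    /** Hypothetical labels for where the ack happens; not Pulsar types. */
    public enum AckPoint {
        RUNTIME_ON_RECEIPT,      // JavaInstanceRunnable acks before the sink writes
        PULSAR_SINK_AFTER_SEND,  // PulsarSink processor acks once sendAsync completes
        LEFT_TO_SINK             // runtime does not ack on receipt; the sink decides
    }

    /** Summarizes the quoted snippets; an assumption-laden model, not the real code path. */
    public static AckPoint ackPoint(Guarantee g, boolean autoAck) {
        if (g == Guarantee.ATMOST_ONCE) {
            // Quoted JavaInstanceRunnable snippet: immediate ack only when autoAck is true.
            return autoAck ? AckPoint.RUNTIME_ON_RECEIPT : AckPoint.LEFT_TO_SINK;
        }
        // Quoted PulsarSink snippet: ack after send, with no check of autoAck.
        return AckPoint.PULSAR_SINK_AFTER_SEND;
    }

    public static void main(String[] args) {
        for (Guarantee g : Guarantee.values()) {
            for (boolean autoAck : new boolean[] {true, false}) {
                System.out.printf("%-16s autoAck=%-5b -> %s%n", g, autoAck, ackPoint(g, autoAck));
            }
        }
    }
}
```

In this model autoAck only changes the outcome in the ATMOST_ONCE row, which is one way to see where the confusion comes from.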
We can discuss the gap between expected and actual behavior and try to resolve or simplify it.

On 2022/05/10 01:14:07 Baozi wrote:
> Thanks for the reply,
>
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> > If AUTO_ACK is FALSE, then the acking will be done by Sink implementation.
>
> I'm a little confused; I want to know why AUTO_ACK is designed this way.
>
> I'll give another example:
>
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> >
> And if Guarantees != ATMOST_ONCE, then the JavaInstanceRunnable will not ack
> the message.
>
> JavaInstanceRunnable#run():Line273
>
> Thanks,
> Baodi Shi
>
> > On May 10, 2022, 01:0009, Neng Lu <nl...@apache.org> wrote:
> >
> > Thanks for this detailed discussion about processing guarantee and ack.
> > These two settings together affect the behavior of a running
> > function.
> >
> > One thing I want to clarify is:
> > The AUTO_ACK setting determines whether the function runtime acks messages.
> > ("function runtime" here specifically refers to the JavaInstanceRunnable.
> > If the ack happens inside a sink's implemented write method, it's not
> > auto-ack).
> >
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> > If AUTO_ACK is FALSE, then the acking will be done by Sink implementation.
> >
> > Now with this context, let's review your two scenarios:
> >
> >> 1. If the user set Guarantees == ATMOST_ONCE and autoAck == false.
> > To be precise, the processing semantics is not ATLEAST_ONCE. It's actually
> > left to the implemented Sink to decide which semantics it is. It can be
> > ATMOST_ONCE, ATLEAST_ONCE and probably EFFECTIVELY_ONCE.
> >
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
> >> false
> > This behavior is actually correct based on our previous context.
> >
> > A real problematic scenario here is when the user sets
> > ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true.
> > I don't think the JavaInstanceRunnable can ack for the user in these
> > cases. So there should be some check to ban users from submitting
> > functions with such configs.
> >
> > On 2022/05/09 09:02:12 Baozi wrote:
> >> Hi, guys:
> >>
> >> I found out that the autoAck configuration in the function framework now
> >> affects delivery semantics, which makes it difficult for users to
> >> understand. Refer to the following two scenarios.
> >>
> >> 1. If the user understands that the semantics of Guarantees shall prevail
> >>
> >> If the user sets Guarantees == ATMOST_ONCE and autoAck == false, then the
> >> processing semantics of the actual Function will become ATLEAST_ONCE.
> >> As the following code shows, this scenario will not ack immediately.
> >>
> >> JavaInstanceRunnable#run():Line273
> >> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L271-L276>
> >> if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
> >>         == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
> >>     // auto ack only when autoAck == true
> >>     if (instanceConfig.getFunctionDetails().getAutoAck()) {
> >>         currentRecord.ack();
> >>     }
> >> }
> >>
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
> >> false
> >>
> >> According to the following code, the framework still acks
> >> automatically.
> >>
> >> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275
> >> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L274-L276>
> >> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325
> >> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L325>
> >>
> >> public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
> >>     msg.sendAsync()
> >>             .thenAccept(messageId -> record.ack())
> >>             .exceptionally(getPublishErrorHandler(record, true));
> >> }
> >>
> >> To sum up, users may be confused when configuring Guarantees and autoAck,
> >> and cannot tell what behavior to expect.
> >>
> >> I would like to discuss whether it is possible to remove the autoAck
> >> configuration and add a CUSTOM type for Guarantees.
> >>
> >> switch (processingGuarantees) {
> >>     case Guarantees.ATMOST_ONCE: After the framework consumes the message,
> >>         it immediately acks
> >>     case Guarantees.ATLEAST_ONCE: After processing on the source side,
> >>         perform ack again
> >>     case Guarantees.EFFECTIVELY_ONCE: After processing on the source side,
> >>         perform ack again
> >>     case Guarantees.CUSTOM: The function framework does not help users
> >>         with any ack operations and semantic guarantees
> >> }
> >>
> >> If you have any ideas, you are welcome to discuss. If everyone agrees with
> >> this idea, I will submit a PIP to promote implementation.
> >>
> >> Thanks,
> >> Baodi Shi