Hi, Let us consider only Kafka and NiFi. Offset acknowledgements by NiFi currently are asynchronous, meaning that a message can be read and persisted by NiFi, but the offset synchonization with Kafka may not occur if there is a cluster restart for example. Therefore, the same message is then read again, indeed being at-least-once.
While a single atomic transaction may not occur between NiFi and Kafka, stateful NiFi is itself built upon atomic transactions in its repositories. So, if we assume no disk corruption, one may theoretically use them, instead of Kafka, for tracking the offsets? This would guarantee that each message is consumed only once by NiFi. And from there, delivery without duplication, from NiFi to the destination, could be achieved via idempotent operations (e.g. overwriting to a file system with a unique filename). Hence, is there a way one could implement such atomic state management, maybe leveraging the NiFi repositories? For the KafkaConsumer processor to keep track of the offsets all by itself, and to recover them after a cluster restart. Le sam. 1 févr. 2025, 18:35, Joe Witt <joe.w...@gmail.com> a écrit : > Corentin > > Thanks for clarifying. The notion of exactly once in effect exists within > a defined closed system meaning all aspects of transaction management are > designed/intentional for each component involved in that system. > > In this case you need Kafka, NiFi, and whatever the target is all to > support some semantic/mechanism for it - for exactly once to be true. > > You have two paths to explore in approximating the desired behavior with > NiFi acting as a broker between Kafka and the target data lake. > > 1. Run the flow using NiFi's normal stateful processing. NiFi will pull > the data from Kafka and acknowledge the offset and now NiFi owns the > responsibility for ensuring the data gets to all desired destinations in > all desired forms. This model does not offer exactly once semantics > because there isn't a point at which a confirmed single atomic transaction > commit occurs between the source and destination. It will be essentially > at-least-once. > > 2. Run the flow in a process group with stateless execution mode. This > would only acknowledge offset changes in Kafka once they're written to the > destination. The way to reason over this stateless execution mode is to > consider it requires processors A->B->C. Once C has executed it is > committed, then B is committed, then A is committed. This is still for > this case most likely at-least-once because the scenario described here is > not a closed system whereby a single 'commit' renders the whole > 'receipt/delivery' in effect. > > In the past few years the excitement around 'exactly once' semantics has > led to a lot of misguided claims about how to achieve it and given the > impression you can make many things 'exactly once'. Kafka has a pretty > legit exactly-once mechanism within it but that doesn't mean this extends > to every service it touches. It is like saying a distributed transaction > with two-phase commit ensures exactly-once. It is only true until you need > a three-phase commit. And that is only safe until you need four... etc.. > > When I say closed system you need sources (like Kafka) and destination > protocols (like Snowflake's Snowpipe Streaming) which are designed with > sympathetic models to achieve exactly once. There are many NiFi flows now > that do such things ensuring state/commits between source (Kafka) and > destination (Snowflake) use these mechanisms to offer very strong delivery > guarantees. > > Thanks > Joe > > On Sat, Feb 1, 2025 at 10:10 AM Corentin Régent <corentin.reg...@gmail.com > > > wrote: > > > Good morning Joe, > > > > Thank you for the feedback, and sorry for any confusion. I would like to > > make the ConsumeKafka processor itself consume each message exactly once. > > For flows whose destination is not another Kafka topic (so no > > PublishKafka), e.g. for ingesting data into a data lake. > > > > Thanks, > > > > Le sam. 1 févr. 2025 à 17:47, Joe Witt <joe.w...@gmail.com> a écrit : > > > > > Corentin > > > > > > Your posts here dive into solutions and combine different concepts > which > > > has made it a bit tougher to respond to. I'd like to ensure I am > > following > > > what your core interest is. If I may restate it are you asking: > > > > > > "I'd like to take advantage of Kafka's Exactly Once Semantics mechanism > > > while routing data through a flow in NiFi. How can I do this?" > > > > > > The answer to that is to leverage ( ConsumeKafka* -> transform steps -> > > > PublishKafka* ) inside a ProcessGroup set to run in a stateless mode > and > > > leveraging the proper kafka transaction settings. > > > > > > Thanks > > > > > > On Sat, Feb 1, 2025 at 9:09 AM Corentin Régent < > > corentin.reg...@gmail.com> > > > wrote: > > > > > > > Hi, > > > > > > > > I am reaching back regarding this topic, as any insight would be > > > > very helpful. Would it be possible to implement such processor state, > > > which > > > > updates would be atomic with that of the flowfile repository, in > light > > of > > > > the way that NiFi is currently architected? And if so, would you have > > any > > > > hints on how it could be achieved? > > > > > > > > Thank you very much! > > > > > > > > Le dim. 26 janv. 2025 à 12:37, Corentin Régent < > > > corentin.reg...@gmail.com> > > > > a écrit : > > > > > > > > > Hi NiFi team, > > > > > > > > > > The ConsumeKafka > > > > > < > > > > > > > > > > https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/stable/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/index.html > > > > > > > > processor > > > > > currently offers at-least-once semantics through asynchronous > offset > > > > > commits to Kafka, meaning that the data can be duplicated for > example > > > if > > > > a > > > > > cluster restart occurs before the offsets are acknowledged. One way > > to > > > > > guarantee exactly-once semantics (assuming no data loss in NiFi > from > > > > > hardware failures) instead could be to persist some processor state > > > > within > > > > > NiFi, tracking the current offsets. But, this state management has > to > > > be > > > > > atomic with operations performed on the flowfile repository, > > therefore > > > > the > > > > > StateManager > > > > > < > > https://nifi.apache.org/nifi-docs/developer-guide.html#state_manager> > > > > is > > > > > unsuitable. > > > > > > > > > > I was wondering whether it would be feasible, in a reliable way, to > > > > > retrieve such state from the latest flowfiles' attributes, > presumably > > > > from > > > > > the provenance repository? Which I assume is atomic with / > recovered > > > from > > > > > the flowfile repository? I am not familiar with Lucene; I may be > > > looking > > > > > for something similar to > > > > > WriteAheadProvenanceRepository::getLatestCachedEvents > > > > > < > > > > > > > > > > https://github.com/apache/nifi/blob/main/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java#L260 > > > > >, > > > > > but that would leverage up-to-date persisted information, rather > than > > > an > > > > > in-memory map that is lost upon cluster restart. > > > > > > > > > > Thanks, > > > > > > > > > > > > > > >