Good morning Joe,

Thank you for the feedback, and sorry for any confusion. I would like the ConsumeKafka processor itself to consume each message exactly once, for flows whose destination is not another Kafka topic (so no PublishKafka), e.g. for ingesting data into a data lake.

Thanks,

On Sat, Feb 1, 2025 at 17:47, Joe Witt <joe.w...@gmail.com> wrote:

> Corentin
>
> Your posts here dive into solutions and combine different concepts which
> has made it a bit tougher to respond to. I'd like to ensure I am following
> what your core interest is. If I may restate it are you asking:
>
> "I'd like to take advantage of Kafka's Exactly Once Semantics mechanism
> while routing data through a flow in NiFi. How can I do this?"
>
> The answer to that is to leverage ( ConsumeKafka* -> transform steps ->
> PublishKafka* ) inside a ProcessGroup set to run in a stateless mode and
> leveraging the proper kafka transaction settings.
>
> Thanks
>
> On Sat, Feb 1, 2025 at 9:09 AM Corentin Régent <corentin.reg...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am reaching back regarding this topic, as any insight would be
> > very helpful. Would it be possible to implement such processor state,
> > whose updates would be atomic with those of the flowfile repository, in
> > light of the way that NiFi is currently architected? And if so, would
> > you have any hints on how it could be achieved?
> >
> > Thank you very much!
> >
> > On Sun, Jan 26, 2025 at 12:37, Corentin Régent <corentin.reg...@gmail.com>
> > wrote:
> >
> > > Hi NiFi team,
> > >
> > > The ConsumeKafka
> > > <https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/stable/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/index.html>
> > > processor currently offers at-least-once semantics through asynchronous
> > > offset commits to Kafka, meaning that the data can be duplicated, for
> > > example if a cluster restart occurs before the offsets are acknowledged.
> > > One way to guarantee exactly-once semantics (assuming no data loss in
> > > NiFi from hardware failures) instead could be to persist some processor
> > > state within NiFi, tracking the current offsets. However, this state
> > > management has to be atomic with operations performed on the flowfile
> > > repository, therefore the StateManager
> > > <https://nifi.apache.org/nifi-docs/developer-guide.html#state_manager>
> > > is unsuitable.
> > >
> > > I was wondering whether it would be feasible, in a reliable way, to
> > > retrieve such state from the latest flowfiles' attributes, presumably
> > > from the provenance repository? Which I assume is atomic with /
> > > recovered from the flowfile repository? I am not familiar with Lucene;
> > > I may be looking for something similar to
> > > WriteAheadProvenanceRepository::getLatestCachedEvents
> > > <https://github.com/apache/nifi/blob/main/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java#L260>,
> > > but that would leverage up-to-date persisted information, rather than
> > > an in-memory map that is lost upon cluster restart.
> > >
> > > Thanks,
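
For illustration only, the general idea discussed in this thread (storing the consumed offsets atomically with the data, so that a redelivered message can be recognized and skipped) can be sketched outside of NiFi as follows. This is a minimal sketch under stated assumptions, not NiFi or Kafka client code: SQLite stands in for a transactional destination, and the table names and the functions open_store, next_offset and ingest are all hypothetical.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open a hypothetical destination store holding both the data and the offsets."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS records ("
        "topic TEXT, partition_id INTEGER, kafka_offset INTEGER, payload TEXT)"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS offsets ("
        "topic TEXT, partition_id INTEGER, next_offset INTEGER, "
        "PRIMARY KEY (topic, partition_id))"
    )
    conn.commit()
    return conn


def next_offset(conn, topic, partition_id):
    """Return the first offset not yet stored for this partition (0 if none)."""
    row = conn.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND partition_id = ?",
        (topic, partition_id),
    ).fetchone()
    return row[0] if row else 0


def ingest(conn, topic, partition_id, batch):
    """Store a batch of (offset, payload) pairs and advance the offset atomically.

    Records whose offset is below the stored position are treated as
    redeliveries and skipped. Returns the number of records stored.
    """
    with conn:  # single transaction: commits on success, rolls back on error
        start = next_offset(conn, topic, partition_id)
        fresh = [(off, payload) for off, payload in batch if off >= start]
        if not fresh:
            return 0
        conn.executemany(
            "INSERT INTO records VALUES (?, ?, ?, ?)",
            [(topic, partition_id, off, payload) for off, payload in fresh],
        )
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (topic, partition_id, max(off for off, _ in fresh) + 1),
        )
    return len(fresh)


conn = open_store()
ingest(conn, "events", 0, [(0, "a"), (1, "b")])
# Simulated redelivery after a restart: offset 1 arrives again and is skipped.
ingest(conn, "events", 0, [(1, "b"), (2, "c")])
```

Because the data and the offset position are written in the same transaction, a crash either persists both or neither, which is the atomicity property the original question asks about for the flowfile repository.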