Good morning Joe,

Thank you for the feedback, and sorry for any confusion. I would like to
make the ConsumeKafka processor itself consume each message exactly once.
For flows whose destination is not another Kafka topic (so no
PublishKafka), e.g. for ingesting data into a data lake.

Thanks,

Le sam. 1 févr. 2025 à 17:47, Joe Witt <joe.w...@gmail.com> a écrit :

> Corentin
>
> Your posts here dive into solutions and combine different concepts which
> has made it a bit tougher to respond to.  I'd like to ensure I am following
> what your core interest is.  If I may restate it are you asking:
>
> "I'd like to take advantage of Kafka's Exactly Once Semantics mechanism
> while routing data through a flow in NiFi.  How can I do this?"
>
> The answer to that is to leverage ( ConsumeKafka* -> transform steps ->
> PublishKafka* ) inside a ProcessGroup set to run in a stateless mode and
> leveraging the proper kafka transaction settings.
>
> Thanks
>
> On Sat, Feb 1, 2025 at 9:09 AM Corentin Régent <corentin.reg...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am reaching back regarding this topic, as any insight would be
> > very helpful. Would it be possible to implement such processor state,
> which
> > updates would be atomic with that of the flowfile repository, in light of
> > the way that NiFi is currently architected? And if so, would you have any
> > hints on how it could be achieved?
> >
> > Thank you very much!
> >
> > Le dim. 26 janv. 2025 à 12:37, Corentin Régent <
> corentin.reg...@gmail.com>
> > a écrit :
> >
> > > Hi NiFi team,
> > >
> > > The ConsumeKafka
> > > <
> >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/stable/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/index.html
> >
> > processor
> > > currently offers at-least-once semantics through asynchronous offset
> > > commits to Kafka, meaning that the data can be duplicated for example
> if
> > a
> > > cluster restart occurs before the offsets are acknowledged. One way to
> > > guarantee exactly-once semantics (assuming no data loss in NiFi from
> > > hardware failures) instead could be to persist some processor state
> > within
> > > NiFi, tracking the current offsets. But, this state management has to
> be
> > > atomic with operations performed on the flowfile repository, therefore
> > the
> > > StateManager
> > > <https://nifi.apache.org/nifi-docs/developer-guide.html#state_manager>
> > is
> > > unsuitable.
> > >
> > > I was wondering whether it would be feasible, in a reliable way, to
> > > retrieve such state from the latest flowfiles' attributes, presumably
> > from
> > > the provenance repository? Which I assume is atomic with / recovered
> from
> > > the flowfile repository? I am not familiar with Lucene; I may be
> looking
> > > for something similar to
> > > WriteAheadProvenanceRepository::getLatestCachedEvents
> > > <
> >
> https://github.com/apache/nifi/blob/main/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java#L260
> > >,
> > > but that would leverage up-to-date persisted information, rather than
> an
> > > in-memory map that is lost upon cluster restart.
> > >
> > > Thanks,
> > >
> >
>

Reply via email to