Corentin

Thanks for clarifying.  The notion of exactly once in effect exists within
a defined closed system meaning all aspects of transaction management are
designed/intentional for each component involved in that system.

In this case you need Kafka, NiFi, and whatever the target is all to
support some semantic/mechanism for it - for exactly once to be true.

You have two paths to explore in approximating the desired behavior with
NiFi acting as a broker between Kafka and the target data lake.

1. Run the flow using NiFi's normal stateful processing.  NiFi will pull
the data from Kafka and acknowledge the offset and now NiFi owns the
responsibility for ensuring the data gets to all desired destinations in
all desired forms.  This model does not offer exactly once semantics
because there isn't a point at which a confirmed single atomic transaction
commit occurs between the source and destination.  It will be essentially
at-least-once.

2. Run the flow in a process group with stateless execution mode.  This
would only acknowledge offset changes in Kafka once they're written to the
destination.  The way to reason over this stateless execution mode is to
consider it requires processors A->B->C.  Once C has executed it is
committed, then B is committed, then A is committed.  This is still for
this case most likely at-least-once because the scenario described here is
not a closed system whereby a single 'commit' renders the whole
'receipt/delivery' in effect.

In the past few years the excitement around 'exactly once' semantics has
led to a lot of misguided claims about how to achieve it and given the
impression you can make many things 'exactly once'.  Kafka has a pretty
legit exactly-once mechanism within it but that doesn't mean this extends
to every service it touches.  It is like saying a distributed transaction
with two-phase commit ensures exactly-once.  It is only true until you need
a three-phase commit.  And that is only safe until you need four... etc..

When I say closed system you need sources (like Kafka) and destination
protocols (like Snowflake's Snowpipe Streaming) which are designed with
sympathetic models to achieve exactly once.  There are many NiFi flows now
that do such things ensuring state/commits between source (Kafka) and
destination (Snowflake) use these mechanisms to offer very strong delivery
guarantees.

Thanks
Joe

On Sat, Feb 1, 2025 at 10:10 AM Corentin Régent <corentin.reg...@gmail.com>
wrote:

> Good morning Joe,
>
> Thank you for the feedback, and sorry for any confusion. I would like to
> make the ConsumeKafka processor itself consume each message exactly once.
> For flows whose destination is not another Kafka topic (so no
> PublishKafka), e.g. for ingesting data into a data lake.
>
> Thanks,
>
> Le sam. 1 févr. 2025 à 17:47, Joe Witt <joe.w...@gmail.com> a écrit :
>
> > Corentin
> >
> > Your posts here dive into solutions and combine different concepts which
> > has made it a bit tougher to respond to.  I'd like to ensure I am
> following
> > what your core interest is.  If I may restate it are you asking:
> >
> > "I'd like to take advantage of Kafka's Exactly Once Semantics mechanism
> > while routing data through a flow in NiFi.  How can I do this?"
> >
> > The answer to that is to leverage ( ConsumeKafka* -> transform steps ->
> > PublishKafka* ) inside a ProcessGroup set to run in a stateless mode and
> > leveraging the proper kafka transaction settings.
> >
> > Thanks
> >
> > On Sat, Feb 1, 2025 at 9:09 AM Corentin Régent <
> corentin.reg...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I am reaching back regarding this topic, as any insight would be
> > > very helpful. Would it be possible to implement such processor state,
> > which
> > > updates would be atomic with that of the flowfile repository, in light
> of
> > > the way that NiFi is currently architected? And if so, would you have
> any
> > > hints on how it could be achieved?
> > >
> > > Thank you very much!
> > >
> > > Le dim. 26 janv. 2025 à 12:37, Corentin Régent <
> > corentin.reg...@gmail.com>
> > > a écrit :
> > >
> > > > Hi NiFi team,
> > > >
> > > > The ConsumeKafka
> > > > <
> > >
> >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/stable/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/index.html
> > >
> > > processor
> > > > currently offers at-least-once semantics through asynchronous offset
> > > > commits to Kafka, meaning that the data can be duplicated for example
> > if
> > > a
> > > > cluster restart occurs before the offsets are acknowledged. One way
> to
> > > > guarantee exactly-once semantics (assuming no data loss in NiFi from
> > > > hardware failures) instead could be to persist some processor state
> > > within
> > > > NiFi, tracking the current offsets. But, this state management has to
> > be
> > > > atomic with operations performed on the flowfile repository,
> therefore
> > > the
> > > > StateManager
> > > > <
> https://nifi.apache.org/nifi-docs/developer-guide.html#state_manager>
> > > is
> > > > unsuitable.
> > > >
> > > > I was wondering whether it would be feasible, in a reliable way, to
> > > > retrieve such state from the latest flowfiles' attributes, presumably
> > > from
> > > > the provenance repository? Which I assume is atomic with / recovered
> > from
> > > > the flowfile repository? I am not familiar with Lucene; I may be
> > looking
> > > > for something similar to
> > > > WriteAheadProvenanceRepository::getLatestCachedEvents
> > > > <
> > >
> >
> https://github.com/apache/nifi/blob/main/nifi-framework-bundle/nifi-framework-extensions/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java#L260
> > > >,
> > > > but that would leverage up-to-date persisted information, rather than
> > an
> > > > in-memory map that is lost upon cluster restart.
> > > >
> > > > Thanks,
> > > >
> > >
> >
>

Reply via email to