Hi Mark,

Thanks for the details! With that in mind I can also make the operation
more transactional by using wait and notify (won't solve the duplication on
restart issue).

I tried the example code and I got an NPE inside commit(). I initially
thought this was because I was running version 1.11.4. I  upgraded to NiFi
1.13.0 and the problem still persists where repoRecord is Null on line 786
in StandardProcessSession.java

Stack Trace:

> 2021-03-09 17:41:44,013 ERROR [Timer-Driven Process Thread-7]
> c.m.p.p.MyProcessor MyProcessor[id=f3439a7a-cb5a-3c76-121e-60c448d1f910]
> Failure FF: null: java.lang.NullPointerException
> java.lang.NullPointerException: null
> at
> org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
>
> ... My code calling outSession.comit()
>

This may be my issue since my code doesn't look exactly like the example,
but it would be good to have a better exception than NPE.

Thanks,
Eric


On Mon., Mar. 8, 2021, 1:30 p.m. Mark Payne, <[email protected]> wrote:

> Eric,
>
> The session holds onto all ‘active flowfiles’ as well as other information
> about them, such as which relationship they’ve been transferred to, which
> attributes have been added, removed, etc. So if you have a separate session
> for each outbound FlowFile, each time that session is committed, all those
> FlowFiles can be removed from the session and placed in the next queue.
> This is important because the session holds all FlowFiles it knows about in
> memory, but the queues will swap the flowfiles out - writing them to disk
> and then dropping them from memory/heap. This allows us to hold many
> millions of FlowFiles within NiFi at a time. However, if you’re trying to
> hold a million FlowFiles in heap, you’re going to use a huge amount of
> heap. This is predominantly due to the HashMap that is used to store
> attributes. Those can quickly use up a huge amount of heap.
>
> Thanks
> -Mark
>
>
> > On Mar 8, 2021, at 12:53 PM, Eric Secules <[email protected]> wrote:
> >
> > Hi Mark,
> >
> > Thanks for the example code and the considerations. I see the error I
> made
> > before.
> > If I were to use the standard way and put this all in one session what
> are
> > the memory characteristics? What is tracked in memory for a session and
> is
> > this actually any different (in overall memory use) from my approach of
> > using a separate session for output?
> >
> > Thanks,
> > Eric
> >
> > On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <[email protected]> wrote:
> >
> >> Hi Eric,
> >>
> >> We should definitely throw a better Exception there, rather than
> >> NullPointerException. But what you’re looking to do can be done. It’s
> >> slightly more nuanced than that, though. What you’ll need to do, in
> order
> >> to maintain the lineage, is to create the child FlowFile in the session
> >> that owns the parent. You can then “migrate” the child FlowFile to a new
> >> session. Something to the effect of:
> >>
> >> -----------------
> >> ProcessSession inSession = sessionFactory.create();
> >> FlowFile flowFile = inSession.get();
> >> if (flowFile == null) {
> >>  return;
> >> }
> >>
> >> ProcessSession outSession = sessionFactory.create();
> >>
> >> try (InputStream in = inSession.read(flowFile)) {
> >>  Record record;
> >>  while ((record = getRecord(in)) {
> >>      FlowFile child = inSession.create(flowFile);
> >>      try (OutputStream out = inSession.write(child)) {
> >>                // write some data.
> >>      }
> >>
> >>      inSession.migrate(outSession, Collections.singleton(child));
> >>      outSession.transfer(child, REL_SUCCESS);
> >>      outSession.commit();
> >>  }
> >> }
> >>
> >> inSession.transfer(flowFIle, REL_ORIGINAL);
> >> inSession.commit();
> >> ———————
> >>
> >> Some things to keep in mind, though:
> >> - To get a session factory, you should extend
> >> AbstractSessionFactoryProcessor instead of AbstractProcessor.
> >> - This means you need to ensure that you always explicitly
> commit/rollback
> >> sessions and handle Exceptions properly, ensure that you account for any
> >> FlowFile that is created, etc.
> >> - If your incoming FlowFile has 1 million FlowFiles, and you process
> >> 900,000 of them and then NiFi is restarted, then on restart it’ll
> reprocess
> >> the incoming FlowFile, so you’ll end up with 900,000 duplicates.
> >> - Performance will be very subpar, as you’ll be creating and committing
> up
> >> to 1 million sessions per FlowFile. Perhaps this is okay if you only
> have
> >> to process one of these files per 24 hours. But it’s worth considering.
> >>
> >> Thanks
> >> -Mark
> >>
> >>
> >>
> >>> On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
> >>>
> >>> Hi Joe,
> >>>
> >>> I was able to get it working by using one session to manage the parent
> >>> flowfile and then one session per split file. I couldn't do
> >>> splitSession.create(inputFF), was getting an NPE and another exception,
> >> but
> >>> it worked with splitSession.create() the downside is I lose the lineage
> >>> connection to the parent.
> >>>
> >>> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
> >>>> c.m.p.p.MyProcessor
> MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
> >>>> Failure FF: null: java.lang.NullPointerException
> >>>> java.lang.NullPointerException: null
> >>>> at
> >>>>
> >>
> org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
> >>>> at
> >>>>
> >>
> org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
> >>>> at
> >>>>
> >>
> org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
> >>>>
> >>>
> >>> This log is from a 1.11.0 build of NiFi
> >>>
> >>> In my case the input is a proprietary text file with a gigantic schema
> >>> definition which we translate to JSON and split the results all at
> once.
> >> I
> >>> don't know whether record-based processing works for us because of how
> >>> fluid the json schema is.
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>> On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
> >>>
> >>>> Eric,
> >>>>
> >>>> My point is that it sounds like you get handed an original document
> >>>> which is a JSON document.  It contains up to a million elements within
> >>>> it.  You would implement a record reader for your original doc
> >>>> structure and then you can use any of our current writers/etc..  But
> >>>> the important part is avoiding creating splits unless/until totally
> >>>> necessary/etc..
> >>>>
> >>>> Anyway if you go the route you're thinking of I think you'll need a
> >>>> different session for reading (single session for the entire source
> >>>> file) and a different session for all the splits you'll create. But I
> >>>> might be over complicating that.  MarkP could give better input.
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]>
> wrote:
> >>>>>
> >>>>> Hi Joe,
> >>>>>
> >>>>> For my use case partial results are okay.
> >>>>> The files may contain up to a million records. But we have like a day
> >> to
> >>>>> process it. We will consider record-based processing. It might be a
> >>>> longer
> >>>>> task to convert our flows to consume records instead of single files.
> >>>>> Will I need to have multiple sessions to handle all this?
> >>>>>
> >>>>> Thanks,
> >>>>> Eric
> >>>>>
> >>>>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
> >>>>>
> >>>>>> Eric
> >>>>>>
> >>>>>> The ProcessSession follows a unit of work pattern.  You can do a lot
> >>>>>> of things but until you commit the session it wont actually commit
> the
> >>>>>> change(s).  So if you want the behavior you describe call commit
> after
> >>>>>> transfer each time.  This is done automatically for you in most
> cases
> >>>>>> but you can call it to control the boundary.  Just remember you risk
> >>>>>> partial results then.  Consider you're reading the input file which
> >>>>>> contains 100 records lets say.  On record 51 there is a processing
> >>>>>> issue.  What happens then?    I'd also suggest this pattern
> generally
> >>>>>> results in poor performance.  Can you not use the record
> >>>>>> reader/writers to accomplish this so you can avoid turning it into a
> >>>>>> bunch of tiny flowfiles?
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]>
> >>>> wrote:
> >>>>>>>
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I am trying to write a processor which parses an input file and
> >>>> emits one
> >>>>>>> JSON flowfile for each record in the input file. Currently we're
> >>>> calling
> >>>>>>> session.transfer() once we encounter a fragment we want to emit.
> But
> >>>> it's
> >>>>>>> not sending the new flowfiles to the next processor as it processes
> >>>> the
> >>>>>>> input flowfile. Instead it's holding everything until the input is
> >>>> fully
> >>>>>>> processed and releasing it all at once. Is there some way I can
> >>>> write the
> >>>>>>> processor to emit flowfiles as soon as possible rather than waiting
> >>>> for
> >>>>>>> everything to succeed?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Eric
> >>>>>>
> >>>>
> >>
> >>
>
>

Reply via email to