Hi Mark,

The NPE came from trying to retrieve the parent flowfile from
outSession's records map. The parent flowfile was never migrated to
outSession, so the lookup returns null. I don't think it's right to
migrate the parent flowfile to the output session as well.
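
Here is a minimal sketch of what I think is happening, in plain Java
with no NiFi dependency. ToySession, track() and lookup() are made-up
names for illustration and are not NiFi's StandardProcessSession API; the
only assumption is that a session keeps a map of the flowfiles it knows
about and that migrate() moves only the flowfiles it is given.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SessionRecordsSketch {

    // Toy stand-in for a session; NOT NiFi's StandardProcessSession.
    static final class ToySession {
        // Hypothetical "records map": flowfile id -> bookkeeping record.
        private final Map<String, String> records = new HashMap<>();

        void track(String flowFileId) {
            records.put(flowFileId, "record-for-" + flowFileId);
        }

        // Moves only the listed flowfiles to the target session.
        void migrate(ToySession target, Set<String> flowFileIds) {
            for (String id : flowFileIds) {
                String record = records.remove(id);
                if (record == null) {
                    throw new IllegalStateException(id + " is not tracked by this session");
                }
                target.records.put(id, record);
            }
        }

        // Returns null when this session does not know the flowfile.
        String lookup(String flowFileId) {
            return records.get(flowFileId);
        }
    }

    // Parent and child start in the input session; only the child is migrated.
    static ToySession migrateChildOnly() {
        ToySession in = new ToySession();
        ToySession out = new ToySession();
        in.track("parent");
        in.track("child");
        in.migrate(out, Collections.singleton("child"));
        return out;
    }

    static boolean childVisibleInOutSession() {
        return migrateChildOnly().lookup("child") != null;
    }

    static boolean parentVisibleInOutSession() {
        return migrateChildOnly().lookup("parent") != null;
    }

    public static void main(String[] args) {
        System.out.println("child in outSession:  " + childVisibleInOutSession());
        System.out.println("parent in outSession: " + parentVisibleInOutSession());
    }
}
```

In this toy model the child is found in the output session but the
parent lookup returns null, so any commit-time step that dereferences the
parent's record without a null check would fail the same way.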

Thanks,
Eric

On Tue, Mar 9, 2021 at 10:02 AM Eric Secules <[email protected]> wrote:

> Hi Mark,
>
> Thanks for the details! With that in mind I can also make the operation
> more transactional by using wait and notify (won't solve the duplication on
> restart issue).
>
> I tried the example code and I got an NPE inside commit(). I initially
> thought this was because I was running version 1.11.4. I upgraded to NiFi
> 1.13.0 and the problem still persists: repoRecord is null on line 786
> in StandardProcessSession.java.
>
> Stack Trace:
>
>> 2021-03-09 17:41:44,013 ERROR [Timer-Driven Process Thread-7]
>> c.m.p.p.MyProcessor MyProcessor[id=f3439a7a-cb5a-3c76-121e-60c448d1f910]
>> Failure FF: null: java.lang.NullPointerException
>> java.lang.NullPointerException: null
>> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
>> at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
>> at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
>> at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
>>
>> ... My code calling outSession.commit()
>>
>
> This may be my issue since my code doesn't look exactly like the example,
> but it would be good to have a better exception than NPE.
>
> Thanks,
> Eric
>
>
> On Mon., Mar. 8, 2021, 1:30 p.m. Mark Payne, <[email protected]> wrote:
>
>> Eric,
>>
>> The session holds onto all ‘active flowfiles’ as well as other
>> information about them, such as which relationship they’ve been transferred
>> to, which attributes have been added, removed, etc. So if you have a
>> separate session for each outbound FlowFile, each time that session is
>> committed, all those FlowFiles can be removed from the session and placed
>> in the next queue. This is important because the session holds all
>> FlowFiles it knows about in memory, but the queues will swap the flowfiles
>> out - writing them to disk and then dropping them from memory/heap. This
>> allows us to hold many millions of FlowFiles within NiFi at a time.
>> However, if you’re trying to hold a million FlowFiles in heap, you’re going
>> to use a huge amount of heap. This is predominantly due to the HashMap that
>> is used to store attributes. Those can quickly use up a huge amount of heap.
>>
>> Thanks
>> -Mark
>>
>>
>> > On Mar 8, 2021, at 12:53 PM, Eric Secules <[email protected]> wrote:
>> >
>> > Hi Mark,
>> >
>> > Thanks for the example code and the considerations. I see the error I
>> > made before.
>> > If I were to use the standard way and put this all in one session, what
>> > are the memory characteristics? What is tracked in memory for a session
>> > and is this actually any different (in overall memory use) from my
>> > approach of using a separate session for output?
>> >
>> > Thanks,
>> > Eric
>> >
>> > On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <[email protected]> wrote:
>> >
>> >> Hi Eric,
>> >>
>> >> We should definitely throw a better Exception there, rather than
>> >> NullPointerException. But what you’re looking to do can be done. It’s
>> >> slightly more nuanced than that, though. What you’ll need to do, in
>> >> order to maintain the lineage, is to create the child FlowFile in the
>> >> session that owns the parent. You can then “migrate” the child FlowFile
>> >> to a new session. Something to the effect of:
>> >>
>> >> -----------------
>> >> ProcessSession inSession = sessionFactory.create();
>> >> FlowFile flowFile = inSession.get();
>> >> if (flowFile == null) {
>> >>  return;
>> >> }
>> >>
>> >> ProcessSession outSession = sessionFactory.create();
>> >>
>> >> try (InputStream in = inSession.read(flowFile)) {
>> >>  Record record;
>> >>  while ((record = getRecord(in)) != null) {
>> >>      FlowFile child = inSession.create(flowFile);
>> >>      try (OutputStream out = inSession.write(child)) {
>> >>                // write some data.
>> >>      }
>> >>
>> >>      inSession.migrate(outSession, Collections.singleton(child));
>> >>      outSession.transfer(child, REL_SUCCESS);
>> >>      outSession.commit();
>> >>  }
>> >> }
>> >>
>> >> inSession.transfer(flowFile, REL_ORIGINAL);
>> >> inSession.commit();
>> >> -----------------
>> >>
>> >> Some things to keep in mind, though:
>> >> - To get a session factory, you should extend
>> >> AbstractSessionFactoryProcessor instead of AbstractProcessor.
>> >> - This means you need to ensure that you always explicitly
>> >> commit/rollback sessions and handle Exceptions properly, ensure that you
>> >> account for any FlowFile that is created, etc.
>> >> - If your incoming FlowFile has 1 million records, and you process
>> >> 900,000 of them and then NiFi is restarted, then on restart it’ll
>> >> reprocess the incoming FlowFile, so you’ll end up with 900,000 duplicates.
>> >> - Performance will be very subpar, as you’ll be creating and
>> >> committing up to 1 million sessions per FlowFile. Perhaps this is okay
>> >> if you only have to process one of these files per 24 hours. But it’s
>> >> worth considering.
>> >>
>> >> Thanks
>> >> -Mark
>> >>
>> >>
>> >>
>> >>> On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
>> >>>
>> >>> Hi Joe,
>> >>>
>> >>> I was able to get it working by using one session to manage the parent
>> >>> flowfile and then one session per split file. I couldn't do
>> >>> splitSession.create(inputFF); I was getting an NPE and another
>> >>> exception. It worked with splitSession.create(), but the downside is
>> >>> that I lose the lineage connection to the parent.
>> >>>
>> >>>> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
>> >>>> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
>> >>>> Failure FF: null: java.lang.NullPointerException
>> >>>> java.lang.NullPointerException: null
>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
>> >>>>
>> >>>
>> >>> This log is from a 1.11.0 build of NiFi
>> >>>
>> >>> In my case the input is a proprietary text file with a gigantic schema
>> >>> definition which we translate to JSON and split the results all at
>> >>> once. I don't know whether record-based processing works for us because
>> >>> of how fluid the JSON schema is.
>> >>>
>> >>> Thanks,
>> >>> Eric
>> >>>
>> >>> On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
>> >>>
>> >>>> Eric,
>> >>>>
>> >>>> My point is that it sounds like you get handed an original document
>> >>>> which is a JSON document. It contains up to a million elements within
>> >>>> it. You would implement a record reader for your original doc
>> >>>> structure and then you can use any of our current writers, etc. But
>> >>>> the important part is avoiding creating splits unless/until totally
>> >>>> necessary.
>> >>>>
>> >>>> Anyway if you go the route you're thinking of I think you'll need a
>> >>>> different session for reading (single session for the entire source
>> >>>> file) and a different session for all the splits you'll create. But I
>> >>>> might be over complicating that.  MarkP could give better input.
>> >>>>
>> >>>> Thanks
>> >>>>
>> >>>> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]> wrote:
>> >>>>>
>> >>>>> Hi Joe,
>> >>>>>
>> >>>>> For my use case partial results are okay.
>> >>>>> The files may contain up to a million records. But we have like a
>> >>>>> day to process it. We will consider record-based processing. It might
>> >>>>> be a longer task to convert our flows to consume records instead of
>> >>>>> single files.
>> >>>>> Will I need to have multiple sessions to handle all this?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Eric
>> >>>>>
>> >>>>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
>> >>>>>
>> >>>>>> Eric
>> >>>>>>
>> >>>>>> The ProcessSession follows a unit of work pattern. You can do a lot
>> >>>>>> of things, but until you commit the session it won't actually commit
>> >>>>>> the change(s). So if you want the behavior you describe, call commit
>> >>>>>> after transfer each time. This is done automatically for you in most
>> >>>>>> cases, but you can call it to control the boundary. Just remember you
>> >>>>>> risk partial results then. Consider you're reading the input file,
>> >>>>>> which contains 100 records, let's say. On record 51 there is a
>> >>>>>> processing issue. What happens then? I'd also suggest this pattern
>> >>>>>> generally results in poor performance. Can you not use the record
>> >>>>>> reader/writers to accomplish this so you can avoid turning it into a
>> >>>>>> bunch of tiny flowfiles?
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>>
>> >>>>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]> wrote:
>> >>>>>>>
>> >>>>>>> Hello,
>> >>>>>>>
>> >>>>>>> I am trying to write a processor which parses an input file and
>> >>>>>>> emits one JSON flowfile for each record in the input file. Currently
>> >>>>>>> we're calling session.transfer() once we encounter a fragment we want
>> >>>>>>> to emit. But it's not sending the new flowfiles to the next processor
>> >>>>>>> as it processes the input flowfile. Instead it's holding everything
>> >>>>>>> until the input is fully processed and releasing it all at once. Is
>> >>>>>>> there some way I can write the processor to emit flowfiles as soon as
>> >>>>>>> possible rather than waiting for everything to succeed?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Eric
>> >>>>>>
>> >>>>
>> >>
>> >>
>>
>>
