Hi Mark,

The NPE came after trying to retrieve the parent FlowFile from the outSession's records map; the parent FlowFile was never migrated to outSession. I don't think it's right to migrate the parent FlowFile to the output session as well.
Thanks,
Eric

On Tue, Mar 9, 2021 at 10:02 AM Eric Secules <[email protected]> wrote:

> Hi Mark,
>
> Thanks for the details! With that in mind I can also make the operation
> more transactional by using Wait and Notify (it won't solve the
> duplication-on-restart issue).
>
> I tried the example code and got an NPE inside commit(). I initially
> thought this was because I was running version 1.11.4, but after
> upgrading to NiFi 1.13.0 the problem persists: repoRecord is null at
> line 786 of StandardProcessSession.java.
>
> Stack Trace:
>
>> 2021-03-09 17:41:44,013 ERROR [Timer-Driven Process Thread-7]
>> c.m.p.p.MyProcessor MyProcessor[id=f3439a7a-cb5a-3c76-121e-60c448d1f910]
>> Failure FF: null: java.lang.NullPointerException
>> java.lang.NullPointerException: null
>> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
>> at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
>> at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
>> at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
>> ... my code calling outSession.commit()
>
> This may be my issue, since my code doesn't look exactly like the
> example, but it would be good to have a better exception than an NPE.
>
> Thanks,
> Eric
>
>
> On Mon., Mar. 8, 2021, 1:30 p.m. Mark Payne, <[email protected]> wrote:
>
>> Eric,
>>
>> The session holds onto all 'active' FlowFiles, as well as other
>> information about them, such as which relationship they've been
>> transferred to, which attributes have been added or removed, etc. So if
>> you have a separate session for each outbound FlowFile, each time that
>> session is committed, all of those FlowFiles can be removed from the
>> session and placed in the next queue.
>> This is important because the session holds all FlowFiles it knows
>> about in memory, but the queues will swap the FlowFiles out, writing
>> them to disk and then dropping them from memory/heap. This allows us to
>> hold many millions of FlowFiles within NiFi at a time. However, if
>> you're trying to hold a million FlowFiles in heap, you're going to use
>> a huge amount of heap. This is predominantly due to the HashMap that is
>> used to store attributes; those can quickly use up a huge amount of
>> heap.
>>
>> Thanks
>> -Mark
>>
>>
>> > On Mar 8, 2021, at 12:53 PM, Eric Secules <[email protected]> wrote:
>> >
>> > Hi Mark,
>> >
>> > Thanks for the example code and the considerations. I see the error I
>> > made before.
>> > If I were to use the standard way and put this all in one session,
>> > what are the memory characteristics? What is tracked in memory for a
>> > session, and is this actually any different (in overall memory use)
>> > from my approach of using a separate session for output?
>> >
>> > Thanks,
>> > Eric
>> >
>> > On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <[email protected]> wrote:
>> >
>> >> Hi Eric,
>> >>
>> >> We should definitely throw a better Exception there, rather than
>> >> NullPointerException. But what you're looking to do can be done. It's
>> >> slightly more nuanced than that, though. What you'll need to do, in
>> >> order to maintain the lineage, is to create the child FlowFile in the
>> >> session that owns the parent. You can then "migrate" the child
>> >> FlowFile to a new session.
>> >> Something to the effect of:
>> >>
>> >> -----------------
>> >> ProcessSession inSession = sessionFactory.create();
>> >> FlowFile flowFile = inSession.get();
>> >> if (flowFile == null) {
>> >>     return;
>> >> }
>> >>
>> >> ProcessSession outSession = sessionFactory.create();
>> >>
>> >> try (InputStream in = inSession.read(flowFile)) {
>> >>     Record record;
>> >>     while ((record = getRecord(in)) != null) {
>> >>         FlowFile child = inSession.create(flowFile);
>> >>         try (OutputStream out = inSession.write(child)) {
>> >>             // write some data.
>> >>         }
>> >>
>> >>         inSession.migrate(outSession, Collections.singleton(child));
>> >>         outSession.transfer(child, REL_SUCCESS);
>> >>         outSession.commit();
>> >>     }
>> >> }
>> >>
>> >> inSession.transfer(flowFile, REL_ORIGINAL);
>> >> inSession.commit();
>> >> -----------------
>> >>
>> >> Some things to keep in mind, though:
>> >> - To get a session factory, you should extend
>> >>   AbstractSessionFactoryProcessor instead of AbstractProcessor.
>> >> - This means you need to ensure that you always explicitly
>> >>   commit/rollback sessions and handle Exceptions properly, and ensure
>> >>   that you account for any FlowFile that is created, etc.
>> >> - If your incoming FlowFile has 1 million records, and you process
>> >>   900,000 of them and then NiFi is restarted, then on restart it'll
>> >>   reprocess the incoming FlowFile, so you'll end up with 900,000
>> >>   duplicates.
>> >> - Performance will be very subpar, as you'll be creating and
>> >>   committing up to 1 million sessions per FlowFile. Perhaps this is
>> >>   okay if you only have to process one of these files per 24 hours,
>> >>   but it's worth considering.
>> >>
>> >> Thanks
>> >> -Mark
>> >>
>> >>
>> >>> On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
>> >>>
>> >>> Hi Joe,
>> >>>
>> >>> I was able to get it working by using one session to manage the
>> >>> parent FlowFile and then one session per split file.
>> >>> I couldn't do splitSession.create(inputFF); I was getting an NPE and
>> >>> another exception. It worked with splitSession.create(), but the
>> >>> downside is that I lose the lineage connection to the parent.
>> >>>
>> >>>> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
>> >>>> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
>> >>>> Failure FF: null: java.lang.NullPointerException
>> >>>> java.lang.NullPointerException: null
>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
>> >>>
>> >>> This log is from a 1.11.0 build of NiFi.
>> >>>
>> >>> In my case the input is a proprietary text file with a gigantic
>> >>> schema definition, which we translate to JSON and split the results
>> >>> all at once. I don't know whether record-based processing works for
>> >>> us because of how fluid the JSON schema is.
>> >>>
>> >>> Thanks,
>> >>> Eric
>> >>>
>> >>> On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
>> >>>
>> >>>> Eric,
>> >>>>
>> >>>> My point is that it sounds like you get handed an original document
>> >>>> which is a JSON document. It contains up to a million elements
>> >>>> within it. You would implement a record reader for your original
>> >>>> doc structure, and then you can use any of our current
>> >>>> writers/etc. But the important part is avoiding creating splits
>> >>>> unless/until totally necessary.
>> >>>>
>> >>>> Anyway, if you go the route you're thinking of, I think you'll need
>> >>>> a different session for reading (a single session for the entire
>> >>>> source file) and a different session for all the splits you'll
>> >>>> create. But I might be overcomplicating that; MarkP could give
>> >>>> better input.
>> >>>>
>> >>>> Thanks
>> >>>>
>> >>>> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]> wrote:
>> >>>>>
>> >>>>> Hi Joe,
>> >>>>>
>> >>>>> For my use case partial results are okay.
>> >>>>> The files may contain up to a million records, but we have about a
>> >>>>> day to process them. We will consider record-based processing; it
>> >>>>> might be a longer task to convert our flows to consume records
>> >>>>> instead of single files.
>> >>>>> Will I need multiple sessions to handle all this?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Eric
>> >>>>>
>> >>>>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
>> >>>>>
>> >>>>>> Eric
>> >>>>>>
>> >>>>>> The ProcessSession follows a unit-of-work pattern. You can do a
>> >>>>>> lot of things, but until you commit the session it won't actually
>> >>>>>> commit the change(s). So if you want the behavior you describe,
>> >>>>>> call commit after each transfer. This is done automatically for
>> >>>>>> you in most cases, but you can call it to control the boundary.
>> >>>>>> Just remember you risk partial results then. Consider you're
>> >>>>>> reading an input file which contains, let's say, 100 records. On
>> >>>>>> record 51 there is a processing issue. What happens then? I'd
>> >>>>>> also suggest this pattern generally results in poor performance.
>> >>>>>> Can you not use the record readers/writers to accomplish this, so
>> >>>>>> you can avoid turning it into a bunch of tiny FlowFiles?
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>>
>> >>>>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]> wrote:
>> >>>>>>>
>> >>>>>>> Hello,
>> >>>>>>>
>> >>>>>>> I am trying to write a processor which parses an input file and
>> >>>>>>> emits one JSON FlowFile for each record in the input file.
>> >>>>>>> Currently we're calling session.transfer() once we encounter a
>> >>>>>>> fragment we want to emit, but it's not sending the new FlowFiles
>> >>>>>>> to the next processor as it processes the input FlowFile.
>> >>>>>>> Instead, it's holding everything until the input is fully
>> >>>>>>> processed and releasing it all at once. Is there some way I can
>> >>>>>>> write the processor to emit FlowFiles as soon as possible rather
>> >>>>>>> than waiting for everything to succeed?
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Eric
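[Editor's note] The two behaviors this thread turns on — Joe's point that a session is a unit of work whose transfers only become visible downstream at commit(), and Mark's point that a child FlowFile must be migrated into a session before that session may transfer/commit it — can be sketched without NiFi at all. The following is a minimal, self-contained Java toy; `ToySession` and its method names are hypothetical stand-ins, not the real `ProcessSession` API, and the ownership check illustrates the "better exception than NPE" Eric asked for:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Toy stand-in for a NiFi process session; names are illustrative only.
final class ToySession {
    private final Set<String> owned = new HashSet<>();        // this session's "records map"
    private final Queue<String> pending = new ArrayDeque<>(); // transferred but not yet committed
    private final Queue<String> downstreamQueue;              // the next connection's queue

    ToySession(Queue<String> downstreamQueue) {
        this.downstreamQueue = downstreamQueue;
    }

    String create(String id) {
        owned.add(id);
        return id;
    }

    // Hand ownership of a record to another session (cf. migrating a child FlowFile).
    void migrate(ToySession dest, String id) {
        if (!owned.remove(id)) {
            throw new IllegalStateException(id + " is not owned by this session");
        }
        dest.owned.add(id);
    }

    void transfer(String id) {
        if (!owned.contains(id)) {
            // Descriptive failure instead of an NPE deep inside commit().
            throw new IllegalStateException(id + " was never migrated to this session");
        }
        pending.add(id);
    }

    // Unit of work: nothing reaches the downstream queue until commit().
    void commit() {
        String id;
        while ((id = pending.poll()) != null) {
            owned.remove(id);
            downstreamQueue.add(id);
        }
    }
}

public class SessionDemo {
    public static void main(String[] args) {
        Queue<String> successQueue = new ArrayDeque<>();
        ToySession inSession = new ToySession(successQueue);
        ToySession outSession = new ToySession(successQueue);

        String child = inSession.create("child-1");
        inSession.migrate(outSession, child);    // hand ownership to the output session
        outSession.transfer(child);
        System.out.println(successQueue.size()); // 0: transferred but not committed
        outSession.commit();
        System.out.println(successQueue.size()); // 1: visible downstream only after commit

        // Transferring a record that was never migrated fails with a clear error.
        String orphan = inSession.create("child-2");
        try {
            outSession.transfer(orphan);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Committing the output session per child, as in Mark's snippet, is what releases each split downstream immediately; the toy's commit() makes it visible why nothing moves until that call.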
