Hi Mark,

Thanks for the details! With that in mind, I can also make the operation
more transactional by using wait and notify (though that won't solve the
duplication-on-restart issue).
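Roughly what I have in mind is something like this (an untested sketch; the
class and method names are all mine, not anything from your example):

-----------------
// Untested sketch of the wait/notify idea: the thread that owns the
// parent session blocks until every child session has been committed.
public class ChildCommitBarrier {
    private final Object monitor = new Object();
    private int pending = 0;

    public void childStarted() {
        synchronized (monitor) {
            pending++;
        }
    }

    public void childCommitted() {
        synchronized (monitor) {
            pending--;
            monitor.notifyAll();
        }
    }

    public void awaitAllChildren() throws InterruptedException {
        synchronized (monitor) {
            while (pending > 0) {
                monitor.wait();
            }
        }
    }
}
-----------------

The idea being that the thread handling the parent FlowFile calls
awaitAllChildren() before committing its session, and whatever commits each
child session calls childCommitted().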
I tried the example code and I got an NPE inside commit(). I initially
thought this was because I was running version 1.11.4, but I upgraded to
NiFi 1.13.0 and the problem persists: repoRecord is null at line 786 of
StandardProcessSession.java.

Stack trace:

> 2021-03-09 17:41:44,013 ERROR [Timer-Driven Process Thread-7]
> c.m.p.p.MyProcessor MyProcessor[id=f3439a7a-cb5a-3c76-121e-60c448d1f910]
> Failure FF: null: java.lang.NullPointerException
> java.lang.NullPointerException: null
>         at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
>         at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
>         at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
>         at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
>         ... my code calling outSession.commit()

This may be my issue, since my code doesn't look exactly like the example,
but it would be good to get a better exception than an NPE.

Thanks,
Eric

On Mon., Mar. 8, 2021, 1:30 p.m. Mark Payne, <[email protected]> wrote:

> Eric,
>
> The session holds onto all 'active FlowFiles' as well as other
> information about them, such as which relationship they've been
> transferred to, which attributes have been added, removed, etc. So if you
> have a separate session for each outbound FlowFile, each time that
> session is committed, all of those FlowFiles can be removed from the
> session and placed in the next queue. This is important because the
> session holds all FlowFiles it knows about in memory, but the queues will
> swap the FlowFiles out - writing them to disk and then dropping them from
> memory/heap. This allows us to hold many millions of FlowFiles within
> NiFi at a time. However, if you're trying to hold a million FlowFiles in
> heap, you're going to use a huge amount of heap. This is predominantly
> due to the HashMap that is used to store attributes; those can quickly
> use up a huge amount of heap.
>
> Thanks
> -Mark
>
> On Mar 8, 2021, at 12:53 PM, Eric Secules <[email protected]> wrote:
>
> > Hi Mark,
> >
> > Thanks for the example code and the considerations. I see the error I
> > made before.
> > If I were to use the standard way and put this all in one session, what
> > are the memory characteristics? What is tracked in memory for a
> > session, and is this actually any different (in overall memory use)
> > from my approach of using a separate session for output?
> >
> > Thanks,
> > Eric
> >
> > On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <[email protected]> wrote:
> >
> >> Hi Eric,
> >>
> >> We should definitely throw a better Exception there, rather than
> >> NullPointerException. But what you're looking to do can be done. It's
> >> slightly more nuanced than that, though. What you'll need to do, in
> >> order to maintain the lineage, is to create the child FlowFile in the
> >> session that owns the parent. You can then "migrate" the child
> >> FlowFile to a new session.
> >> Something to the effect of:
> >>
> >> -----------------
> >> ProcessSession inSession = sessionFactory.create();
> >> FlowFile flowFile = inSession.get();
> >> if (flowFile == null) {
> >>     return;
> >> }
> >>
> >> ProcessSession outSession = sessionFactory.create();
> >>
> >> try (InputStream in = inSession.read(flowFile)) {
> >>     Record record;
> >>     while ((record = getRecord(in)) != null) {
> >>         FlowFile child = inSession.create(flowFile);
> >>         try (OutputStream out = inSession.write(child)) {
> >>             // write some data for this record.
> >>         }
> >>
> >>         inSession.migrate(outSession, Collections.singleton(child));
> >>         outSession.transfer(child, REL_SUCCESS);
> >>         outSession.commit();
> >>     }
> >> }
> >>
> >> inSession.transfer(flowFile, REL_ORIGINAL);
> >> inSession.commit();
> >> -----------------
> >>
> >> Some things to keep in mind, though:
> >> - To get a session factory, you should extend
> >>   AbstractSessionFactoryProcessor instead of AbstractProcessor.
> >> - This means you need to ensure that you always explicitly
> >>   commit/rollback sessions, handle Exceptions properly, and account
> >>   for any FlowFile that is created.
> >> - If your incoming FlowFile has 1 million records, and you process
> >>   900,000 of them and then NiFi is restarted, then on restart it'll
> >>   reprocess the incoming FlowFile, so you'll end up with 900,000
> >>   duplicates.
> >> - Performance will be very subpar, as you'll be creating and
> >>   committing up to 1 million sessions per FlowFile. Perhaps this is
> >>   okay if you only have to process one of these files per 24 hours,
> >>   but it's worth considering.
> >>
> >> Thanks
> >> -Mark
> >>
> >>> On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
> >>>
> >>> Hi Joe,
> >>>
> >>> I was able to get it working by using one session to manage the
> >>> parent FlowFile and then one session per split file. I couldn't do
> >>> splitSession.create(inputFF) - I was getting an NPE and another
> >>> exception - but it worked with splitSession.create(). The downside is
> >>> that I lose the lineage connection to the parent.
> >>>
> >>>> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
> >>>> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
> >>>> Failure FF: null: java.lang.NullPointerException
> >>>> java.lang.NullPointerException: null
> >>>>         at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
> >>>>         at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
> >>>>         at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
> >>>
> >>> This log is from a 1.11.0 build of NiFi.
> >>>
> >>> In my case the input is a proprietary text file with a gigantic
> >>> schema definition, which we translate to JSON and split the results
> >>> all at once. I don't know whether record-based processing works for
> >>> us because of how fluid the JSON schema is.
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>> On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
> >>>
> >>>> Eric,
> >>>>
> >>>> My point is that it sounds like you get handed an original document
> >>>> which is a JSON document. It contains up to a million elements
> >>>> within it. You would implement a record reader for your original doc
> >>>> structure, and then you can use any of our current writers/etc.
> >>>> But the important part is avoiding creating splits unless/until
> >>>> totally necessary.
> >>>>
> >>>> Anyway, if you go the route you're thinking of, I think you'll need
> >>>> a different session for reading (a single session for the entire
> >>>> source file) and a different session for all the splits you'll
> >>>> create. But I might be overcomplicating that. MarkP could give
> >>>> better input.
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]> wrote:
> >>>>>
> >>>>> Hi Joe,
> >>>>>
> >>>>> For my use case partial results are okay.
> >>>>> The files may contain up to a million records, but we have like a
> >>>>> day to process them. We will consider record-based processing; it
> >>>>> might be a longer task to convert our flows to consume records
> >>>>> instead of single files.
> >>>>> Will I need to have multiple sessions to handle all this?
> >>>>>
> >>>>> Thanks,
> >>>>> Eric
> >>>>>
> >>>>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
> >>>>>
> >>>>>> Eric
> >>>>>>
> >>>>>> The ProcessSession follows a unit-of-work pattern. You can do a
> >>>>>> lot of things, but until you commit the session it won't actually
> >>>>>> commit the change(s). So if you want the behavior you describe,
> >>>>>> call commit after transfer each time. This is done automatically
> >>>>>> for you in most cases, but you can call it to control the
> >>>>>> boundary. Just remember you risk partial results then. Consider
> >>>>>> that you're reading an input file which contains, let's say, 100
> >>>>>> records. On record 51 there is a processing issue. What happens
> >>>>>> then? I'd also suggest this pattern generally results in poor
> >>>>>> performance. Can you not use the record readers/writers to
> >>>>>> accomplish this, so you can avoid turning it into a bunch of tiny
> >>>>>> FlowFiles?
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]> wrote:
> >>>>>>>
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I am trying to write a processor which parses an input file and
> >>>>>>> emits one JSON FlowFile for each record in the input file.
> >>>>>>> Currently we're calling session.transfer() once we encounter a
> >>>>>>> fragment we want to emit, but it's not sending the new FlowFiles
> >>>>>>> to the next processor as it processes the input FlowFile.
> >>>>>>> Instead, it's holding everything until the input is fully
> >>>>>>> processed and releasing it all at once. Is there some way I can
> >>>>>>> write the processor to emit FlowFiles as soon as possible rather
> >>>>>>> than waiting for everything to succeed?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Eric
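P.S. Mark - to make sure I've understood your caveats about extending
AbstractSessionFactoryProcessor, this is the shape I'm aiming for (a
trimmed sketch, not my exact code; the relationship names and the
splitting loop are placeholders):

-----------------
@Override
public void onTrigger(final ProcessContext context,
        final ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessSession inSession = sessionFactory.create();
    final FlowFile flowFile = inSession.get();
    if (flowFile == null) {
        inSession.rollback();
        return;
    }

    final ProcessSession outSession = sessionFactory.create();
    try {
        // ... create children in inSession, migrate each to outSession,
        // then transfer and commit outSession per child, as in your example ...

        inSession.transfer(flowFile, REL_ORIGINAL);
        inSession.commit();
    } catch (final Exception e) {
        // Roll back both sessions so no FlowFile is left dangling.
        outSession.rollback();
        inSession.rollback();
        throw new ProcessException("Failed to split " + flowFile, e);
    }
}
-----------------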
