Hi Joe,

I was able to get it working by using one session to manage the parent flowfile and then one session per split file. I couldn't use splitSession.create(inputFF); that was giving me an NPE and another exception. It did work with splitSession.create(), but the downside is that I lose the lineage connection to the parent.
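In case it helps anyone searching the archive later, here is roughly the shape of what worked for me. This is a simplified sketch, not our real code: the processor and relationship names are placeholders, the line-per-record parsing stands in for our real parser, and error handling/rollback is omitted. It extends AbstractSessionFactoryProcessor so it can create its own sessions.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class EagerSplitProcessor extends AbstractSessionFactoryProcessor {

    static final Relationship REL_SPLITS = new Relationship.Builder()
            .name("splits").description("One JSON flowfile per record").build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original").description("The source file, after parsing").build();

    @Override
    public Set<Relationship> getRelationships() {
        return new HashSet<>(Arrays.asList(REL_SPLITS, REL_ORIGINAL));
    }

    @Override
    public void onTrigger(final ProcessContext context,
            final ProcessSessionFactory sessionFactory) throws ProcessException {
        // One session owns the parent flowfile for the entire read.
        final ProcessSession parentSession = sessionFactory.createSession();
        final FlowFile input = parentSession.get();
        if (input == null) {
            return;
        }

        parentSession.read(input, rawIn -> {
            // Placeholder parsing: pretend each line is one finished JSON record.
            final BufferedReader reader = new BufferedReader(
                    new InputStreamReader(rawIn, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                final String recordJson = line;
                // A separate short-lived session per split. create() with no
                // parent argument avoids the NPE below, at the cost of losing
                // the FORK lineage back to the input.
                final ProcessSession splitSession = sessionFactory.createSession();
                FlowFile split = splitSession.create();
                split = splitSession.write(split, out ->
                        out.write(recordJson.getBytes(StandardCharsets.UTF_8)));
                splitSession.transfer(split, REL_SPLITS);
                splitSession.commit(); // split is visible downstream right away
            }
        });

        parentSession.transfer(input, REL_ORIGINAL);
        parentSession.commit();
    }
}

And the NPE I got when I tried splitSession.create(inputFF) on the split session instead of splitSession.create():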
2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1] c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2] Failure FF: null: java.lang.NullPointerException
java.lang.NullPointerException: null
        at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
        at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
        at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)

This log is from a 1.11.0 build of NiFi.

In my case the input is a proprietary text file with a gigantic schema definition, which we translate to JSON, splitting the results all at once. I don't know whether record-based processing works for us because of how fluid the JSON schema is.

Thanks,
Eric

On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <joe.w...@gmail.com> wrote:
> Eric,
>
> My point is that it sounds like you get handed an original document
> which is a JSON document. It contains up to a million elements within
> it. You would implement a record reader for your original doc
> structure and then you can use any of our current writers, etc. But
> the important part is avoiding creating splits unless/until totally
> necessary.
>
> Anyway, if you go the route you're thinking of, I think you'll need a
> different session for reading (a single session for the entire source
> file) and a different session for all the splits you'll create. But I
> might be overcomplicating that. MarkP could give better input.
>
> Thanks
>
> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <esecu...@gmail.com> wrote:
> >
> > Hi Joe,
> >
> > For my use case partial results are okay.
> > The files may contain up to a million records, but we have about a day
> > to process them. We will consider record-based processing. It might be
> > a longer task to convert our flows to consume records instead of
> > single files. Will I need to have multiple sessions to handle all this?
> >
> > Thanks,
> > Eric
> >
> > On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <joe.w...@gmail.com> wrote:
> > >
> > > Eric
> > >
> > > The ProcessSession follows a unit-of-work pattern. You can do a lot
> > > of things, but until you commit the session it won't actually commit
> > > the change(s). So if you want the behavior you describe, call commit
> > > after transfer each time. This is done automatically for you in most
> > > cases, but you can call it to control the boundary. Just remember you
> > > risk partial results then. Consider you're reading the input file,
> > > which contains 100 records, let's say. On record 51 there is a
> > > processing issue. What happens then? I'd also suggest this pattern
> > > generally results in poor performance. Can you not use the record
> > > readers/writers to accomplish this so you can avoid turning it into
> > > a bunch of tiny flowfiles?
> > >
> > > Thanks
> > >
> > > On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <esecu...@gmail.com> wrote:
> > > >
> > > > Hello,
> > > >
> > > > I am trying to write a processor which parses an input file and
> > > > emits one JSON flowfile for each record in the input file.
> > > > Currently we're calling session.transfer() once we encounter a
> > > > fragment we want to emit. But it's not sending the new flowfiles to
> > > > the next processor as it processes the input flowfile. Instead it's
> > > > holding everything until the input is fully processed and releasing
> > > > it all at once. Is there some way I can write the processor to emit
> > > > flowfiles as soon as possible rather than waiting for everything to
> > > > succeed?
> > > >
> > > > Thanks,
> > > > Eric
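P.S. Joe, regarding the record reader suggestion: if we do go that route, this is my rough understanding of the shape of a custom reader for our format. It's an untested sketch against the record API; the one-record-per-line parsing and the single "raw" string field are placeholders, since translating our gigantic schema definition is the real work.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class ProprietaryFormatRecordReader implements RecordReader {

    private final BufferedReader reader;
    // Placeholder schema: one string field. A real reader would translate
    // our proprietary schema definition into a proper RecordSchema here.
    private final RecordSchema schema = new SimpleRecordSchema(
            Collections.singletonList(
                    new RecordField("raw", RecordFieldType.STRING.getDataType())));

    public ProprietaryFormatRecordReader(final InputStream in) {
        this.reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    @Override
    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields)
            throws IOException, MalformedRecordException {
        // Placeholder parsing: one record per line; null signals end of stream.
        final String line = reader.readLine();
        if (line == null) {
            return null;
        }
        final Map<String, Object> values = new HashMap<>();
        values.put("raw", line);
        return new MapRecord(schema, values);
    }

    @Override
    public RecordSchema getSchema() {
        return schema;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}

If I follow you, the point is that a record-aware processor like ConvertRecord could then stream through the whole file record by record without us ever creating a million tiny flowfiles.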