Hi Mark,

Thanks for the example code and the considerations. I see the error I made before. If I were to use the standard approach and put this all in one session, what are the memory characteristics? What is tracked in memory for a session, and is this actually any different (in overall memory use) from my approach of using a separate session for output?
Thanks,
Eric

On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <[email protected]> wrote:

> Hi Eric,
>
> We should definitely throw a better Exception there, rather than
> NullPointerException. But what you're looking to do can be done. It's
> slightly more nuanced than that, though. What you'll need to do, in order
> to maintain the lineage, is to create the child FlowFile in the session
> that owns the parent. You can then "migrate" the child FlowFile to a new
> session. Something to the effect of:
>
> -----------------
> ProcessSession inSession = sessionFactory.create();
> FlowFile flowFile = inSession.get();
> if (flowFile == null) {
>     return;
> }
>
> ProcessSession outSession = sessionFactory.create();
>
> try (InputStream in = inSession.read(flowFile)) {
>     Record record;
>     while ((record = getRecord(in)) != null) {
>         FlowFile child = inSession.create(flowFile);
>         try (OutputStream out = inSession.write(child)) {
>             // write some data.
>         }
>
>         inSession.migrate(outSession, Collections.singleton(child));
>         outSession.transfer(child, REL_SUCCESS);
>         outSession.commit();
>     }
> }
>
> inSession.transfer(flowFile, REL_ORIGINAL);
> inSession.commit();
> -----------------
>
> Some things to keep in mind, though:
> - To get a session factory, you should extend
>   AbstractSessionFactoryProcessor instead of AbstractProcessor.
> - This means you need to ensure that you always explicitly commit/rollback
>   sessions, handle Exceptions properly, and account for any FlowFile that
>   is created.
> - If your incoming FlowFile has 1 million records, and you process 900,000
>   of them and then NiFi is restarted, then on restart it'll reprocess the
>   incoming FlowFile, so you'll end up with 900,000 duplicates.
> - Performance will be very subpar, as you'll be creating and committing up
>   to 1 million sessions per FlowFile. Perhaps this is okay if you only have
>   to process one of these files per 24 hours. But it's worth considering.
>
> Thanks
> -Mark
>
>
> > On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
> >
> > Hi Joe,
> >
> > I was able to get it working by using one session to manage the parent
> > flowfile and then one session per split file. I couldn't do
> > splitSession.create(inputFF); I was getting an NPE and another exception,
> > but it worked with splitSession.create(). The downside is I lose the
> > lineage connection to the parent.
> >
> >> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
> >> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
> >> Failure FF: null: java.lang.NullPointerException
> >> java.lang.NullPointerException: null
> >> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
> >
> > This log is from a 1.11.0 build of NiFi.
> >
> > In my case the input is a proprietary text file with a gigantic schema
> > definition, which we translate to JSON and split the results all at once.
> > I don't know whether record-based processing works for us because of how
> > fluid the JSON schema is.
> >
> > Thanks,
> > Eric
> >
> > On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
> >
> >> Eric,
> >>
> >> My point is that it sounds like you get handed an original document
> >> which is a JSON document. It contains up to a million elements within
> >> it. You would implement a record reader for your original doc
> >> structure, and then you can use any of our current writers, etc. But
> >> the important part is avoiding creating splits unless/until totally
> >> necessary.
> >>
> >> Anyway, if you go the route you're thinking of, I think you'll need a
> >> different session for reading (a single session for the entire source
> >> file) and a different session for all the splits you'll create. But I
> >> might be overcomplicating that. MarkP could give better input.
> >>
> >> Thanks
> >>
> >> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]> wrote:
> >>>
> >>> Hi Joe,
> >>>
> >>> For my use case partial results are okay.
> >>> The files may contain up to a million records, but we have about a day
> >>> to process them. We will consider record-based processing. It might be
> >>> a longer task to convert our flows to consume records instead of single
> >>> files. Will I need to have multiple sessions to handle all this?
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
> >>>
> >>>> Eric,
> >>>>
> >>>> The ProcessSession follows a unit-of-work pattern. You can do a lot of
> >>>> things, but until you commit the session it won't actually commit the
> >>>> change(s). So if you want the behavior you describe, call commit after
> >>>> each transfer. This is done automatically for you in most cases, but
> >>>> you can call it to control the boundary. Just remember you risk
> >>>> partial results then. Consider you're reading an input file which
> >>>> contains, let's say, 100 records. On record 51 there is a processing
> >>>> issue. What happens then? I'd also suggest this pattern generally
> >>>> results in poor performance. Can you not use the record
> >>>> readers/writers to accomplish this, so you can avoid turning it into a
> >>>> bunch of tiny flowfiles?
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]>
> >>>> wrote:
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> I am trying to write a processor which parses an input file and emits
> >>>>> one JSON flowfile for each record in the input file. Currently we're
> >>>>> calling session.transfer() once we encounter a fragment we want to
> >>>>> emit, but it's not sending the new flowfiles to the next processor as
> >>>>> it processes the input flowfile. Instead it's holding everything
> >>>>> until the input is fully processed and releasing it all at once. Is
> >>>>> there some way I can write the processor to emit flowfiles as soon as
> >>>>> possible rather than waiting for everything to succeed?
> >>>>>
> >>>>> Thanks,
> >>>>> Eric
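For reference, here is one way the pattern from Mark's snippet might look when assembled into a full processor class, per his note about extending AbstractSessionFactoryProcessor and handling commit/rollback yourself. This is an untested sketch, not a drop-in implementation: the class name and the getRecord()/writeRecord() hooks are hypothetical stand-ins for the proprietary parser, and error handling is reduced to a single rollback.

-----------------
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class SplitProprietaryFormat extends AbstractSessionFactoryProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success").description("One FlowFile per record").build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original").description("The incoming FlowFile").build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.unmodifiableSet(
                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_ORIGINAL)));
    }

    @Override
    public void onTrigger(final ProcessContext context,
                          final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession inSession = sessionFactory.create();
        final FlowFile flowFile = inSession.get();
        if (flowFile == null) {
            inSession.rollback();
            return;
        }

        try (final InputStream in = inSession.read(flowFile)) {
            Object record;
            while ((record = getRecord(in)) != null) {
                // Create the child in the parent's session so lineage is kept.
                final FlowFile child = inSession.create(flowFile);
                try (final OutputStream out = inSession.write(child)) {
                    writeRecord(record, out);
                }

                // Hand the child off to its own session and commit right away,
                // so downstream processors see it before the parent finishes.
                final ProcessSession outSession = sessionFactory.create();
                inSession.migrate(outSession, Collections.singleton(child));
                outSession.transfer(child, REL_SUCCESS);
                outSession.commit();
            }

            inSession.transfer(flowFile, REL_ORIGINAL);
            inSession.commit();
        } catch (final Exception e) {
            // With a session factory there is no framework cleanup; children
            // already committed stay committed (the partial-results risk).
            getLogger().error("Failed to split " + flowFile, e);
            inSession.rollback();
        }
    }

    // Hypothetical hook: parse the next record from the proprietary format,
    // returning null at end of input.
    private Object getRecord(final InputStream in) throws IOException {
        throw new UnsupportedOperationException("format-specific parsing goes here");
    }

    // Hypothetical hook: serialize one record as JSON to the child's content.
    private void writeRecord(final Object record, final OutputStream out) throws IOException {
        throw new UnsupportedOperationException("format-specific writing goes here");
    }
}
-----------------

The per-record commit on outSession is what makes children visible downstream immediately, and it is also exactly where Mark's restart caveat comes from: anything committed before a crash will be produced again when the parent FlowFile is reprocessed.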
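And for Joe's suggested alternative: rather than splitting at all, implement a RecordReader for the proprietary format so the standard record-oriented processors can stream over the file one record at a time. Below is a rough sketch of such a reader. The two-field schema and parseNextRecord() are placeholders for the real format, and to use the reader from processors like ConvertRecord or SplitRecord you would expose it through a RecordReaderFactory controller service; exact factory signatures have shifted across NiFi releases, so treat this as an outline.

-----------------
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;

import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class ProprietaryFormatRecordReader implements RecordReader {

    private final InputStream in;
    private final RecordSchema schema;

    public ProprietaryFormatRecordReader(final InputStream in) {
        this.in = in;
        // In practice the schema would be derived from the file's gigantic
        // schema definition; two string fields are just a placeholder.
        this.schema = new SimpleRecordSchema(Arrays.asList(
                new RecordField("id", RecordFieldType.STRING.getDataType()),
                new RecordField("payload", RecordFieldType.STRING.getDataType())));
    }

    @Override
    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields)
            throws IOException, MalformedRecordException {
        final Map<String, Object> values = parseNextRecord(in); // hypothetical
        return values == null ? null : new MapRecord(schema, values);
    }

    @Override
    public RecordSchema getSchema() {
        return schema;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }

    // Hypothetical: decode one record from the proprietary format, or return
    // null at end of stream.
    private Map<String, Object> parseNextRecord(final InputStream in) throws IOException {
        throw new UnsupportedOperationException("format-specific parsing goes here");
    }
}
-----------------

The appeal of this route, as Joe notes, is that the million records never become a million FlowFiles or a million sessions; one session reads and writes a single FlowFile while records stream through.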
