Hi Joe,

I was able to get it working by using one session to manage the parent
flowfile and one session per split file. I couldn't use
splitSession.create(inputFF); it threw an NPE (and another exception),
but it worked with splitSession.create(). The downside is that I lose
the lineage connection to the parent.

2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
> Failure FF: null: java.lang.NullPointerException
> java.lang.NullPointerException: null
> at
> org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
>

This log is from a 1.11.0 build of NiFi.
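
For reference, here's roughly the shape of what ended up working (a
minimal sketch, assuming a processor that extends
AbstractSessionFactoryProcessor; parseRecords, REL_SPLITS, and
REL_ORIGINAL are placeholders for our own code):

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;

@Override
public void onTrigger(final ProcessContext context,
        final ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessSession parentSession = sessionFactory.createSession();
    final FlowFile inputFF = parentSession.get();
    if (inputFF == null) {
        return;
    }
    try {
        parentSession.read(inputFF, in -> {
            // parseRecords is our parser; it yields each record as JSON bytes
            for (final byte[] recordJson : parseRecords(in)) {
                final ProcessSession splitSession = sessionFactory.createSession();
                // splitSession.create(inputFF) is what threw the NPE above;
                // create() works but drops the FORK lineage to the parent.
                FlowFile split = splitSession.create();
                split = splitSession.write(split, out -> out.write(recordJson));
                splitSession.transfer(split, REL_SPLITS);
                splitSession.commit(); // release this split downstream immediately
            }
        });
        parentSession.transfer(inputFF, REL_ORIGINAL);
        parentSession.commit();
    } catch (final Exception e) {
        // splits committed before the failure remain (partial results are
        // okay for us); only the parent is rolled back
        parentSession.rollback();
        throw new ProcessException(e);
    }
}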

In my case the input is a proprietary text file with a gigantic schema
definition, which we translate to JSON and split all at once. I don't
know whether record-based processing would work for us because of how
fluid the JSON schema is.
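
If we do end up trying the record route Joe suggested, I'm guessing our
reader shim would look something like this (a rough sketch against
NiFi's RecordReader interface; MyFormatParser and its methods stand in
for our own parsing code):

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

public class MyFormatRecordReader implements RecordReader {
    private final MyFormatParser parser; // hypothetical parser for our format

    public MyFormatRecordReader(final InputStream in) {
        this.parser = new MyFormatParser(in);
    }

    @Override
    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields)
            throws IOException, MalformedRecordException {
        final Map<String, Object> values = parser.nextRecordAsMap(); // null at end of input
        return values == null ? null : new MapRecord(getSchema(), values);
    }

    @Override
    public RecordSchema getSchema() {
        return parser.deriveSchema(); // schema derived from the file's definition block
    }

    @Override
    public void close() throws IOException {
        parser.close();
    }
}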

Thanks,
Eric

On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <joe.w...@gmail.com> wrote:

> Eric,
>
> My point is that it sounds like you get handed an original document
> which is a JSON document.  It contains up to a million elements
> within it.  You would implement a record reader for your original doc
> structure, and then you can use any of our current writers, etc.  But
> the important part is avoiding creating splits unless/until totally
> necessary.
>
> Anyway, if you go the route you're thinking of, I think you'll need
> one session for reading (a single session for the entire source file)
> and a different session for all the splits you'll create.  But I
> might be overcomplicating that.  MarkP could give better input.
>
> Thanks
>
> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <esecu...@gmail.com> wrote:
> >
> > Hi Joe,
> >
> > For my use case, partial results are okay.
> > The files may contain up to a million records, but we have about a
> > day to process them. We will consider record-based processing. It
> > might be a longer task to convert our flows to consume records
> > instead of single files.
> > Will I need to have multiple sessions to handle all this?
> >
> > Thanks,
> > Eric
> >
> > On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <joe.w...@gmail.com> wrote:
> >
> > > Eric
> > >
> > > The ProcessSession follows a unit-of-work pattern.  You can do a
> > > lot of things, but until you commit the session it won't actually
> > > commit the change(s).  So if you want the behavior you describe,
> > > call commit after each transfer.  This is done automatically for
> > > you in most cases, but you can call it to control the boundary.
> > > Just remember you risk partial results then.  Consider you're
> > > reading the input file, which contains 100 records, let's say.  On
> > > record 51 there is a processing issue.  What happens then?  I'd
> > > also suggest this pattern generally results in poor performance.
> > > Can you not use the record readers/writers to accomplish this so
> > > you can avoid turning it into a bunch of tiny flowfiles?
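> > >
> > > If you do go the split route, the per-split shape with an explicit
> > > commit is roughly (a sketch; recordBytes and REL_SPLITS are
> > > placeholders):
> > >
> > > FlowFile split = session.create(original);
> > > split = session.write(split, out -> out.write(recordBytes));
> > > session.transfer(split, REL_SPLITS);
> > > session.commit();  // each commit releases that split downstream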
> > >
> > > Thanks
> > >
> > > On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <esecu...@gmail.com> wrote:
> > > >
> > > > Hello,
> > > >
> > > > I am trying to write a processor which parses an input file and
> > > > emits one JSON flowfile for each record in the input file.
> > > > Currently we're calling session.transfer() once we encounter a
> > > > fragment we want to emit. But it's not sending the new flowfiles
> > > > to the next processor as it processes the input flowfile. Instead
> > > > it's holding everything until the input is fully processed and
> > > > releasing it all at once. Is there some way I can write the
> > > > processor to emit flowfiles as soon as possible rather than
> > > > waiting for everything to succeed?
> > > >
> > > > Thanks,
> > > > Eric
> > >
>
