Hi Eric,

We should definitely throw a better Exception there, rather than 
NullPointerException. But what you’re looking to do can be done. It’s slightly 
more nuanced than that, though. What you’ll need to do, in order to maintain 
the lineage, is to create the child FlowFile in the session that owns the 
parent. You can then “migrate” the child FlowFile to a new session. Something 
to the effect of:

-----------------
ProcessSession inSession = sessionFactory.create();
FlowFile flowFile = inSession.get();
if (flowFile == null) {
  return;
}

ProcessSession outSession = sessionFactory.create();

try (InputStream in = inSession.read(flowFile)) {
  Record record;
  while ((record = getRecord(in)) {
      FlowFile child = inSession.create(flowFile);
      try (OutputStream out = inSession.write(child)) {
                // write some data.
      }

      inSession.migrate(outSession, Collections.singleton(child));
      outSession.transfer(child, REL_SUCCESS);
      outSession.commit();
  }
}

inSession.transfer(flowFIle, REL_ORIGINAL);
inSession.commit();
———————

Some things to keep in mind, though:
- To get a session factory, you should extend AbstractSessionFactoryProcessor 
instead of AbstractProcessor.
- This means you need to ensure that you always explicitly commit/rollback 
sessions and handle Exceptions properly, ensure that you account for any 
FlowFile that is created, etc.
- If your incoming FlowFile has 1 million FlowFiles, and you process 900,000 of 
them and then NiFi is restarted, then on restart it’ll reprocess the incoming 
FlowFile, so you’ll end up with 900,000 duplicates.
- Performance will be very subpar, as you’ll be creating and committing up to 1 
million sessions per FlowFile. Perhaps this is okay if you only have to process 
one of these files per 24 hours. But it’s worth considering.

Thanks
-Mark



> On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
> 
> Hi Joe,
> 
> I was able to get it working by using one session to manage the parent
> flowfile and then one session per split file. I couldn't do
> splitSession.create(inputFF), was getting an NPE and another exception, but
> it worked with splitSession.create() the downside is I lose the lineage
> connection to the parent.
> 
> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
>> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
>> Failure FF: null: java.lang.NullPointerException
>> java.lang.NullPointerException: null
>> at
>> org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
>> at
>> org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
>> at
>> org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
>> 
> 
> This log is from a 1.11.0 build of NiFi
> 
> In my case the input is a proprietary text file with a gigantic schema
> definition which we translate to JSON and split the results all at once. I
> don't know whether record-based processing works for us because of how
> fluid the json schema is.
> 
> Thanks,
> Eric
> 
> On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
> 
>> Eric,
>> 
>> My point is that it sounds like you get handed an original document
>> which is a JSON document.  It contains up to a million elements within
>> it.  You would implement a record reader for your original doc
>> structure and then you can use any of our current writers/etc..  But
>> the important part is avoiding creating splits unless/until totally
>> necessary/etc..
>> 
>> Anyway if you go the route you're thinking of I think you'll need a
>> different session for reading (single session for the entire source
>> file) and a different session for all the splits you'll create. But I
>> might be over complicating that.  MarkP could give better input.
>> 
>> Thanks
>> 
>> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]> wrote:
>>> 
>>> Hi Joe,
>>> 
>>> For my use case partial results are okay.
>>> The files may contain up to a million records. But we have like a day to
>>> process it. We will consider record-based processing. It might be a
>> longer
>>> task to convert our flows to consume records instead of single files.
>>> Will I need to have multiple sessions to handle all this?
>>> 
>>> Thanks,
>>> Eric
>>> 
>>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
>>> 
>>>> Eric
>>>> 
>>>> The ProcessSession follows a unit of work pattern.  You can do a lot
>>>> of things but until you commit the session it wont actually commit the
>>>> change(s).  So if you want the behavior you describe call commit after
>>>> transfer each time.  This is done automatically for you in most cases
>>>> but you can call it to control the boundary.  Just remember you risk
>>>> partial results then.  Consider you're reading the input file which
>>>> contains 100 records lets say.  On record 51 there is a processing
>>>> issue.  What happens then?    I'd also suggest this pattern generally
>>>> results in poor performance.  Can you not use the record
>>>> reader/writers to accomplish this so you can avoid turning it into a
>>>> bunch of tiny flowfiles?
>>>> 
>>>> Thanks
>>>> 
>>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]>
>> wrote:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> I am trying to write a processor which parses an input file and
>> emits one
>>>>> JSON flowfile for each record in the input file. Currently we're
>> calling
>>>>> session.transfer() once we encounter a fragment we want to emit. But
>> it's
>>>>> not sending the new flowfiles to the next processor as it processes
>> the
>>>>> input flowfile. Instead it's holding everything until the input is
>> fully
>>>>> processed and releasing it all at once. Is there some way I can
>> write the
>>>>> processor to emit flowfiles as soon as possible rather than waiting
>> for
>>>>> everything to succeed?
>>>>> 
>>>>> Thanks,
>>>>> Eric
>>>> 
>> 

Reply via email to