Sam,

Yes, that's right. The session provides a transaction, so each "bin" has its own session. That way, once a bin is ready, you can combine its FlowFiles in a single transaction/session, and then that session is complete. No need to try to manage which bins contain which sessions or vice versa.

Thanks
-Mark
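A rough sketch of the one-session-per-bin shape described above (illustrative only: the "bin.key" attribute, the two maps, and REL_MERGED are stand-ins and not the real BinFiles/MergeContent code; relationship and property plumbing, error handling, and concurrency controls are omitted, and single-threaded scheduling is assumed):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.ProcessSessionFactory;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;

    // Illustrative sketch only, not the real BinFiles/MergeContent implementation.
    public class BinPerSessionSketch extends AbstractSessionFactoryProcessor {

        static final Relationship REL_MERGED = new Relationship.Builder().name("merged").build();

        // one held session per bin, keyed by a (hypothetical) correlation attribute
        private final Map<String, ProcessSession> binSessions = new HashMap<>();
        private final Map<String, List<FlowFile>> binContents = new HashMap<>();

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory)
                throws ProcessException {
            final ProcessSession pollSession = sessionFactory.createSession();

            FlowFile flowFile;
            while ((flowFile = pollSession.get()) != null) {
                final String key = flowFile.getAttribute("bin.key");

                // each bin owns its own session; FlowFiles are migrated into it as they arrive
                final ProcessSession binSession =
                        binSessions.computeIfAbsent(key, k -> sessionFactory.createSession());
                pollSession.migrate(binSession, Collections.singleton(flowFile));
                binContents.computeIfAbsent(key, k -> new ArrayList<>()).add(flowFile);

                // once a bin is ready (here: a pair), it is transferred and committed as a
                // single transaction using only that bin's session
                final List<FlowFile> bin = binContents.get(key);
                if (bin.size() == 2) {
                    binSession.transfer(bin, REL_MERGED);
                    binSession.commit();
                    binSessions.remove(key);
                    binContents.remove(key);
                }
            }
            pollSession.commit();
        }
    }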
> On Aug 23, 2018, at 4:31 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <tldzpysami4wh...@gmail.com> wrote:
>
> Neat! So the BinFiles processor doesn't quite have the control over the binning/pairing that I wanted, but it got me on the right track. I think I've got a pretty lightweight custom processor that does what I'm looking for now.
>
> Made a processor which extends AbstractSessionFactoryProcessor, with an AtomicReference<ProcessSession> that gets initialized on its first onTrigger, and a HashMap. Each onTrigger creates a session as normal and migrates flowfiles to the ProcessSession the processor holds onto. It then does the matching, migrates flowfiles back to the 'current' ProcessSession, which combines them and transfers them on their merry way. There's the potential for blowing the heap now, but I think I can put some controls in place to manage that.
>
> Just sanity checking - is this a sane way to do this, moving things between the 'current' session and a 'held' session? It looks like that's pretty much how MergeContent works: by having a session for each Bin. So instead of having N bins/sessions, I've just got one extra Session that holds onto the FlowFiles that I've keyed/seen?
>
> Thanks Mark!
>
> Cheers,
> Sam
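A condensed sketch of the single held-session variant described in the message above (dropping the class wrapper and imports shown in the earlier sketch; the "pair.key" attribute, the unmatched map, and REL_PAIRED are again illustrative, and heap controls are left out):

    // Sketch of the "one extra held session" approach; illustrative names only.
    private final AtomicReference<ProcessSession> heldSession = new AtomicReference<>();
    private final Map<String, FlowFile> unmatched = new HashMap<>();   // key -> parked FlowFile

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory)
            throws ProcessException {
        // lazily create the long-lived session on the first trigger; it is never committed
        // while it still holds parked, unmatched FlowFiles
        heldSession.compareAndSet(null, sessionFactory.createSession());
        final ProcessSession held = heldSession.get();

        final ProcessSession current = sessionFactory.createSession();
        for (final FlowFile incoming : current.get(1000)) {
            final String key = incoming.getAttribute("pair.key");
            final FlowFile partner = unmatched.remove(key);
            if (partner == null) {
                // no partner yet: park this FlowFile in the held session
                current.migrate(held, Collections.singleton(incoming));
                unmatched.put(key, incoming);
            } else {
                // partner arrived earlier: pull it back into the current session and send both on
                held.migrate(current, Collections.singleton(partner));
                current.transfer(Arrays.asList(incoming, partner), REL_PAIRED);
            }
        }
        current.commit();
    }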
> On 2018/08/22 13:54:59, Mark Payne <m...@hotmail.com> wrote:
>> Hi Sam,
>>
>> There are a couple of ways to tackle this problem. My recommendation would be to look at extending the BinFiles processor. This is an abstract class, which MergeContent extends (and I think 1 or 2 other processors?). Its job is to bin 'like flowfiles' together, and it can take care of pulling data from queues and efficiently binning the FlowFiles together. It is important, though, to keep in mind that FlowFiles contain attribute maps, and those can quickly exhaust your heap when you're trying to hold 10's or 100's of thousands of FlowFiles in a single processor.
>>
>> Thanks
>> -Mark
>>
>>> On Aug 22, 2018, at 8:07 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <tl...@gmail.com> wrote:
>>>
>>> Hi NiFi devs,
>>>
>>> My understanding is that when I create a custom processor and get FlowFiles with session.get(CustomFlowFileFilter), irrespective of how many times the CustomFlowFileFilter returns FlowFileFilterResult.ACCEPT_AND_CONTINUE or FlowFileFilterResult.REJECT_AND_CONTINUE, it will only ever loop through at most 20000 flowfiles, where 20000 is defined by the nifi.queue.swap.threshold setting in nifi.properties. (Disregarding that it's actually 19999, and that the setting is not respected when running tests, which made this SUPER confusing to debug...)
>>>
>>> Attached is a screenshot of that happening (also at: https://i.imgur.com/25QJxuj.png).
>>>
>>> My question is: is there a way to force a custom processor to be able to read ALL queued flowfiles in all incoming connections?
>>>
>>> My particular use case is pairing flowfiles, and while there probably are other ways to pair files using Wait/Notify processors, I'm handling files at large throughput, with possible delays between the pairs arriving, and it's quite easy to hit the limit. I could also increase the swap threshold setting, but I keep hitting the problem. I've also played with custom prioritizers on connections in an attempt to maximise the chance of having pairs occur, but because I need to move unmatched flowfiles out and back in, that essentially creates a busy loop. Seems like there should be a better way.
>>>
>>> Any ideas?
>>>
>>> Ideally, a way to force a custom processor to be able to read all queued flowfiles (swapping more than the threshold into memory during a single onTrigger call) would be the easiest solution. Is there one?
>>>
>>> Cheers,
>>> Sam
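For reference, the filter-based get() that the original question refers to looks roughly like this (a fragment from inside onTrigger; the "pair.key" attribute and the seenKeys set are made up for the example). Even when the filter always answers with an *_AND_CONTINUE result, a single call only walks the FlowFiles in the active queue, which is bounded by nifi.queue.swap.threshold (20000 by default):

    // Accept only FlowFiles whose pair key has already been seen; leave the rest queued.
    final Set<String> seenKeys = new HashSet<>();   // hypothetical: populated with keys already seen
    final List<FlowFile> matched = session.get(new FlowFileFilter() {
        @Override
        public FlowFileFilterResult filter(final FlowFile flowFile) {
            final String key = flowFile.getAttribute("pair.key");
            return seenKeys.contains(key)
                    ? FlowFileFilterResult.ACCEPT_AND_CONTINUE   // take it in this trigger
                    : FlowFileFilterResult.REJECT_AND_CONTINUE;  // keep scanning, leave it queued
        }
    });
    // FlowFiles that have been swapped out to disk are not offered to the filter, which is
    // why the iteration tops out around the swap threshold per call.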