This is an interesting approach to an acknowledged problem, but I wonder how it comports with the core tenets of Flow-Based Programming, on which NiFi is modeled. It seems to introduce globally coupled dependencies between all queues in a flow, whereas another solution (flow-segment-based resource allocation) might solve the problem without requiring per-queue contention on every cycle. I believe the stateless NiFi work has included some discussion of controlling resource allocation for a flow segment. Sam Hjemfelt, any thoughts here?
Andy LoPresto
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69

> On Oct 17, 2019, at 8:54 AM, Kessler, Jon wrote:
>
> Joe, hopefully I addressed all of your questions:
>
> Your interpretation of the scheduling aspect is correct. These queues will pretend to be empty a certain % of the time if higher priority data recently moved elsewhere. That % is configurable on a per-rule basis, which allows the operator to determine how much to stagger the data associated with each rule. That % also determines how the rules are ranked in order of priority: the higher the %, the more often a rule will make use of its threads, so the higher its priority is considered to be.
>
> Administration: You are correct that the ruleset is provided at the flow controller level, but it will be leveraged by all connections regardless of whether or not they use the BucketPrioritizer (more details on this below). This overall solution only works if all FlowFileQueues use this new implementation, which is why I tied it to nifi.properties settings.
>
> The sorting function here takes place on insertion into any connection on which a BucketPrioritizer is set. Once a FlowFile has been sorted into a bucket, we maintain that state so that each time it moves into a new connection we already know which bucket it belongs in, without needing a BucketPrioritizer on that connection. Each bucket in a connection is just a FIFO queue, so no additional sorting is done. You should only have to configure connections to use the BucketPrioritizer at points in your flow where you believe you'll have enough information to accurately determine priority, and not beyond that point unless you want to re-evaluate downstream for some reason. There is administration involved in setting these BucketPrioritizers on some connections, but it should be minimal per flow (sometimes as few as one).
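A minimal sketch of the per-connection bucketing Jon describes, written in Python for brevity rather than NiFi's Java. Everything here is hypothetical: FlowFiles are plain attribute dicts, rules are (name, predicate) pairs standing in for Expression Language rules, and the cached bucket lives in an invented `priority.bucket` attribute (the proposal only says this state is maintained, not how). Treating unmatched or not-yet-evaluated FlowFiles as a trailing default bucket is also an assumption.

```python
from collections import deque
from typing import Callable, Deque, Dict, List, Optional, Tuple

FlowFile = Dict[str, str]                      # attributes only, for illustration
Rule = Tuple[str, Callable[[FlowFile], bool]]  # (rule name, stand-in for an EL predicate)

BUCKET_ATTR = "priority.bucket"  # hypothetical attribute caching the assigned rule


class BucketedQueue:
    """Hypothetical connection queue: one FIFO per rule, no comparison sort."""

    def __init__(self, rules: List[Rule], has_bucket_prioritizer: bool) -> None:
        self.rules = rules  # shared, flow-wide ruleset, highest priority first
        self.has_bucket_prioritizer = has_bucket_prioritizer
        self.buckets: Dict[Optional[str], Deque[FlowFile]] = {}

    def _evaluate(self, ff: FlowFile) -> Optional[str]:
        # The highest-ranked rule the FlowFile satisfies becomes its bucket.
        for name, predicate in self.rules:
            if predicate(ff):
                ff[BUCKET_ATTR] = name
                return name
        ff.pop(BUCKET_ATTR, None)
        return None  # matched no rule: default bucket

    def _bucket_for(self, ff: FlowFile) -> Optional[str]:
        cached = ff.get(BUCKET_ATTR)
        if self.has_bucket_prioritizer:
            return self._evaluate(ff)  # explicit (re-)evaluation point
        if cached is None:
            return None  # has not met a BucketPrioritizer yet
        if any(name == cached for name, _ in self.rules):
            return cached  # reuse the earlier decision, no re-evaluation
        return self._evaluate(ff)  # its rule was deleted: re-evaluate

    def offer(self, ff: FlowFile) -> None:
        # Insertion is an O(1) append to the chosen bucket.
        self.buckets.setdefault(self._bucket_for(ff), deque()).append(ff)

    def poll(self) -> Optional[FlowFile]:
        # Serve rule buckets in priority order, then the default bucket, then
        # any bucket left behind by a deleted rule so nothing is stranded.
        order: List[Optional[str]] = [name for name, _ in self.rules] + [None]
        order += [name for name in self.buckets if name not in order]
        for name in order:
            bucket = self.buckets.get(name)
            if bucket:
                return bucket.popleft()
        return None
```

Insertion is an append and polling scans a fixed number of buckets, so no comparison sort runs as queues grow deep; the trade-off is that ordering within a bucket is strictly FIFO.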
>
> Some additional information: when you delete a rule, each FlowFile that was already associated with that rule will be re-evaluated against the ruleset the next time it moves, when it enters the next connection, regardless of whether or not a BucketPrioritizer is set on that connection. Also, FlowFiles that have yet to be evaluated (that have yet to encounter a BucketPrioritizer) will not be staggered. This was a design decision: if we don't know the priority of a given FlowFile, we should get it to that point in the flow as soon as possible. The decision was based on empirical evidence that, when we did stagger unevaluated data, an incoming flow of high priority data slowed its own upstream processing down once it was identified and processed as high priority.
>
> Multi-tenancy: Agreed that a global priority list could be too restrictive for multi-tenancy and should be addressed.
>
> Regarding swapping, this is an area where I admittedly need to put more thought into my implementation, because there is plenty of room for improvement. Right now I'm just swapping files to disk in order of least to greatest priority, but they are all stored together, so they're read back into memory in order of least to greatest priority. More work should be done here.
>
> - Jon
> ________________________________
> From: Joe Witt
> Sent: Thursday, October 17, 2019 8:12:52 AM
> Subject: EXT: Re: [Discuss] Data prioritization - A proposed solution
>
> Jon
>
> Probably some details I don't quite understand yet, so responses here are to get there...
>
> The concept for scheduling is interesting. Does this basically work around the fact that we have an unfair scheduler, so this has queue implementations which pretend data is not available when they know there is higher priority data available elsewhere, thus returning threads to the pool faster to increase the likelihood that queues with higher priority data get served more often?
>
> The notion of prioritization implies there is a sorting function happening somewhere. NiFi now does sorting on insertion to every queue. At what points are you suggesting sorting can be done, or reduced to?
>
> Administration: The existing model does require each prioritizer to be set for each queue. Yours does as well; to opt into this you'd have to select the BucketPrioritizer, right? It seems like you're saying the priority ruleset would be provided at the flow controller level and be enforced by all connections which leverage this prioritizer. For large multi-tenant NiFi flows, having a global ruleset might be too limiting, but maybe we just don't worry about that yet.
>
> How does this idea work with the fact that queues, as they reach a given threshold, have their data swapped out to disk, and as data gets worked off the FlowFiles get swapped back into memory?
>
> Thanks
> Joe
>
> On Thu, Oct 17, 2019 at 7:28 AM Kessler, Jon wrote:
>
>> I want to start a discussion about a new prioritization mechanism that addresses some of the issues that I believe exist in the current solution. These issues are:
>>
>> - Scheduling: No consideration is given to data priority when determining which component is given the next available thread with which to work.
>> - Constant sorting: Because all FlowFiles in a given connection share the same PriorityQueue, they must be sorted every time they move. While this sort is efficient, it can add up as queues grow deep.
>> - Administration: There is a costly human element to managing the value used as a priority ranking as priorities change. You must also ensure every connection in the appropriate flow has the proper prioritizer assigned to it to make use of the property.
>>
>> We have developed a prototype of a new FlowFileQueue implementation that addresses these issues. Use of this implementation is controlled via nifi.properties, so you can opt in or out system-wide without doing a lot of configuration. Its design goals are:
>>
>> - Instead of using the value of a FlowFile attribute as a ranking, maintain a set of Expression Language rules to define your priorities. The highest ranked rule that a given FlowFile satisfies will be that FlowFile's priority.
>> - Because we have a finite set of priority rules, we can use a bucket sort in our connections: one bucket per priority rule. The bucket/rule with which a FlowFile is associated will be maintained so that, as it moves through the system, we do not have to re-evaluate that FlowFile against our ruleset unless we have reason to do so.
>> - Control where in your flow FlowFiles are evaluated against the ruleset with a new Prioritizer implementation: BucketPrioritizer.
>> - When this queue implementation is polled, it will check state to see whether any data of a higher priority than what it currently contains recently (within 5s) moved elsewhere in the system. If higher priority data has recently moved elsewhere, the connection will only provide a FlowFile X% of the time, where X is defined along with the rule. This allows higher priority data to have more frequent access to threads without thread-starving lower priority data.
>> - Rules will be managed via a menu option for the flow, and changes to them take effect instantly. This allows you to change your priorities without stopping, editing, and restarting various components on the graph.
>>
>> I intend to contribute this solution but first want to solicit input and opinions.
>>
>> - Jon Kessler
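The polling rule in the proposal (a connection serves data only X% of the time while higher-priority data has moved within the last 5s) can be sketched as a flow-wide tracker plus a gate check. This is a hypothetical Python illustration, not the prototype's code: `ActivityTracker`, `should_serve`, and the `percents` map of rule name to X are invented names, and rules are ranked by X as Jon describes in his reply. The caller is assumed to pass the highest-priority rule the connection currently holds (or `None` for unevaluated data, which is never staggered) and to call `record()` after handing out a FlowFile.

```python
import random
import time
from typing import Callable, Dict, Optional

RECENT_WINDOW_S = 5.0  # "recently (within 5s)" per the proposal


class ActivityTracker:
    """Hypothetical flow-wide record of when data for each rule last moved."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.last_moved: Dict[str, float] = {}

    def record(self, rule: str) -> None:
        # Called after a connection hands out a FlowFile from this rule's bucket.
        self.last_moved[rule] = self.clock()

    def higher_priority_moved(self, rule: str, percents: Dict[str, int]) -> bool:
        # Rank is given by X: a larger percentage means a higher priority.
        now = self.clock()
        return any(
            percents[other] > percents[rule] and now - moved_at <= RECENT_WINDOW_S
            for other, moved_at in self.last_moved.items()
            if other in percents  # ignore rules that have since been deleted
        )


def should_serve(top_rule: Optional[str], percents: Dict[str, int],
                 tracker: ActivityTracker, rng: random.Random) -> bool:
    """Decide whether a poll returns data (True) or pretends to be empty."""
    if top_rule is None:
        return True  # unevaluated data is never staggered
    if not tracker.higher_priority_moved(top_rule, percents):
        return True  # nothing more important is flowing right now
    return rng.random() * 100 < percents[top_rule]  # serve X% of the time
```

Because a lower-priority connection still answers roughly X% of its polls, it keeps receiving some threads while higher-priority data is flowing, which is the no-starvation property the proposal aims for. Injecting the clock keeps the 5s window easy to test.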
