Thanks for the feedback, Koji; I appreciate it. I will submit a JIRA for the proposal below and implement an alternate solution for the potential race condition.
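The two-key idea from Koji's reply can be sketched outside NiFi like this. It is a minimal illustration, not NiFi code. A plain dict stands in for the DistributedMapCache. The key names (`<batch_id>.counting` for the temporary key, the batch ID itself as the final signal key) and all function names are hypothetical. The sketch also assumes the flow can already tell when counting has finished, so it leaves the call to `publish_final_count` to the caller.

```python
from typing import Dict, Optional


def increment_temp_count(cache: Dict[str, str], batch_id: str) -> int:
    """Fetch / increment / put on the temporary key, mirroring the
    FetchDistributedMapCache -> ReplaceText -> PutDistributedMapCache chain.
    This read-modify-write is not atomic; the sketch assumes no concurrent callers."""
    temp_key = f"{batch_id}.counting"  # hypothetical temporary key name
    current = cache.get(temp_key)
    # Increment if a number is found, otherwise start counting at 1.
    count = int(current) + 1 if current is not None and current.isdigit() else 1
    cache[temp_key] = str(count)
    return count


def publish_final_count(cache: Dict[str, str], batch_id: str) -> None:
    """Once counting has finished, move the count to the final key
    (the signal identifier) and drop the temporary key."""
    cache[batch_id] = cache.pop(f"{batch_id}.counting")


def target_count_for_wait(cache: Dict[str, str], batch_id: str) -> Optional[int]:
    """What the Wait side would see: None until the final key exists,
    so it never reads a partial count."""
    value = cache.get(batch_id)
    return int(value) if value is not None else None


if __name__ == "__main__":
    cache: Dict[str, str] = {}
    for _ in range(3):
        increment_temp_count(cache, "batch-42")
    print(target_count_for_wait(cache, "batch-42"))  # None: still counting
    publish_final_count(cache, "batch-42")
    print(target_count_for_wait(cache, "batch-42"))  # 3
```

The design choice is that readers only ever look at the final key. A FlowFile that reaches Wait early sees "not found" rather than a too-low count.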
Thanks!
Chris Lundeberg

On Mon, Aug 19, 2019 at 8:16 PM Koji Kawamura wrote:
> Hi Chris,
>
> You are correct, the Wait processor has to rely on an attribute within a
> FlowFile to determine the target signal count.
> I think the idea of making Wait able to fetch the target signal count
> from DistributedMapCache is a nice improvement.
>
> Please create a JIRA for further discussion. I guess we will need to
> add a property such as "Fetch Target Signal Count from Cache Service",
> boolean, defaulting to false. If enabled, the Wait processor treats the
> configured "Target Signal Count" value as a key in the
> DistributedMapCache, then fetches the value to use as the target count.
> If the key is not found, the Wait processor transfers the FlowFile to
> the wait relationship.
> https://issues.apache.org/jira/projects/NIFI
>
> Adding FetchDistributedMapCache right before Wait provides the same
> result. But if the Wait processor can fetch it, we can reduce the number
> of fetch operations required to process multiple FlowFiles at Wait.
>
> To avoid the race condition where Wait processes FlowFiles before the
> counting part finishes, I'd use two keys in the counting part: a
> temporary one to accumulate the count, and the final one (the signal
> identifier), written once the counting has finished.
>
> Thanks,
> Koji
>
> On Tue, Aug 20, 2019 at 1:08 AM Chris Lundeberg wrote:
> >
> > Hi all,
> >
> > I wanted to throw out a question to the larger community before I went
> > down a different path. I might be looking at this wrong or making
> > assumptions I shouldn't.
> >
> > Recently I started working with the Wait and Notify processors a bit
> > more. I have a new flow which is a bit more batch in nature, and these
> > processors seem to work nicely for intelligently waiting for chunks or
> > files to be processed before moving on to the next step. I have one
> > specific pattern that I haven't solved with the built-in functionality,
> > which is:
> >
> > 1. I have an incoming zip file from SFTP. That zip contains n files,
> > and each of those files needs to be split in some way. I won't know
> > the number of files within the zip.
> >
> > 2. After they have been split correctly, a few transformations run on
> > each of the files.
> >
> > 3. At the end of the transformation process, these various files will
> > be merged into 5 specific outbound file formats, to be sent to an
> > outbound SFTP server. *Note*: I am not splitting and merging the same
> > files back together (I have looked at the fragment index stuff).
> >
> > I found a nice solution for counting the number of flowfiles after the
> > split, so I know exactly how many files should be transformed and thus
> > what my "Target Signal Count" should be within the Wait processor. At
> > the moment I have a counting process of (1) FetchDistributedMapCache,
> > (2) ReplaceText (incrementing the count from the fetch, if a number is
> > found), and (3) PutDistributedMapCache. This process works as expected,
> > and I have a valid key/value pair in the MapCache for that particular
> > process (I create a BatchID so it's very specific to each pull from the
> > SFTP processor). The only way I know to provide that information back
> > to the Wait processor is to pull that value with a
> > FetchDistributedMapCache right before the flowfile enters the Wait
> > processor. In theory each waiting flowfile would get the same attribute
> > from the fetch, with the same count. However, this doesn't always work,
> > because the transformations can finish before the counting has been
> > done and published to the MapCache Server. So in this scenario you end
> > up with some flowfiles having a lower count than others, or not having
> > the "true" count at all. I can put additional gates in place, such as
> > slowing down the flowfiles at specific sections to give the counting
> > time to finish first, but it's not a perfect science.
> >
> > Ideally, the Wait processor could pull directly from the MapCache if I
> > provided the key it needs for the lookup in the "Target Signal Count"
> > field. It could use the signal coming from Notify to say "I have X
> > number of Notify for this signal" and use the count value I have set in
> > the MapCache to say "This is the total number of files I need to see
> > from Notify for that same signal". This way, I could run the Wait
> > processor every few seconds and the chance of running into a miscount
> > would be far lower. Is there currently any way this processor could
> > pull directly from the cache, or does it have to rely on an attribute
> > within the flowfile itself? I think it's the latter, but I want to make
> > sure someone doesn't have a better idea.
> >
> > Sorry for the long message. Thanks!
> >
> >
> > Chris Lundeberg
>
