[ 
https://issues.apache.org/jira/browse/NIFI-3216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15812529#comment-15812529
 ] 

Bryan Bende commented on NIFI-3216:
-----------------------------------

I was hoping to work on this ticket, but I'm not sure I will end up having time,
so I wanted to document the design I've been thinking about over the last week...

Originally I was thinking that each signal would be its own entry in the cache,
and the Wait processor would get all the keys and determine whether the number of
keys matching the pattern equaled the signal count, and if so, release. The
downside to this approach is that it could require a very large number of cache
entries. Consider the case where you use SplitText on a 1 million row CSV and
want to do something with the original CSV after all 1 million rows have been
processed; that would require 1 million entries in the cache.

A better approach would probably be to use a single key (as it does now) and
make the value of that key a JSON document (or some other structured format)
that contains the count of signals for that key, as well as a map of metadata
attributes. In order to do this we need to modify the DistributedMapCache to
support a replace method that takes the key, the expected current value, and
the new value to replace it with. The cache server will perform the replace and
return true if the current value matches the expected current value; otherwise
the replace won't be performed and false will be returned.
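To make the proposed contract concrete, here is a minimal sketch of the
compare-and-replace semantics. It assumes an in-memory ConcurrentHashMap as a
stand-in for the cache server and uses String values instead of serialized
bytes; ReplaceSemanticsSketch is a hypothetical name, and this is not the actual
cache server code. It relies on ConcurrentMap.replace(key, oldValue, newValue),
which performs exactly this conditional replace atomically.

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for the cache server, used only to
// illustrate the proposed compare-and-replace semantics.
class ReplaceSemanticsSketch {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    void put(String key, String value) {
        store.put(key, value);
    }

    String get(String key) {
        return store.get(key);
    }

    // Replaces only if the stored value equals currentValue and reports whether
    // the replace happened; a missing key also yields false.
    boolean replace(String key, String currentValue, String replaceValue) {
        return store.replace(key, currentValue, replaceValue);
    }

    public static void main(String[] args) {
        ReplaceSemanticsSketch cache = new ReplaceSemanticsSketch();
        cache.put("my.signal", "v1");
        System.out.println(cache.replace("my.signal", "v1", "v2")); // true
        System.out.println(cache.replace("my.signal", "v1", "v3")); // false, value is now v2
        System.out.println(cache.get("my.signal"));                  // v2
    }
}
{code}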

So the DistributedMapCacheClient would have methods like:
{code}
<K, V> boolean replace(K key, V currentValue, V replaceValue,
        Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;

boolean isReplaceSupported();
{code}

The isReplaceSupported() method can be used by a cache provider to indicate that
this operation is not supported, in the event that we eventually provide
additional back-end caches. The Wait/Notify processors could use it to perform
custom validation, ensuring the processors are only valid if a cache with
replacement capabilities is selected.
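A rough illustration of how a processor could use the capability flag during
validation. The ReplaceCapableCache interface and the validate() method are
hypothetical stand-ins in plain Java; a real processor would do this check in
its custom validation logic rather than returning a list of strings.

{code:java}
import java.util.ArrayList;
import java.util.List;

class WaitNotifyValidationSketch {

    // Collects problems instead of throwing, so the processor can be reported
    // as invalid with a message when the selected cache cannot replace.
    static List<String> validate(ReplaceCapableCache cache) {
        List<String> problems = new ArrayList<>();
        if (!cache.isReplaceSupported()) {
            problems.add("The selected Distributed Map Cache does not support replace");
        }
        return problems;
    }

    public static void main(String[] args) {
        ReplaceCapableCache supported = () -> true;
        ReplaceCapableCache unsupported = () -> false;
        System.out.println(validate(supported).isEmpty());   // true
        System.out.println(validate(unsupported).isEmpty()); // false
    }
}

// Hypothetical minimal view of a cache client: only the capability flag.
interface ReplaceCapableCache {
    boolean isReplaceSupported();
}
{code}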

So let's say the signal key is "my.signal" and we are going to store the value
as JSON like:
{code}
{
  "count" : 0,
  "attributes" : {
    "attr1" : "val1",
    "attr2" : "val2"
  }
}
{code}
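For illustration only, the JSON value above could be modeled as a small Java
record. The Signal record and its toJson() method are hypothetical; the
hand-written encoding does not escape quotes or backslashes, so an actual
implementation would presumably use a JSON library instead.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class SignalJsonSketch {

    // Hypothetical value for one signal key: a count plus merged attributes.
    record Signal(long count, Map<String, String> attributes) {

        // Minimal encoding for illustration; no escaping is performed.
        String toJson() {
            String attrs = attributes.entrySet().stream()
                    .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                    .collect(Collectors.joining(","));
            return "{\"count\":" + count + ",\"attributes\":{" + attrs + "}}";
        }
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("attr1", "val1");
        attrs.put("attr2", "val2");
        // prints {"count":0,"attributes":{"attr1":"val1","attr2":"val2"}}
        System.out.println(new Signal(0, attrs).toJson());
    }
}
{code}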

When a signal hits the Notify processor, it would try to retrieve the value of
"my.signal" from the cache. If nothing is found, it creates the first version of
the JSON with a count of 1 and any attributes (based on the regex) and stores it
in the cache. If an entry is found, it increments the count in the JSON, adds in
any attributes, and then calls replace with the original JSON and the new JSON.
If the replace fails, it is because something else updated the cache between
the original retrieval and the replace, so it would repeat the process (get the
current value again and attempt the replace).
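A sketch of that get/increment/replace retry loop, assuming a ConcurrentHashMap
in place of the distributed cache and an immutable Signal record in place of the
JSON string (record equality plays the role of comparing against the expected
current value). NotifyLoopSketch, Signal and notifySignal are hypothetical
names. Using putIfAbsent for the very first entry is an addition beyond the
proposal, so that two notifiers cannot both create the initial value.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NotifyLoopSketch {

    // Hypothetical immutable cache value; equals() is structural.
    record Signal(long count, Map<String, String> attributes) {
        Signal {
            attributes = Map.copyOf(attributes);
        }
    }

    // ConcurrentHashMap stands in for the DistributedMapCache server.
    private final ConcurrentMap<String, Signal> cache = new ConcurrentHashMap<>();

    // Records one signal for signalKey, retrying until its update wins.
    void notifySignal(String signalKey, Map<String, String> signalAttributes) {
        while (true) {
            Signal current = cache.get(signalKey);
            if (current == null) {
                // First signal: create the initial value with a count of 1.
                if (cache.putIfAbsent(signalKey, new Signal(1, signalAttributes)) == null) {
                    return;
                }
                continue; // another notifier created it first; retry as an update
            }
            // Merge attributes: values from this signal overwrite earlier ones.
            Map<String, String> merged = new HashMap<>(current.attributes());
            merged.putAll(signalAttributes);
            Signal updated = new Signal(current.count() + 1, merged);
            if (cache.replace(signalKey, current, updated)) {
                return;
            }
            // The value changed since get(); loop and try again.
        }
    }

    Signal get(String signalKey) {
        return cache.get(signalKey);
    }

    public static void main(String[] args) throws InterruptedException {
        NotifyLoopSketch notifier = new NotifyLoopSketch();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 1000; i++) {
            String n = String.valueOf(i);
            pool.submit(() -> notifier.notifySignal("my.signal", Map.of("last", n)));
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println(notifier.get("my.signal").count()); // 1000
    }
}
{code}

Because every successful update increments the count, two successive values for
the same key are never equal, so a stale expected value can never accidentally
match.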

One other thing to consider is the uniqueness of attributes (e.g. if there are
1000 signals and they all have the attribute 'foo'). I am proposing that when
using these processors in this multi-signal mode we keep it simple and just
merge together all the attributes, so you would end up with the value of the
attribute from the last signal.
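That merge policy amounts to applying each signal's attributes in the order the
signals updated the cache, so a later value for the same name overwrites an
earlier one. A minimal sketch with a hypothetical merge() helper:

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AttributeMergeSketch {

    // Applies attributes in update order; for a repeated name, the last signal wins.
    static Map<String, String> merge(List<Map<String, String>> signalsInUpdateOrder) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Map<String, String> attrs : signalsInUpdateOrder) {
            merged.putAll(attrs);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> result = merge(List.of(
                Map.of("foo", "1", "bar", "x"),
                Map.of("foo", "2"),
                Map.of("foo", "3")));
        System.out.println(result.get("foo")); // 3
        System.out.println(result.get("bar")); // x
    }
}
{code}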

> Add ability to wait for N signals to Wait/Notify processors
> -----------------------------------------------------------
>
>                 Key: NIFI-3216
>                 URL: https://issues.apache.org/jira/browse/NIFI-3216
>             Project: Apache NiFi
>          Issue Type: Improvement
>    Affects Versions: 1.2.0
>            Reporter: Bryan Bende
>
> The recently added Wait and Notify processors allow a flow file to be held at 
> the Wait processor until a signal is received in the Notify processor. It 
> would be nice to be able to wait for N signals before releasing.
> One way this could be done is to have a property like "Signal Count" on the 
> Wait processor, and then count the keys in the cache starting with some 
> pattern, and release when the # of keys equals the signal count.
> This would require the ability to get all the keys from the cache, or at 
> least get all keys matching a pattern: 
> https://issues.apache.org/jira/browse/NIFI-3214



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
