markap14 commented on a change in pull request #5412:
URL: https://github.com/apache/nifi/pull/5412#discussion_r716934722



##########
File path: 
nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java
##########
@@ -0,0 +1,1202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.stateless;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceReference;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.search.SearchContext;
+import org.apache.nifi.search.SearchResult;
+import org.apache.nifi.search.Searchable;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterContextDefinition;
+import org.apache.nifi.stateless.config.ParameterValueProviderDefinition;
+import org.apache.nifi.stateless.config.ReportingTaskDefinition;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.DataflowTriggerContext;
+import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TransactionThresholds;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static 
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
+import static 
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+import static 
org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
+import static 
org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
+import static org.apache.nifi.processor.util.StandardValidators.URL_VALIDATOR;
+import static 
org.apache.nifi.processor.util.StandardValidators.createDirectoryExistsValidator;
+
+@Restricted
+@SupportsBatching
+@SystemResourceConsiderations({
+    @SystemResourceConsideration(resource= SystemResource.CPU),
+    @SystemResourceConsideration(resource= SystemResource.DISK),
+    @SystemResourceConsideration(resource= SystemResource.MEMORY),
+    @SystemResourceConsideration(resource= SystemResource.NETWORK)
+})
+@DynamicProperty(name="Any Parameter name", value="Any value", description = 
"Any dynamic property that is added will be provided to the stateless flow as a 
Parameter. The name of the property will" +
+    " be the name of the Parameter, and the value of the property will be the 
value of the Parameter. Because Parameter values may or may not be sensitive, 
all dynamic properties will be considered" +
+    " sensitive in order to protect their integrity.")
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Runs the configured dataflow using the Stateless NiFi 
engine. Please see documentation in order to understand the differences between 
the traditional NiFi runtime engine and" +
+    " the Stateless NiFi engine. If the Processor is configured with an 
incoming connection, the incoming FlowFiles will be queued up into the 
specified Input Port in the dataflow. Data that is" +
+    " transferred out of the flow via an Output Port will be sent to the 
'output' relationship, and an attribute will be added to indicate which Port 
that FlowFile was transferred to. See" +
+    " Additional Details for more information.")
+@WritesAttributes({
+    @WritesAttribute(attribute="output.port.name", description = "The name of 
the Output Port that the FlowFile was transferred to"),
+    @WritesAttribute(attribute="failure.port.name", description = "If one or 
more FlowFiles is routed to one of the Output Ports that is configured as a 
Failure Port, the input FlowFile (if any) " +
+        "will have this attribute added to it, indicating the name of the Port 
that caused the dataflow to be considered a failure.")
+})
+public class ExecuteStateless extends AbstractProcessor implements Searchable {
+    static final AllowableValue SPEC_FROM_FILE = new AllowableValue("Use Local 
File", "Use Local File or URL", "Dataflow to run is stored as a file on the 
NiFi server or at a URL that is accessible" +
+        " to the NiFi server");
+    static final AllowableValue SPEC_FROM_REGISTRY = new AllowableValue("Use 
NiFi Registry", "Use NiFi Registry", "Dataflow to run is stored in NiFi 
Registry");
+
+    static final AllowableValue CONTENT_STORAGE_HEAP = new 
AllowableValue("Store Content on Heap", "Store Content on Heap",
+        "The FlowFile content will be stored on the NiFi JVM's heap. This is 
the most " +
+        "efficient option for small FlowFiles but can quickly exhaust the heap 
with larger FlowFiles, resulting in Out Of Memory Errors and node 
instability.");
+    static final AllowableValue CONTENT_STORAGE_DISK = new 
AllowableValue("Store Content on Disk", "Store Content on Disk",
+        "The FlowFile content will be stored on disk, within the configured 
Work Directory. The content will still be cleared between invocations and will 
not be persisted across restarts.");
+
+    static final PropertyDescriptor DATAFLOW_SPECIFICATION_STRATEGY = new 
Builder()
+        .name("Dataflow Specification Strategy")
+        .displayName("Dataflow Specification Strategy")
+        .description("Specifies how the Processor should obtain a copy of the 
dataflow that it is to run")
+        .required(true)
+        .allowableValues(SPEC_FROM_FILE, SPEC_FROM_REGISTRY)
+        .defaultValue(SPEC_FROM_FILE.getValue())
+        .build();
+
+    static final PropertyDescriptor DATAFLOW_FILE = new Builder()
+        .name("Dataflow File")
+        .displayName("Dataflow File/URL")
+        .description("The filename or URL that specifies the dataflow that is 
to be run")
+        .required(true)
+        .identifiesExternalResource(ResourceCardinality.SINGLE, 
ResourceType.FILE, ResourceType.URL)
+        .dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, SPEC_FROM_FILE)
+        .build();
+
+    static final PropertyDescriptor REGISTRY_URL = new Builder()
+        .name("Registry URL")
+        .displayName("Registry URL")
+        .description("The URL of the NiFi Registry to retrieve the flow from")
+        .required(true)
+        .addValidator(URL_VALIDATOR)
+        .dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, SPEC_FROM_REGISTRY)
+        .build();
+
+    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new Builder()
+        .name("Registry SSL Context Service")
+        .displayName("Registry SSL Context Service")
+        .description("The SSL Context Service to use for interacting with the 
NiFi Registry")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, SPEC_FROM_REGISTRY)
+        .build();
+
+    static final PropertyDescriptor COMMS_TIMEOUT = new Builder()
+        .name("Communications Timeout")
+        .displayName("Communications Timeout")
+        .description("Specifies how long to wait before timing out when 
attempting to communicate with NiFi Registry")
+        .required(true)
+        .addValidator(TIME_PERIOD_VALIDATOR)
+        .dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, SPEC_FROM_REGISTRY)
+        .defaultValue("15 secs")
+        .build();
+
+    static final PropertyDescriptor BUCKET = new Builder()
+        .name("Registry Bucket")
+        .displayName("Registry Bucket")
+        .description("The name of the Bucket in the NiFi Registry that the 
flow should retrieved from")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, SPEC_FROM_REGISTRY)
+        .build();
+
+    static final PropertyDescriptor FLOW_NAME = new Builder()
+        .name("Flow Name")
+        .displayName("Flow Name")
+        .description("The name of the flow in the NiFi Registry")
+        .required(true)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, SPEC_FROM_REGISTRY)
+        .build();
+
+    static final PropertyDescriptor FLOW_VERSION = new Builder()
+        .name("Flow Version")
+        .displayName("Flow Version")
+        .description("The version of the flow in the NiFi Registry that should 
be retrieved. If not specified, the latest version will always be used.")
+        .required(false)
+        .addValidator(POSITIVE_INTEGER_VALIDATOR)
+        .dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, SPEC_FROM_REGISTRY)
+        .build();
+
+    static final PropertyDescriptor INPUT_PORT = new Builder()
+        .name("Input Port")
+        .displayName("Input Port")
+        .description("Specifies the name of the Input Port to send incoming 
FlowFiles to. This property is required if this processor has any incoming 
connections.")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final PropertyDescriptor FAILURE_PORTS = new Builder()
+        .name("Failure Ports")
+        .displayName("Failure Ports")
+        .description("A comma-separated list of the names of Output Ports that 
exist at the root level of the dataflow. If any FlowFile is routed to one of 
the Ports whose name is listed here, the " +
+            "dataflow will be considered a failure, and the incoming FlowFile 
(if any) will be routed to 'failure'. If not specified, all Output Ports will 
be considered successful.")
+        .required(false)
+        .addValidator(NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .build();
+
+    static final PropertyDescriptor DATAFLOW_TIMEOUT = new Builder()
+        .name("Dataflow Timeout")
+        .displayName("Dataflow Timeout")
+        .description("Specifies the maximum amount of time for an invocation 
of the stateless flow to complete. If the flow does not complete within this 
amount of time, the incoming FlowFile, if " +
+            "any, will be routed to the timeout relationship. In any case, the 
dataflow will be canceled and the invocation will end")
+        .required(true)
+        .addValidator(TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .defaultValue("60 sec")
+        .build();
+
+    static final PropertyDescriptor LIB_DIRECTORY = new Builder()
+        .name("NAR Directory")
+        .displayName("NAR Directory")
+        .description("The directory to retrieve NAR's from")
+        .required(true)
+        .addValidator(createDirectoryExistsValidator(false, false))
+        .defaultValue("./lib")
+        .build();
+
+    static final PropertyDescriptor WORKING_DIRECTORY = new Builder()
+        .name("Work Directory")
+        .displayName("Work Directory")
+        .description("A directory that can be used to create temporary files, 
such as expanding NAR files, temporary FlowFile content, caching the dataflow, 
etc.")
+        .required(true)
+        .addValidator(createDirectoryExistsValidator(false, true))
+        .defaultValue("./work")
+        .build();
+
+    static final PropertyDescriptor KRB5_CONF = new Builder()
+        .name("Krb5 Conf File")
+        .displayName("Krb5 Conf File")
+        .description("The KRB5 Conf file to use for configuring components 
that rely on Kerberos")
+        .required(false)
+        .identifiesExternalResource(ResourceCardinality.SINGLE, 
ResourceType.FILE)
+        .build();
+
+    static final PropertyDescriptor STATELESS_SSL_CONTEXT_SERVICE = new 
Builder()
+        .name("Stateless SSL Context Service")
+        .displayName("Stateless SSL Context Service")
+        .description("The SSL Context to use as the Stateless System SSL 
Context")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
+
+    static final PropertyDescriptor MAX_INGEST_FLOWFILES = new Builder()
+        .name("Max Ingest FlowFiles")
+        .displayName("Max Ingest FlowFiles")
+        .description("During the course of a stateless dataflow, some 
processors may require more data than they have available in order to proceed. 
For example, MergeContent may require a minimum " +
+            "number of FlowFiles before it can proceed. In this case, the 
dataflow may bring in additional data from its source Processor. However, this 
data may all be held in memory, so this " +
+            "property provides a mechanism for limiting the maximum number of 
FlowFiles that the source Processor can ingest before it will no longer be 
triggered to ingest additional data.")
+        .required(false)
+        .addValidator(POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .build();
+
+    static final PropertyDescriptor MAX_INGEST_DATA_SIZE = new Builder()
+        .name("Max Ingest Data Size")
+        .displayName("Max Ingest Data Size")
+        .description("During the course of a stateless dataflow, some 
processors may require more data than they have available in order to proceed. 
For example, MergeContent may require a minimum " +
+            "number of FlowFiles before it can proceed. In this case, the 
dataflow may bring in additional data from its source Processor. However, this 
data may all be held in memory, so this " +
+            "property provides a mechanism for limiting the maximum amount of 
data that the source Processor can ingest before it will no longer be triggered 
to ingest additional data.")
+        .required(false)
+        .addValidator(DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .build();
+
+    static final PropertyDescriptor CONTENT_STORAGE_STRATEGY = new Builder()
+        .name("Content Storage Strategy")
+        .displayName("Content Storage Strategy")
+        .description("Specifies where the content of FlowFiles that the 
Stateless dataflow is operating on should be stored. Note that the data is 
always considered temporal and may be deleted at " +
+            "any time. It is not intended to be persisted across restarted.")
+        .required(true)
+        .allowableValues(CONTENT_STORAGE_HEAP, CONTENT_STORAGE_DISK)
+        .defaultValue(CONTENT_STORAGE_DISK.getValue())
+        .build();
+
+    static final PropertyDescriptor MAX_INPUT_FLOWFILE_SIZE = new Builder()
+        .name("Max Input FlowFile Size")
+        .displayName("Max Input FlowFile Size")
+        .description("This Processor is configured to load all incoming 
FlowFiles into memory. Because of that, it is important to limit the maximum 
size of " +
+            "any incoming FlowFile that would get loaded into memory, in order 
to prevent Out Of Memory Errors and excessive Garbage Collection. Any FlowFile 
whose content " +
+            "size is greater than the configured size will be routed to 
failure and not sent to the Stateless Engine.")
+        .required(true)
+        .dependsOn(CONTENT_STORAGE_STRATEGY, CONTENT_STORAGE_HEAP)
+        .addValidator(DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .defaultValue("1 MB")
+        .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("For any incoming FlowFile that is successfully 
processed, the original incoming FlowFile will be transferred to this 
Relationship")
+        .autoTerminateDefault(true)
+        .build();
+    static final Relationship REL_OUTPUT = new Relationship.Builder()
+        .name("output")
+        .description("Any FlowFiles that are transferred to an Output Port in 
the configured dataflow will be routed to this Relationship")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If the dataflow fails to process an incoming FlowFile, 
that FlowFile will be routed to this relationship")
+        .build();
+    static final Relationship REL_TIMEOUT = new Relationship.Builder()
+        .name("timeout")
+        .description("If the dataflow fails to complete in the configured 
amount of time, any incoming FlowFile will be routed to this relationship")
+        .build();
+
+
+    private final BlockingQueue<StatelessDataflow> dataflows = new 
LinkedBlockingDeque<>();
+    private final AtomicInteger dataflowCreationCount = new AtomicInteger(0);
+    private volatile Set<String> failurePortNames;
+    private volatile VersionedFlowSnapshot flowSnapshot;
+    private volatile AbortableTriggerContext triggerContext;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(
+            DATAFLOW_SPECIFICATION_STRATEGY,
+            DATAFLOW_FILE,
+            REGISTRY_URL,
+            SSL_CONTEXT_SERVICE,
+            COMMS_TIMEOUT,
+            BUCKET,
+            FLOW_NAME,
+            FLOW_VERSION,
+            INPUT_PORT,
+            FAILURE_PORTS,
+            CONTENT_STORAGE_STRATEGY,
+            MAX_INPUT_FLOWFILE_SIZE,
+            DATAFLOW_TIMEOUT,
+            LIB_DIRECTORY,
+            WORKING_DIRECTORY,
+            MAX_INGEST_FLOWFILES,
+            MAX_INGEST_DATA_SIZE,
+            STATELESS_SSL_CONTEXT_SERVICE,
+            KRB5_CONF);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(Arrays.asList(REL_ORIGINAL, REL_OUTPUT, 
REL_FAILURE, REL_TIMEOUT));
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new Builder()
+            .name(propertyDescriptorName)
+            .defaultValue("Value for the " + propertyDescriptorName + " 
parameter")
+            .addValidator(Validator.VALID)
+            .sensitive(true)
+            .dynamic(true)
+            .build();
+    }
+
+
+    @OnScheduled
+    public void parseDataflow(final ProcessContext context) throws IOException 
{
+        final String specificationStrategy = 
context.getProperty(DATAFLOW_SPECIFICATION_STRATEGY).getValue();
+
+        final DataflowRetrieval rawRetrieval;
+        if (specificationStrategy.equalsIgnoreCase(SPEC_FROM_FILE.getValue())) 
{
+            rawRetrieval = new FileSystemDataflowRetrieval();
+        } else {
+            rawRetrieval = new RegistryDataflowRetrieval(getLogger());
+        }
+
+        final DataflowRetrieval cachedRetrieval = new 
CachingDataflowRetrieval(getIdentifier(), getLogger(), rawRetrieval);
+
+        final long start = System.nanoTime();
+        final VersionedFlowSnapshot versionedFlowSnapshot = 
cachedRetrieval.retrieveDataflowContents(context);
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
+
+        getLogger().info("Successfully retrieved flow in {} millis", millis);

Review comment:
       I like the idea. But when the flow is saved via NiFi's Download Flow 
Definition feature, what we get is basically an empty VersionedFlowSnapshot wrapper, 
with only the "flowContents" populated. So the name of the flow, etc., is not 
available.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to