gresockj commented on a change in pull request #5535:
URL: https://github.com/apache/nifi/pull/5535#discussion_r753162112



##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.hamcrest.MatcherAssert
+import org.junit.Assert
+import org.junit.Test
+
+import static groovy.json.JsonOutput.toJson
+import static org.hamcrest.CoreMatchers.equalTo
+import static org.hamcrest.CoreMatchers.is
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.Assert.assertThrows
+
+class GetElasticsearchTest {

Review comment:
       I'm going to include my obligatory suggestion to rewrite this as a Java 
test, but won't hold up the code review for it.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchError;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official 
Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory 
before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename 
attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.index", description = "The 
Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The 
Elasticsearch document type")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified 
by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written 
to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name 
property, if the property name is ignored.")

Review comment:
       What does the last clause mean: "if the property name is ignored"?  
Also, I wonder whether this additional explanation is needed at all, since you 
already specify `dependsOn` below.  Using the processor in NiFi, I was able to 
intuitively understand what was required based on the dependency.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -168,6 +186,21 @@ private void setupClient(final ConfigurationContext 
context) throws MalformedURL
         this.client = builder.build();
     }
 
+    private CredentialsProvider addCredentials(CredentialsProvider 
credentialsProvider, final AuthScope authScope, final String username, final 
String password) {

Review comment:
       Missed a `final` here

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -138,12 +140,21 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             input = session.putAllAttributes(input, attrs);
 
             session.transfer(input, REL_SUCCESS);
+        } catch (final ElasticsearchException ese) {
+            final String msg = String.format("Encountered a server-side 
problem with Elasticsearch. %s",
+                    ese.isElastic() ? "Moving to retry." : "Moving to 
failure");

Review comment:
       Perhaps "routing" instead of moving?

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official 
Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory 
before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename 
attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The 
Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The 
Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = 
"The error message provided by Elasticsearch if there is an error fetching the 
document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified 
by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written 
to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name 
property, if the property name is ignored.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the 
retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new 
Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new 
Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the 
specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+
+        final String id = 
context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = 
context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, 
getUrlQueryParameters(context, input));
+
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile docFF = input != null ? input : session.create();
+            if (FLOWFILE_CONTENT.getValue().equals(destination)) {
+                docFF = session.write(docFF, out -> 
out.write(json.getBytes()));
+            } else {
+                attributes.put(attributeName, json);
+            }
+
+            docFF = session.putAllAttributes(docFF, attributes);
+            session.getProvenanceReporter().receive(docFF, 
clientService.getTransitUrl(index, type), 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(docFF, REL_DOC);
+        } catch (final ElasticsearchException ese) {
+            if (ese.isNotFound()) {
+                if (input != null) {
+                    session.transfer(input, REL_NOT_FOUND);

Review comment:
       I'd suggest logging a warning here when input is null; otherwise the 
processor just appears to do nothing.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official 
Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory 
before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename 
attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The 
Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The 
Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = 
"The error message provided by Elasticsearch if there is an error fetching the 
document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {

Review comment:
       I'm going to recommend implementing `VerifiableProcessor`, though it 
would also be fine to create a separate JIRA to add this functionality to all 
the Elasticsearch processors.  See `ListS3` for a concrete example of what to 
verify and how the output should look.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "put", "index", "record"})
+@CapabilityDescription("Elasticsearch get processor that uses the official 
Elastic REST client libraries. " +
+        "Note that the full body of the document will be read into memory 
before being written to a FlowFile for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename 
attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "elasticsearch.index", description = "The 
Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "elasticsearch.type", description = "The 
Elasticsearch document type"),
+        @WritesAttribute(attribute = "elasticsearch.get.error", description = 
"The error message provided by Elasticsearch if there is an error fetching the 
document.")
+})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing.")
+public class GetElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {
+    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
+            "flowfile-content",
+            "FlowFile Content",
+            "Output the retrieved document as the FlowFile content."
+    );
+
+    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
+            "flowfile-attribute",
+            "FlowFile Attribute",
+            "Output the retrieved document as a FlowFile attribute specified 
by the Attribute Name."
+    );
+
+    static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
+            .name("get-es-id")
+            .displayName("Document Id")
+            .description("The _id of the document to retrieve.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
+            .name("get-es-destination")
+            .displayName("Destination")
+            .description("Indicates whether the retrieved document is written 
to the FlowFile content or a FlowFile attribute; " +
+                    "if using attribute, must specify the Attribute Name 
property, if the property name is ignored.")
+            .required(true)
+            .allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
+            .defaultValue(FLOWFILE_CONTENT.getValue())
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+            .name("get-es-attribute-name")
+            .displayName("Attribute Name")
+            .description("The name of the FlowFile attribute to use for the 
retrieved document output.")
+            .required(true)
+            .defaultValue("elasticsearch.doc")
+            .dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_DOC = new 
Relationship.Builder().name("document")
+            .description("Fetched documents are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new 
Relationship.Builder().name("not_found")
+            .description("A FlowFile is routed to this relationship if the 
specified document does not exist in the Elasticsearch cluster.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
+    ));
+    static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
+    )));
+
+    private volatile ElasticSearchClientService clientService;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+
+        final String id = 
context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
+        final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+        final String type  = 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
+
+        final String destination = context.getProperty(DESTINATION).getValue();
+        final String attributeName = 
context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            final Map<String, Object> doc = clientService.get(index, type, id, 
getUrlQueryParameters(context, input));
+
+            final Map<String, String> attributes = new HashMap<>(4, 1);
+            attributes.put("filename", id);
+            attributes.put("elasticsearch.index", index);
+            if (type != null) {
+                attributes.put("elasticsearch.type", type);
+            }
+            final String json = mapper.writeValueAsString(doc);
+            FlowFile docFF = input != null ? input : session.create();

Review comment:
       Let's prefer spelling it out -- `documentFlowFile` or even `docFlowFile`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to