gresockj commented on a change in pull request #4693:
URL: https://github.com/apache/nifi/pull/4693#discussion_r724054441
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -182,8 +181,8 @@ private Response runQuery(final String endpoint, final
String query, final Strin
final HttpEntity queryEntity = new NStringEntity(query,
ContentType.APPLICATION_JSON);
try {
- return client.performRequest("POST", sb.toString(),
requestParameters != null ? requestParameters : Collections.emptyMap(),
queryEntity);
- } catch (final Exception e) {
+ return performRequest("POST", sb.toString(), requestParameters,
queryEntity);
Review comment:
Consider `final`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -104,26 +103,25 @@ public void onEnabled(final ConfigurationContext context)
{
type =
context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
mapper = new ObjectMapper();
- List<PropertyDescriptor> dynamic =
context.getProperties().entrySet().stream()
- .filter( e -> e.getKey().isDynamic())
- .map(e -> e.getKey())
+ List<PropertyDescriptor> dynamic =
context.getProperties().keySet().stream()
+ .filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());
- Map<String, RecordPath> _temp = new HashMap<>();
+ Map<String, RecordPath> temp = new HashMap<>();
for (PropertyDescriptor desc : dynamic) {
String value = context.getProperty(desc).getValue();
Review comment:
We could add some `final` modifiers here while we're at it.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -176,7 +174,7 @@ public PropertyDescriptor
getSupportedDynamicPropertyDescriptor(String name) {
}
}
- private void validateCoordinates(Map coordinates) throws
LookupFailureException {
+ private void validateCoordinates(Map<String, Object> coordinates) throws
LookupFailureException {
List<String> reasons = new ArrayList<>();
Review comment:
Might as well make this `final`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class AbstractByQueryElasticsearch extends AbstractProcessor
implements ElasticsearchRestProcessor {
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("If the \"by query\" operation fails, and a flowfile was
read, it will be sent to this relationship.")
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("If the \"by query\" operation succeeds, and a flowfile
was read, it will be sent to this relationship.")
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ private volatile ElasticSearchClientService clientService;
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY);
+ descriptors.add(QUERY_ATTRIBUTE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CLIENT_SERVICE);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ abstract String tookAttribute();
Review comment:
I think we should prefer verb-based names for these methods:
`getTookAttribute` and `getErrorAttribute`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
##########
@@ -17,129 +17,48 @@
package org.apache.nifi.processors.elasticsearch;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
+@WritesAttributes({
+ @WritesAttribute(attribute = "elasticsearch.delete.took", description
= "The amount of time that it took to complete the delete operation in ms."),
+ @WritesAttribute(attribute = "elasticsearch.delete.error", description
= "The error message provided by Elasticsearch if there is an error running the
delete.")
+})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "delete", "query"})
@CapabilityDescription("Delete from an Elasticsearch index using a query. The
query can be loaded from a flowfile body " +
"or from the Query parameter.")
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
-@WritesAttributes({
- @WritesAttribute(attribute = "elasticsearch.delete.took", description =
"The amount of time that it took to complete the delete operation in ms."),
- @WritesAttribute(attribute = "elasticsearch.delete.error", description =
"The error message provided by Elasticsearch if there is an error running the
delete.")
-})
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements
ElasticsearchRestProcessor {
- public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
- .description("If the delete by query fails, and a flowfile was read,
it will be sent to this relationship.").build();
-
- public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
- .description("If the delete by query succeeds, and a flowfile was
read, it will be sent to this relationship.")
- .build();
-
-
+@DynamicProperty(
+ name = "A URL query parameter",
+ value = "The value to set it to",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ description = "Adds the specified property name/value as a query
parameter in the Elasticsearch URL used for processing")
Review comment:
I think we should mention that these parameters will override any
matching parameters in the request body, which might be provided by other
non-dynamic properties.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -254,14 +271,16 @@
return DESCRIPTORS;
}
- private RecordReaderFactory readerFactory;
- private RecordPathCache recordPathCache;
- private ElasticSearchClientService clientService;
- private RecordSetWriterFactory writerFactory;
- private boolean logErrors;
- private volatile String dateFormat;
- private volatile String timeFormat;
- private volatile String timestampFormat;
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String
propertyDescriptorName) {
Review comment:
`final`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+
+import java.util.Map;
+
+@WritesAttributes({
+ @WritesAttribute(attribute = "elasticsearch.update.took", description
= "The amount of time that it took to complete the update operation in ms."),
+ @WritesAttribute(attribute = "elasticsearch.update.error", description
= "The error message provided by Elasticsearch if there is an error running the
update.")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "update", "query"})
+@CapabilityDescription("Update documents in an Elasticsearch index using a
query. The query can be loaded from a flowfile body " +
+ "or from the Query parameter.")
+@DynamicProperty(
+ name = "A URL query parameter",
+ value = "The value to set it to",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
Review comment:
Same comments as above
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -104,26 +103,25 @@ public void onEnabled(final ConfigurationContext context)
{
type =
context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
mapper = new ObjectMapper();
- List<PropertyDescriptor> dynamic =
context.getProperties().entrySet().stream()
- .filter( e -> e.getKey().isDynamic())
- .map(e -> e.getKey())
+ List<PropertyDescriptor> dynamic =
context.getProperties().keySet().stream()
+ .filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());
- Map<String, RecordPath> _temp = new HashMap<>();
+ Map<String, RecordPath> temp = new HashMap<>();
Review comment:
Let's rename this to something more descriptive, like
`tempRecordPathMappings`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -104,26 +103,25 @@ public void onEnabled(final ConfigurationContext context)
{
type =
context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
mapper = new ObjectMapper();
- List<PropertyDescriptor> dynamic =
context.getProperties().entrySet().stream()
- .filter( e -> e.getKey().isDynamic())
- .map(e -> e.getKey())
+ List<PropertyDescriptor> dynamic =
context.getProperties().keySet().stream()
Review comment:
Let's take the opportunity to rename this to `dynamicDescriptors` or
something similar.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -359,9 +358,21 @@ public DeleteOperationResponse deleteByQuery(final String
query, final String in
return new
DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
}
+
+ public UpdateOperationResponse updateByQuery(final String query, final
String index, final String type, final Map<String, String> requestParameters) {
+ long start = System.currentTimeMillis();
+ Response response = runQuery("_update_by_query", query, index, type,
requestParameters);
+ long end = System.currentTimeMillis();
Review comment:
Though the extra spaces align the two `System.currentTimeMillis()` calls, I
think we should prefer removing them here.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -83,15 +83,14 @@
private String type;
private ObjectMapper mapper;
- private final List<PropertyDescriptor> DESCRIPTORS;
+ private final List<PropertyDescriptor> descriptors;
public ElasticSearchLookupService() {
- List<PropertyDescriptor> _desc = new ArrayList<>();
- _desc.addAll(super.getSupportedPropertyDescriptors());
- _desc.add(CLIENT_SERVICE);
- _desc.add(INDEX);
- _desc.add(TYPE);
- DESCRIPTORS = Collections.unmodifiableList(_desc);
+ List<PropertyDescriptor> desc = new
ArrayList<>(super.getSupportedPropertyDescriptors());
+ desc.add(CLIENT_SERVICE);
+ desc.add(INDEX);
+ desc.add(TYPE);
+ descriptors = Collections.unmodifiableList(desc);
}
private volatile ConcurrentHashMap<String, RecordPath> mappings;
Review comment:
Let's move this up with the other variables, and rename it to something
more descriptive like `recordPathMappings`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class AbstractByQueryElasticsearch extends AbstractProcessor
implements ElasticsearchRestProcessor {
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("If the \"by query\" operation fails, and a flowfile was
read, it will be sent to this relationship.")
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("If the \"by query\" operation succeeds, and a flowfile
was read, it will be sent to this relationship.")
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ private volatile ElasticSearchClientService clientService;
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY);
+ descriptors.add(QUERY_ATTRIBUTE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CLIENT_SERVICE);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ abstract String tookAttribute();
+
+ abstract String errorAttribute();
+
+ abstract OperationResponse performOperation(final
ElasticSearchClientService clientService, final String query,
+ final String index, final
String type,
+ final Map<String, String>
requestParameters);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String
propertyDescriptorName) {
Review comment:
Could be `final`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -290,22 +288,19 @@ private Record getByQuery(final Map<String, Object>
query, Map<String, String> c
}
private Record applyMappings(Record record, Map<String, Object> source) {
- Record _rec = new MapRecord(record.getSchema(), new HashMap<>());
+ Record rec = new MapRecord(record.getSchema(), new HashMap<>());
- mappings.entrySet().forEach(entry -> {
+ mappings.forEach((key, path) -> {
try {
- Object o = JsonPath.read(source, entry.getKey());
- RecordPath path = entry.getValue();
- Optional<FieldValue> first =
path.evaluate(_rec).getSelectedFields().findFirst();
- if (first.isPresent()) {
- first.get().updateValue(o);
- }
+ Object o = JsonPath.read(source, key);
Review comment:
Could use `final` here, etc.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
##########
@@ -17,129 +17,48 @@
package org.apache.nifi.processors.elasticsearch;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
+@WritesAttributes({
+ @WritesAttribute(attribute = "elasticsearch.delete.took", description
= "The amount of time that it took to complete the delete operation in ms."),
+ @WritesAttribute(attribute = "elasticsearch.delete.error", description
= "The error message provided by Elasticsearch if there is an error running the
delete.")
+})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "delete", "query"})
@CapabilityDescription("Delete from an Elasticsearch index using a query. The
query can be loaded from a flowfile body " +
"or from the Query parameter.")
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
-@WritesAttributes({
- @WritesAttribute(attribute = "elasticsearch.delete.took", description =
"The amount of time that it took to complete the delete operation in ms."),
- @WritesAttribute(attribute = "elasticsearch.delete.error", description =
"The error message provided by Elasticsearch if there is an error running the
delete.")
-})
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements
ElasticsearchRestProcessor {
- public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
- .description("If the delete by query fails, and a flowfile was read,
it will be sent to this relationship.").build();
-
- public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
- .description("If the delete by query succeeds, and a flowfile was
read, it will be sent to this relationship.")
- .build();
-
-
+@DynamicProperty(
+ name = "A URL query parameter",
Review comment:
Rephrasing suggestion:
```suggestion
name = "The name of a URL query parameter to add",
```
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -141,14 +141,21 @@ SearchResponse doQuery(final PaginatedJsonQueryParameters
paginatedJsonQueryPara
if (!newQuery &&
PAGINATION_SCROLL.getValue().equals(paginationType)) {
response = clientService.get().scroll(queryJson);
} else {
+ Map<String, String> requestParameters =
getUrlQueryParameters(context, input);
Review comment:
`final`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -359,9 +358,21 @@ public DeleteOperationResponse deleteByQuery(final String
query, final String in
return new
DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
}
+
+ public UpdateOperationResponse updateByQuery(final String query, final
String index, final String type, final Map<String, String> requestParameters) {
+ long start = System.currentTimeMillis();
Review comment:
These variables could be `final`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class AbstractByQueryElasticsearch extends AbstractProcessor
implements ElasticsearchRestProcessor {
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("If the \"by query\" operation fails, and a flowfile was
read, it will be sent to this relationship.")
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("If the \"by query\" operation succeeds, and a flowfile
was read, it will be sent to this relationship.")
+ .build();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ private volatile ElasticSearchClientService clientService;
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY);
+ descriptors.add(QUERY_ATTRIBUTE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(CLIENT_SERVICE);
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ abstract String tookAttribute();
+
+ abstract String errorAttribute();
+
+ abstract OperationResponse performOperation(final
ElasticSearchClientService clientService, final String query,
+ final String index, final
String type,
+ final Map<String, String>
requestParameters);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String
propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dynamic(true)
+ .build();
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ clientService =
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+ FlowFile input = null;
+ if (context.hasIncomingConnection()) {
+ input = session.get();
+
+ if (input == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ try {
+ final String query = getQuery(input, context, session);
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+ final String type = context.getProperty(TYPE).isSet()
+ ?
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue()
+ : null;
+ final String queryAttr =
context.getProperty(QUERY_ATTRIBUTE).isSet()
+ ?
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+ : null;
+
+ final OperationResponse or = performOperation(this.clientService,
query, index, type, getUrlQueryParameters(context, input));
+
+ if (input == null) {
+ input = session.create();
+ }
+
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put(tookAttribute(), String.valueOf(or.getTook()));
+ if (!StringUtils.isBlank(queryAttr)) {
+ attrs.put(queryAttr, query);
+ }
+
+ input = session.putAllAttributes(input, attrs);
+
+ session.transfer(input, REL_SUCCESS);
+ } catch (Exception e) {
Review comment:
`final`
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
##########
@@ -17,129 +17,48 @@
package org.apache.nifi.processors.elasticsearch;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
+@WritesAttributes({
+ @WritesAttribute(attribute = "elasticsearch.delete.took", description
= "The amount of time that it took to complete the delete operation in ms."),
+ @WritesAttribute(attribute = "elasticsearch.delete.error", description
= "The error message provided by Elasticsearch if there is an error running the
delete.")
+})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "delete", "query"})
@CapabilityDescription("Delete from an Elasticsearch index using a query. The
query can be loaded from a flowfile body " +
"or from the Query parameter.")
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
-@WritesAttributes({
- @WritesAttribute(attribute = "elasticsearch.delete.took", description =
"The amount of time that it took to complete the delete operation in ms."),
- @WritesAttribute(attribute = "elasticsearch.delete.error", description =
"The error message provided by Elasticsearch if there is an error running the
delete.")
-})
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements
ElasticsearchRestProcessor {
- public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
- .description("If the delete by query fails, and a flowfile was read,
it will be sent to this relationship.").build();
-
- public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
- .description("If the delete by query succeeds, and a flowfile was
read, it will be sent to this relationship.")
- .build();
-
-
+@DynamicProperty(
+ name = "A URL query parameter",
+ value = "The value to set it to",
Review comment:
Rephrasing suggestion:
```suggestion
value = "The value of the URL query parameter",
```
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
##########
@@ -46,11 +48,15 @@
"Elasticsearch JSON DSL. It does not automatically paginate queries
for the user. If an incoming relationship is added to this " +
"processor, it will use the flowfile's content for the query. Care
should be taken on the size of the query because the entire response " +
"from Elasticsearch will be loaded into memory all at once and
converted into the resulting flowfiles.")
+@DynamicProperty(
+ name = "A URL query parameter",
+ value = "The value to set it to",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ description = "Adds the specified property name/value as a query
parameter in the Elasticsearch URL used for processing")
Review comment:
Same comments as above
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
##########
@@ -48,6 +50,11 @@
@CapabilityDescription("A processor that allows the user to run a paginated
query (with aggregations) written with the Elasticsearch JSON DSL. " +
"It will use the flowfile's content for the query unless the QUERY
attribute is populated. " +
"Search After/Point in Time queries must include a valid \"sort\"
field.")
+@DynamicProperty(
+ name = "A URL query parameter",
+ value = "The value to set it to",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ description = "Adds the specified property name/value as a query
parameter in the Elasticsearch URL used for processing")
Review comment:
Same comments as above
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.groovy
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.junit.Assert
+import org.junit.Test
+
+abstract class AbstractByQueryElasticsearchTest {
Review comment:
This looks like it could be easily converted to a Java test. I know
opinions are split on Groovy vs Java tests, but I'm going to recommend
converting this one.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -414,8 +434,8 @@ private void removeBadRecordFlowFiles(List<FlowFile> bad,
ProcessSession session
bad.clear();
}
- private FlowFile indexDocuments(BulkOperation bundle, ProcessSession
session, FlowFile input) throws Exception {
- IndexOperationResponse response =
clientService.bulk(bundle.getOperationList());
+ private FlowFile indexDocuments(BulkOperation bundle, ProcessContext
context, ProcessSession session, FlowFile input) throws Exception {
+ IndexOperationResponse response =
clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context,
input));
Review comment:
Consider adding `final` to the method parameters and to the `response` local variable.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -72,6 +74,11 @@
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put",
"index", "record"})
@CapabilityDescription("A record-aware Elasticsearch put processor that uses
the official Elastic REST client libraries.")
+@DynamicProperty(
+ name = "A URL query parameter",
+ value = "The value to set it to",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ description = "Adds the specified property name/value as a query
parameter in the Elasticsearch URL used for processing")
Review comment:
Same comments as above
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
##########
@@ -65,6 +66,11 @@
"Search After/Point in Time queries must include a valid \"sort\"
field. The processor will retrieve multiple pages of results " +
"until either no more results are available or the Pagination Keep
Alive expiration is reached, after which the query will " +
"restart with the first page of results being retrieved.")
+@DynamicProperty(
+ name = "A URL query parameter",
+ value = "The value to set it to",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ description = "Adds the specified property name/value as a query
parameter in the Elasticsearch URL used for processing")
Review comment:
Same comments as above
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -182,8 +181,8 @@ private Response runQuery(final String endpoint, final
String query, final Strin
final HttpEntity queryEntity = new NStringEntity(query,
ContentType.APPLICATION_JSON);
try {
- return client.performRequest("POST", sb.toString(),
requestParameters != null ? requestParameters : Collections.emptyMap(),
queryEntity);
- } catch (final Exception e) {
+ return performRequest("POST", sb.toString(), requestParameters,
queryEntity);
Review comment:
Sorry, it looks like I clicked the wrong line. I meant the `Exception` in the
catch clause, which used to be `final` but no longer is with this PR.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.groovy
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.junit.Assert
+import org.junit.Test
+
+abstract class AbstractByQueryElasticsearchTest {
Review comment:
Fair enough, sounds good.
##########
File path:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -426,22 +429,21 @@ public void onTrigger(ProcessContext context,
ProcessSession session) {
session.transfer(input, REL_SUCCESS);
}
- private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession
session) {
- for (FlowFile badFlowFile : bad) {
+ private void removeBadRecordFlowFiles(final List<FlowFile> bad, final
ProcessSession session) {
+ for (final FlowFile badFlowFile : bad) {
session.remove(badFlowFile);
}
bad.clear();
}
- private FlowFile indexDocuments(BulkOperation bundle, ProcessContext
context, ProcessSession session, FlowFile input) throws Exception {
- IndexOperationResponse response =
clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context,
input));
+ private FlowFile indexDocuments(final BulkOperation bundle, final
ProcessContext context, final ProcessSession session, final FlowFile input)
throws Exception {
+ final IndexOperationResponse response =
clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context,
input));
if (response.hasErrors()) {
if(logErrors || getLogger().isDebugEnabled()) {
- List<Map<String, Object>> errors = response.getItems();
- ObjectMapper mapper = new ObjectMapper();
- mapper.enable(SerializationFeature.INDENT_OUTPUT);
- String output = String.format("An error was encountered while
processing bulk operations. Server response below:%n%n%s",
mapper.writeValueAsString(errors));
+ final List<Map<String, Object>> errors = response.getItems();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
Review comment:
What do you think about putting the `enable()` call in the constructor?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org