turcsanyip commented on code in PR #10031: URL: https://github.com/apache/nifi/pull/10031#discussion_r2284774803
########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services-nar/src/main/resources/META-INF/NOTICE: ########## @@ -0,0 +1,39 @@ +nifi-couchbase-services-nar +Copyright 2014-2023 The Apache Software Foundation Review Comment: Typo / copy-paste error: ```suggestion Copyright 2014-2025 The Apache Software Foundation ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services/src/main/java/org/apache/nifi/services/couchbase/Couchbase3ConnectionService.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.couchbase; + +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplicateTo; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.couchbase.utils.CouchbaseContext; + +import java.util.List; + +@CapabilityDescription("Provides a Couchbase SDK 3 based implementation.") +@Tags({"nosql", "couchbase", "database", "connection"}) +public class Couchbase3ConnectionService extends AbstractControllerService implements CouchbaseConnectionService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor.Builder() + .name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN") Review Comment: Typo: ```suggestion + " Syntax: couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN") ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.couchbase.CouchbaseClient; +import org.apache.nifi.services.couchbase.utils.CouchbaseContext; +import org.apache.nifi.services.couchbase.utils.DocumentType; +import org.apache.nifi.services.couchbase.CouchbaseConnectionService; +import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.stream.io.StreamUtils; + +import java.util.List; +import java.util.Set; + +public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { + + public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder() + .name("Document Id") + .description("Couchbase document id, or an expression to construct the Couchbase document id.") + 
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Couchbase Connection Service") + .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseConnectionService.class) + .build(); + + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder() + .name("Bucket Name") + .description("The name of bucket.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SCOPE_NAME = new PropertyDescriptor.Builder() + .name("Scope Name") + .description("The name of scope.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("_default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() + .name("Collection Name") + .description("The name of collection.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("_default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder() + .name("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the data ingestion was successful.") + .build(); 
+ + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") + .build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("All FlowFile that fail due to server/cluster availability go to this relationship.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of( + DOCUMENT_ID, + BUCKET_NAME, + SCOPE_NAME, + COLLECTION_NAME, + DOCUMENT_TYPE, + COUCHBASE_CONNECTION_SERVICE + ); + + public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + protected CouchbaseConnectionService connectionService; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class); + } + + protected byte[] readFlowFileContent(ProcessSession session, FlowFile flowFile) { + final byte[] content = new byte[(int) flowFile.getSize()]; + session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); + + return content; + } + + protected String createTransitUrl(CouchbaseContext couchbaseContext, String documentId) { + return String.join(".", couchbaseContext.bucket(), couchbaseContext.scope(), couchbaseContext.collection(), documentId); Review Comment: The URI (or URI-like identifier) of the external system (e.g. database host, Kafka brokers) is usually included in the transit URI, so I think it should be prefixed with the Couchbase connection URL (without the parameters). 
########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.couchbase.CouchbaseClient; +import org.apache.nifi.services.couchbase.utils.CouchbaseContext; +import org.apache.nifi.services.couchbase.utils.DocumentType; +import org.apache.nifi.services.couchbase.CouchbaseConnectionService; +import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.stream.io.StreamUtils; + +import java.util.List; +import java.util.Set; + +public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { + + public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder() + .name("Document Id") + .description("Couchbase document id, or an expression to construct the Couchbase document id.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Couchbase Connection Service") + .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseConnectionService.class) + .build(); + + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder() + .name("Bucket Name") + .description("The name of 
bucket.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SCOPE_NAME = new PropertyDescriptor.Builder() + .name("Scope Name") + .description("The name of scope.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("_default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() + .name("Collection Name") + .description("The name of collection.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("_default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder() + .name("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the data ingestion was successful.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") + .build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("All FlowFile that fail due to server/cluster availability go to this relationship.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of( + DOCUMENT_ID, + BUCKET_NAME, + SCOPE_NAME, + COLLECTION_NAME, + 
DOCUMENT_TYPE, + COUCHBASE_CONNECTION_SERVICE Review Comment: I suggest the following property order: ```suggestion COUCHBASE_CONNECTION_SERVICE, BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, DOCUMENT_TYPE, DOCUMENT_ID ``` Connection service properties are usually located at the top, followed by the bucket-scope-collection hierarchy, with the document descriptors at the end. ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services/src/main/java/org/apache/nifi/services/couchbase/Couchbase3Client.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.couchbase; + +import com.couchbase.client.core.error.AuthenticationFailureException; +import com.couchbase.client.core.error.BucketNotFoundException; +import com.couchbase.client.core.error.CasMismatchException; +import com.couchbase.client.core.error.CollectionNotFoundException; +import com.couchbase.client.core.error.ConfigException; +import com.couchbase.client.core.error.DatasetNotFoundException; +import com.couchbase.client.core.error.DecodingFailureException; +import com.couchbase.client.core.error.DocumentExistsException; +import com.couchbase.client.core.error.DocumentLockedException; +import com.couchbase.client.core.error.DocumentMutationLostException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.core.error.DocumentNotLockedException; +import com.couchbase.client.core.error.DocumentUnretrievableException; +import com.couchbase.client.core.error.DurableWriteInProgressException; +import com.couchbase.client.core.error.DurableWriteReCommitInProgressException; +import com.couchbase.client.core.error.FeatureNotAvailableException; +import com.couchbase.client.core.error.InvalidArgumentException; +import com.couchbase.client.core.error.RequestCanceledException; +import com.couchbase.client.core.error.ScopeNotFoundException; +import com.couchbase.client.core.error.ServerOutOfMemoryException; +import com.couchbase.client.core.error.ServiceNotAvailableException; +import com.couchbase.client.core.error.TemporaryFailureException; +import com.couchbase.client.core.error.ValueTooLargeException; +import com.couchbase.client.core.error.subdoc.PathExistsException; +import com.couchbase.client.core.error.subdoc.PathNotFoundException; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.codec.RawBinaryTranscoder; +import com.couchbase.client.java.codec.RawJsonTranscoder; +import com.couchbase.client.java.codec.Transcoder; +import 
com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.DocumentType; +import org.apache.nifi.services.couchbase.utils.JsonValidator; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + +import static java.util.Map.entry; +import static org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.FAILURE; +import static org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.RETRY; +import static org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.ROLLBACK; + +public class Couchbase3Client implements CouchbaseClient { + + private final Collection collection; + private final DocumentType documentType; + private final PersistTo persistTo; + private final ReplicateTo replicateTo; + + public Couchbase3Client(Collection collection, DocumentType documentType, PersistTo persistTo, ReplicateTo replicateTo) { + this.collection = collection; + this.documentType = documentType; + this.persistTo = persistTo; + this.replicateTo = replicateTo; + } + + @Override + public CouchbaseGetResult getDocument(String documentId) throws CouchbaseException { + try { + final GetResult result = collection.get(documentId, GetOptions.getOptions().transcoder(getTranscoder(documentType))); + + return new CouchbaseGetResult(result.contentAsBytes(), result.cas()); + } catch (Exception e) { + throw new CouchbaseException(e); + } + } + + @Override + public long 
upsertDocument(String documentId, byte[] content) throws CouchbaseException { Review Comment: Though it is a single value (at the moment), I would consider `CouchbaseUpsertResult`. ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services/src/main/java/org/apache/nifi/services/couchbase/Couchbase3Client.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.couchbase; + +import com.couchbase.client.core.error.AuthenticationFailureException; +import com.couchbase.client.core.error.BucketNotFoundException; +import com.couchbase.client.core.error.CasMismatchException; +import com.couchbase.client.core.error.CollectionNotFoundException; +import com.couchbase.client.core.error.ConfigException; +import com.couchbase.client.core.error.DatasetNotFoundException; +import com.couchbase.client.core.error.DecodingFailureException; +import com.couchbase.client.core.error.DocumentExistsException; +import com.couchbase.client.core.error.DocumentLockedException; +import com.couchbase.client.core.error.DocumentMutationLostException; +import com.couchbase.client.core.error.DocumentNotFoundException; +import com.couchbase.client.core.error.DocumentNotLockedException; +import com.couchbase.client.core.error.DocumentUnretrievableException; +import com.couchbase.client.core.error.DurableWriteInProgressException; +import com.couchbase.client.core.error.DurableWriteReCommitInProgressException; +import com.couchbase.client.core.error.FeatureNotAvailableException; +import com.couchbase.client.core.error.InvalidArgumentException; +import com.couchbase.client.core.error.RequestCanceledException; +import com.couchbase.client.core.error.ScopeNotFoundException; +import com.couchbase.client.core.error.ServerOutOfMemoryException; +import com.couchbase.client.core.error.ServiceNotAvailableException; +import com.couchbase.client.core.error.TemporaryFailureException; +import com.couchbase.client.core.error.ValueTooLargeException; +import com.couchbase.client.core.error.subdoc.PathExistsException; +import com.couchbase.client.core.error.subdoc.PathNotFoundException; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.codec.RawBinaryTranscoder; +import com.couchbase.client.java.codec.RawJsonTranscoder; +import com.couchbase.client.java.codec.Transcoder; +import 
com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.DocumentType; +import org.apache.nifi.services.couchbase.utils.JsonValidator; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + +import static java.util.Map.entry; +import static org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.FAILURE; +import static org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.RETRY; +import static org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.ROLLBACK; + +public class Couchbase3Client implements CouchbaseClient { + + private final Collection collection; + private final DocumentType documentType; + private final PersistTo persistTo; + private final ReplicateTo replicateTo; + + public Couchbase3Client(Collection collection, DocumentType documentType, PersistTo persistTo, ReplicateTo replicateTo) { Review Comment: As the client implementation is not intended to be used directly by the processors, the class and its constructor should be package-private. 
```suggestion class Couchbase3Client implements CouchbaseClient { private final Collection collection; private final DocumentType documentType; private final PersistTo persistTo; private final ReplicateTo replicateTo; Couchbase3Client(Collection collection, DocumentType documentType, PersistTo persistTo, ReplicateTo replicateTo) { ``` ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbase.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.services.couchbase.CouchbaseClient; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.services.couchbase.utils.CouchbaseContext; +import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.DocumentType; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.util.Map.entry; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.BUCKET_ATTRIBUTE; +import static org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.CAS_ATTRIBUTE; +import static org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.COLLECTION_ATTRIBUTE; +import static org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.DOCUMENT_ID_ATTRIBUTE; +import static org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.SCOPE_ATTRIBUTE; + +@Tags({"nosql", "couchbase", "database", "get"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Get a document from Couchbase Server. 
The ID of the document to fetch may be supplied by setting " + + "the <Document Id> property or reading it from the FlowFile content.") +@WritesAttributes({ + @WritesAttribute(attribute = BUCKET_ATTRIBUTE, description = "Bucket where the document was stored."), + @WritesAttribute(attribute = SCOPE_ATTRIBUTE, description = "Scope where the document was stored."), + @WritesAttribute(attribute = COLLECTION_ATTRIBUTE, description = "Collection where the document was stored."), + @WritesAttribute(attribute = DOCUMENT_ID_ATTRIBUTE, description = "Id of the document."), + @WritesAttribute(attribute = CAS_ATTRIBUTE, description = "CAS of the document.") +}) +public class GetCouchbase extends AbstractCouchbaseProcessor { + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + final String documentId = context.getProperty(DOCUMENT_ID).isSet() + ? context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue() + : new String(readFlowFileContent(session, flowFile), StandardCharsets.UTF_8); + + if (isEmpty(documentId)) { + throw new ProcessException("Document ID is missing. 
Please provide a valid Document ID through processor property or FlowFile content."); + } + + final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + + final CouchbaseContext couchbaseContext = new CouchbaseContext(bucketName, scopeName, collectionName, documentType); + final CouchbaseClient couchbaseClient = connectionService.getClient(couchbaseContext); Review Comment: This section is the same as in the Put processor and may be extracted to the abstract class. ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/utils/CouchbaseAttributes.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.couchbase.utils; + +public class CouchbaseAttributes { + + public static final String BUCKET_ATTRIBUTE = "couchbase.bucket"; Review Comment: I suggest extracting the descriptions into constants too (as they are the same for Put and Get). Similar to [GoogleDriveAttributes](https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java). ########## nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.couchbase.CouchbaseClient; +import org.apache.nifi.services.couchbase.utils.CouchbaseContext; +import org.apache.nifi.services.couchbase.utils.DocumentType; +import org.apache.nifi.services.couchbase.CouchbaseConnectionService; +import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.stream.io.StreamUtils; + +import java.util.List; +import java.util.Set; + +public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { + + public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder() + .name("Document Id") + .description("Couchbase document id, or an expression to construct the Couchbase document id.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Couchbase Connection Service") + .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseConnectionService.class) + .build(); + + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder() + .name("Bucket Name") + .description("The name of 
bucket.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor SCOPE_NAME = new PropertyDescriptor.Builder() + .name("Scope Name") + .description("The name of scope.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("_default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() + .name("Collection Name") + .description("The name of collection.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("_default") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder() + .name("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the data ingestion was successful.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") + .build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("All FlowFile that fail due to server/cluster availability go to this relationship.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of( + DOCUMENT_ID, + BUCKET_NAME, + SCOPE_NAME, + COLLECTION_NAME, + 
DOCUMENT_TYPE, + COUCHBASE_CONNECTION_SERVICE + ); + + public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + protected CouchbaseConnectionService connectionService; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class); + } + + protected byte[] readFlowFileContent(ProcessSession session, FlowFile flowFile) { + final byte[] content = new byte[(int) flowFile.getSize()]; + session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); + + return content; + } + + protected String createTransitUrl(CouchbaseContext couchbaseContext, String documentId) { Review Comment: ```suggestion protected String createTransitUri(CouchbaseContext couchbaseContext, String documentId) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
