turcsanyip commented on code in PR #10031:
URL: https://github.com/apache/nifi/pull/10031#discussion_r2284774803


##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,39 @@
+nifi-couchbase-services-nar
+Copyright 2014-2023 The Apache Software Foundation

Review Comment:
   Typo / copy-paste error:
   ```suggestion
   Copyright 2014-2025 The Apache Software Foundation
   ```



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services/src/main/java/org/apache/nifi/services/couchbase/Couchbase3ConnectionService.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.Collection;
+import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.ReplicateTo;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+
+import java.util.List;
+
+@CapabilityDescription("Provides a Couchbase SDK 3 based implementation.")
+@Tags({"nosql", "couchbase", "database", "connection"})
+public class Couchbase3ConnectionService extends AbstractControllerService 
implements CouchbaseConnectionService {
+
+    public static final PropertyDescriptor CONNECTION_STRING = new 
PropertyDescriptor.Builder()
+            .name("Connection String")
+            .description("The hostnames or ip addresses of the bootstraping 
nodes and optional parameters."
+                    + " Syntax) 
couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")

Review Comment:
   Typo:
   ```suggestion
                       + " Syntax: 
couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")
   ```



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
+import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor DOCUMENT_ID = new 
PropertyDescriptor.Builder()
+            .name("Document Id")
+            .description("Couchbase document id, or an expression to construct 
the Couchbase document id.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Couchbase Connection Service")
+            .description("A Couchbase Connection Service which manages 
connections to a Couchbase cluster.")
+            .required(true)
+            .identifiesControllerService(CouchbaseConnectionService.class)
+            .build();
+
+    public static final PropertyDescriptor BUCKET_NAME = new 
PropertyDescriptor.Builder()
+            .name("Bucket Name")
+            .description("The name of bucket.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor SCOPE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Scope Name")
+            .description("The name of scope.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("_default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor COLLECTION_NAME = new 
PropertyDescriptor.Builder()
+            .name("Collection Name")
+            .description("The name of collection.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("_default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Document Type")
+            .description("The type of contents.")
+            .required(true)
+            .allowableValues(DocumentType.values())
+            .defaultValue(DocumentType.Json.toString())
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
operation failed and retrying the operation will also fail, such as an invalid 
data or schema.")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("All FlowFile that fail due to server/cluster 
availability go to this relationship.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            DOCUMENT_ID,
+            BUCKET_NAME,
+            SCOPE_NAME,
+            COLLECTION_NAME,
+            DOCUMENT_TYPE,
+            COUCHBASE_CONNECTION_SERVICE
+    );
+
+    public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, 
REL_FAILURE, REL_RETRY);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    protected CouchbaseConnectionService connectionService;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        connectionService = 
context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
+    }
+
+    protected byte[] readFlowFileContent(ProcessSession session, FlowFile 
flowFile) {
+        final byte[] content = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, 
true));
+
+        return content;
+    }
+
+    protected String createTransitUrl(CouchbaseContext couchbaseContext, 
String documentId) {
+        return String.join(".", couchbaseContext.bucket(), 
couchbaseContext.scope(), couchbaseContext.collection(), documentId);

Review Comment:
   The URI (or URI-like identifier) of the external system (e.g. database host, 
Kafka brokers) is usually included in the Transit URI, so I think it should be 
prefixed with the Couchbase connection URL (without the parameters).



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
+import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor DOCUMENT_ID = new 
PropertyDescriptor.Builder()
+            .name("Document Id")
+            .description("Couchbase document id, or an expression to construct 
the Couchbase document id.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Couchbase Connection Service")
+            .description("A Couchbase Connection Service which manages 
connections to a Couchbase cluster.")
+            .required(true)
+            .identifiesControllerService(CouchbaseConnectionService.class)
+            .build();
+
+    public static final PropertyDescriptor BUCKET_NAME = new 
PropertyDescriptor.Builder()
+            .name("Bucket Name")
+            .description("The name of bucket.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor SCOPE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Scope Name")
+            .description("The name of scope.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("_default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor COLLECTION_NAME = new 
PropertyDescriptor.Builder()
+            .name("Collection Name")
+            .description("The name of collection.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("_default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Document Type")
+            .description("The type of contents.")
+            .required(true)
+            .allowableValues(DocumentType.values())
+            .defaultValue(DocumentType.Json.toString())
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
operation failed and retrying the operation will also fail, such as an invalid 
data or schema.")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("All FlowFile that fail due to server/cluster 
availability go to this relationship.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            DOCUMENT_ID,
+            BUCKET_NAME,
+            SCOPE_NAME,
+            COLLECTION_NAME,
+            DOCUMENT_TYPE,
+            COUCHBASE_CONNECTION_SERVICE

Review Comment:
   I suggest the following property order:
   ```suggestion
               COUCHBASE_CONNECTION_SERVICE,
               BUCKET_NAME,
               SCOPE_NAME,
               COLLECTION_NAME,
               DOCUMENT_TYPE,
               DOCUMENT_ID
   ```
   Connection service properties are usually located at the top, followed by the 
bucket-scope-collection hierarchy, with the document descriptors at the end.



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services/src/main/java/org/apache/nifi/services/couchbase/Couchbase3Client.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import com.couchbase.client.core.error.AuthenticationFailureException;
+import com.couchbase.client.core.error.BucketNotFoundException;
+import com.couchbase.client.core.error.CasMismatchException;
+import com.couchbase.client.core.error.CollectionNotFoundException;
+import com.couchbase.client.core.error.ConfigException;
+import com.couchbase.client.core.error.DatasetNotFoundException;
+import com.couchbase.client.core.error.DecodingFailureException;
+import com.couchbase.client.core.error.DocumentExistsException;
+import com.couchbase.client.core.error.DocumentLockedException;
+import com.couchbase.client.core.error.DocumentMutationLostException;
+import com.couchbase.client.core.error.DocumentNotFoundException;
+import com.couchbase.client.core.error.DocumentNotLockedException;
+import com.couchbase.client.core.error.DocumentUnretrievableException;
+import com.couchbase.client.core.error.DurableWriteInProgressException;
+import com.couchbase.client.core.error.DurableWriteReCommitInProgressException;
+import com.couchbase.client.core.error.FeatureNotAvailableException;
+import com.couchbase.client.core.error.InvalidArgumentException;
+import com.couchbase.client.core.error.RequestCanceledException;
+import com.couchbase.client.core.error.ScopeNotFoundException;
+import com.couchbase.client.core.error.ServerOutOfMemoryException;
+import com.couchbase.client.core.error.ServiceNotAvailableException;
+import com.couchbase.client.core.error.TemporaryFailureException;
+import com.couchbase.client.core.error.ValueTooLargeException;
+import com.couchbase.client.core.error.subdoc.PathExistsException;
+import com.couchbase.client.core.error.subdoc.PathNotFoundException;
+import com.couchbase.client.java.Collection;
+import com.couchbase.client.java.codec.RawBinaryTranscoder;
+import com.couchbase.client.java.codec.RawJsonTranscoder;
+import com.couchbase.client.java.codec.Transcoder;
+import com.couchbase.client.java.kv.GetOptions;
+import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.MutationResult;
+import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.ReplicateTo;
+import com.couchbase.client.java.kv.UpsertOptions;
+import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+import org.apache.nifi.services.couchbase.utils.JsonValidator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static java.util.Map.entry;
+import static 
org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.FAILURE;
+import static 
org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.RETRY;
+import static 
org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.ROLLBACK;
+
+public class Couchbase3Client implements CouchbaseClient {
+
+    private final Collection collection;
+    private final DocumentType documentType;
+    private final PersistTo persistTo;
+    private final ReplicateTo replicateTo;
+
+    public Couchbase3Client(Collection collection, DocumentType documentType, 
PersistTo persistTo, ReplicateTo replicateTo) {
+        this.collection = collection;
+        this.documentType = documentType;
+        this.persistTo = persistTo;
+        this.replicateTo = replicateTo;
+    }
+
+    @Override
+    public CouchbaseGetResult getDocument(String documentId) throws 
CouchbaseException {
+        try {
+            final GetResult result = collection.get(documentId, 
GetOptions.getOptions().transcoder(getTranscoder(documentType)));
+
+            return new CouchbaseGetResult(result.contentAsBytes(), 
result.cas());
+        } catch (Exception e) {
+            throw new CouchbaseException(e);
+        }
+    }
+
+    @Override
+    public long upsertDocument(String documentId, byte[] content) throws 
CouchbaseException {

Review Comment:
   Though it is a single value (at the moment), I would consider returning a 
`CouchbaseUpsertResult` instead of a bare `long`.



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-3-services/src/main/java/org/apache/nifi/services/couchbase/Couchbase3Client.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import com.couchbase.client.core.error.AuthenticationFailureException;
+import com.couchbase.client.core.error.BucketNotFoundException;
+import com.couchbase.client.core.error.CasMismatchException;
+import com.couchbase.client.core.error.CollectionNotFoundException;
+import com.couchbase.client.core.error.ConfigException;
+import com.couchbase.client.core.error.DatasetNotFoundException;
+import com.couchbase.client.core.error.DecodingFailureException;
+import com.couchbase.client.core.error.DocumentExistsException;
+import com.couchbase.client.core.error.DocumentLockedException;
+import com.couchbase.client.core.error.DocumentMutationLostException;
+import com.couchbase.client.core.error.DocumentNotFoundException;
+import com.couchbase.client.core.error.DocumentNotLockedException;
+import com.couchbase.client.core.error.DocumentUnretrievableException;
+import com.couchbase.client.core.error.DurableWriteInProgressException;
+import com.couchbase.client.core.error.DurableWriteReCommitInProgressException;
+import com.couchbase.client.core.error.FeatureNotAvailableException;
+import com.couchbase.client.core.error.InvalidArgumentException;
+import com.couchbase.client.core.error.RequestCanceledException;
+import com.couchbase.client.core.error.ScopeNotFoundException;
+import com.couchbase.client.core.error.ServerOutOfMemoryException;
+import com.couchbase.client.core.error.ServiceNotAvailableException;
+import com.couchbase.client.core.error.TemporaryFailureException;
+import com.couchbase.client.core.error.ValueTooLargeException;
+import com.couchbase.client.core.error.subdoc.PathExistsException;
+import com.couchbase.client.core.error.subdoc.PathNotFoundException;
+import com.couchbase.client.java.Collection;
+import com.couchbase.client.java.codec.RawBinaryTranscoder;
+import com.couchbase.client.java.codec.RawJsonTranscoder;
+import com.couchbase.client.java.codec.Transcoder;
+import com.couchbase.client.java.kv.GetOptions;
+import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.MutationResult;
+import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.ReplicateTo;
+import com.couchbase.client.java.kv.UpsertOptions;
+import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+import org.apache.nifi.services.couchbase.utils.JsonValidator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static java.util.Map.entry;
+import static 
org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.FAILURE;
+import static 
org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.RETRY;
+import static 
org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler.ErrorHandlingStrategy.ROLLBACK;
+
+public class Couchbase3Client implements CouchbaseClient {
+
+    private final Collection collection;
+    private final DocumentType documentType;
+    private final PersistTo persistTo;
+    private final ReplicateTo replicateTo;
+
+    public Couchbase3Client(Collection collection, DocumentType documentType, 
PersistTo persistTo, ReplicateTo replicateTo) {

Review Comment:
   As the client implementation is not intended to be used directly by the 
processors, the class and its constructor should be package-private.
   ```suggestion
   class Couchbase3Client implements CouchbaseClient {
   
       private final Collection collection;
       private final DocumentType documentType;
       private final PersistTo persistTo;
       private final ReplicateTo replicateTo;
   
       Couchbase3Client(Collection collection, DocumentType documentType, 
PersistTo persistTo, ReplicateTo replicateTo) {
   ```



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbase.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Map.entry;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static 
org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.BUCKET_ATTRIBUTE;
+import static 
org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.CAS_ATTRIBUTE;
+import static 
org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.COLLECTION_ATTRIBUTE;
+import static 
org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.DOCUMENT_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.couchbase.utils.CouchbaseAttributes.SCOPE_ATTRIBUTE;
+
+@Tags({"nosql", "couchbase", "database", "get"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Get a document from Couchbase Server. The ID of the 
document to fetch may be supplied by setting " +
+        "the <Document Id> property or reading it from the FlowFile content.")
+@WritesAttributes({
+        @WritesAttribute(attribute = BUCKET_ATTRIBUTE, description = "Bucket 
where the document was stored."),
+        @WritesAttribute(attribute = SCOPE_ATTRIBUTE, description = "Scope 
where the document was stored."),
+        @WritesAttribute(attribute = COLLECTION_ATTRIBUTE, description = 
"Collection where the document was stored."),
+        @WritesAttribute(attribute = DOCUMENT_ID_ATTRIBUTE, description = "Id 
of the document."),
+        @WritesAttribute(attribute = CAS_ATTRIBUTE, description = "CAS of the 
document.")
+})
+public class GetCouchbase extends AbstractCouchbaseProcessor {
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final String documentId = context.getProperty(DOCUMENT_ID).isSet()
+                ? 
context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue()
+                : new String(readFlowFileContent(session, flowFile), 
StandardCharsets.UTF_8);
+
+        if (isEmpty(documentId)) {
+            throw new ProcessException("Document ID is missing. Please provide 
a valid Document ID through processor property or FlowFile content.");
+        }
+
+        final String bucketName = 
context.getProperty(BUCKET_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String scopeName = 
context.getProperty(SCOPE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String collectionName = 
context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final DocumentType documentType = 
DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+
+        final CouchbaseContext couchbaseContext = new 
CouchbaseContext(bucketName, scopeName, collectionName, documentType);
+        final CouchbaseClient couchbaseClient = 
connectionService.getClient(couchbaseContext);

Review Comment:
   This section is the same as in the Put processor and may be extracted to the 
abstract class.



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/utils/CouchbaseAttributes.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase.utils;
+
+public class CouchbaseAttributes {
+
+    public static final String BUCKET_ATTRIBUTE = "couchbase.bucket";

Review Comment:
   I suggest extracting the descriptions into constants too (as they are the 
same for Put and Get).
   Similar to 
[GoogleDriveAttributes](https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java).



##########
nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
+import org.apache.nifi.services.couchbase.exception.CouchbaseErrorHandler;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor DOCUMENT_ID = new 
PropertyDescriptor.Builder()
+            .name("Document Id")
+            .description("Couchbase document id, or an expression to construct 
the Couchbase document id.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Couchbase Connection Service")
+            .description("A Couchbase Connection Service which manages 
connections to a Couchbase cluster.")
+            .required(true)
+            .identifiesControllerService(CouchbaseConnectionService.class)
+            .build();
+
+    public static final PropertyDescriptor BUCKET_NAME = new 
PropertyDescriptor.Builder()
+            .name("Bucket Name")
+            .description("The name of bucket.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor SCOPE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Scope Name")
+            .description("The name of scope.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("_default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor COLLECTION_NAME = new 
PropertyDescriptor.Builder()
+            .name("Collection Name")
+            .description("The name of collection.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("_default")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Document Type")
+            .description("The type of contents.")
+            .required(true)
+            .allowableValues(DocumentType.values())
+            .defaultValue(DocumentType.Json.toString())
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the 
operation failed and retrying the operation will also fail, such as an invalid 
data or schema.")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("All FlowFile that fail due to server/cluster 
availability go to this relationship.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            DOCUMENT_ID,
+            BUCKET_NAME,
+            SCOPE_NAME,
+            COLLECTION_NAME,
+            DOCUMENT_TYPE,
+            COUCHBASE_CONNECTION_SERVICE
+    );
+
+    public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, 
REL_FAILURE, REL_RETRY);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    protected CouchbaseConnectionService connectionService;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        connectionService = 
context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
+    }
+
+    protected byte[] readFlowFileContent(ProcessSession session, FlowFile 
flowFile) {
+        final byte[] content = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, 
true));
+
+        return content;
+    }
+
+    protected String createTransitUrl(CouchbaseContext couchbaseContext, 
String documentId) {

Review Comment:
   ```suggestion
       protected String createTransitUri(CouchbaseContext couchbaseContext, 
String documentId) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to