This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new fd0d3a69f ATLAS-4789: added couchbase bridge
fd0d3a69f is described below
commit fd0d3a69fe97867b3565d0a2f497da6362ce5ef4
Author: Dmitrii Chechetkin <[email protected]>
AuthorDate: Mon Oct 30 12:31:32 2023 -0400
ATLAS-4789: added couchbase bridge
Signed-off-by: Madhan Neethiraj <[email protected]>
---
addons/couchbase-bridge/pom.xml | 201 ++++++++++++
.../com/couchbase/atlas/connector/AtlasConfig.java | 54 ++++
.../com/couchbase/atlas/connector/CBConfig.java | 145 +++++++++
.../couchbase/atlas/connector/CouchbaseHook.java | 341 +++++++++++++++++++++
.../connector/entities/CouchbaseAtlasEntity.java | 227 ++++++++++++++
.../atlas/connector/entities/CouchbaseBucket.java | 89 ++++++
.../atlas/connector/entities/CouchbaseCluster.java | 130 ++++++++
.../connector/entities/CouchbaseCollection.java | 90 ++++++
.../atlas/connector/entities/CouchbaseField.java | 129 ++++++++
.../connector/entities/CouchbaseFieldType.java | 71 +++++
.../atlas/connector/entities/CouchbaseScope.java | 93 ++++++
.../main/resources/atlas-application.properties | 17 +
.../atlas/connector/CouchbaseHookTest.java | 136 ++++++++
.../entities/CouchbaseAtlasEntityTest.java | 108 +++++++
.../couchbase-bridge/src/test/resources/log4j.xml | 35 +++
.../5000-Couchbase/5020-couchbase_model.json | 159 ++++++++++
distro/pom.xml | 1 +
.../assemblies/atlas-couchbase-hook-package.xml | 34 ++
distro/src/main/assemblies/standalone-package.xml | 6 +
docs/src/documents/Hook/HookCouchbase.md | 45 +++
pom.xml | 7 +
21 files changed, 2118 insertions(+)
diff --git a/addons/couchbase-bridge/pom.xml b/addons/couchbase-bridge/pom.xml
new file mode 100644
index 000000000..8a3ccef17
--- /dev/null
+++ b/addons/couchbase-bridge/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- The parent POM is resolved from two directories up (../../). -->
+    <parent>
+        <groupId>org.apache.atlas</groupId>
+        <artifactId>apache-atlas</artifactId>
+        <version>3.0.0-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+
+    <artifactId>couchbase-bridge</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Atlas Couchbase Bridge</name>
+    <description>Apache Atlas Couchbase Bridge Module</description>
+
+    <dependencies>
+        <!-- Third-party libraries whose versions are pinned in this module -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>30.1.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>java-client</artifactId>
+            <version>3.4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>dcp-client</artifactId>
+            <version>0.44.0</version>
+        </dependency>
+
+        <!-- Logging (no version declared here) -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <!-- Atlas modules (no version declared here) -->
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-notification</artifactId>
+        </dependency>
+
+        <!-- Testing (no version declared here) -->
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <!-- "dist" profile: at package time, copies the hook jar and the listed
+             artifacts into the couchbase hook directory under the build output. -->
+        <profile>
+            <id>dist</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-dependency-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>copy-hook</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>copy</goal>
+                                </goals>
+                                <configuration>
+                                    <outputDirectory>${project.build.directory}/dependency/hook/couchbase/atlas-couchbase-plugin-impl</outputDirectory>
+                                    <overWriteReleases>false</overWriteReleases>
+                                    <overWriteSnapshots>false</overWriteSnapshots>
+                                    <overWriteIfNewer>true</overWriteIfNewer>
+                                    <artifactItems>
+                                        <!-- this module's own jar -->
+                                        <artifactItem>
+                                            <groupId>${project.groupId}</groupId>
+                                            <artifactId>${project.artifactId}</artifactId>
+                                            <version>${project.version}</version>
+                                        </artifactItem>
+                                        <!-- Atlas artifacts at the project version -->
+                                        <artifactItem>
+                                            <groupId>${project.groupId}</groupId>
+                                            <artifactId>atlas-client-common</artifactId>
+                                            <version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>${project.groupId}</groupId>
+                                            <artifactId>atlas-client-v2</artifactId>
+                                            <version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>${project.groupId}</groupId>
+                                            <artifactId>atlas-notification</artifactId>
+                                            <version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>${project.groupId}</groupId>
+                                            <artifactId>atlas-common</artifactId>
+                                            <version>${project.version}</version>
+                                        </artifactItem>
+                                        <!-- Kafka and Jersey artifacts -->
+                                        <artifactItem>
+                                            <groupId>org.apache.kafka</groupId>
+                                            <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
+                                            <version>${kafka.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.apache.kafka</groupId>
+                                            <artifactId>kafka-clients</artifactId>
+                                            <version>${kafka.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>com.sun.jersey</groupId>
+                                            <artifactId>jersey-json</artifactId>
+                                            <version>${jersey.version}</version>
+                                        </artifactItem>
+                                    </artifactItems>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <!-- Site generation bound to prepare-package, with project info and reports switched off. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-site-plugin</artifactId>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.doxia</groupId>
+                        <artifactId>doxia-module-twiki</artifactId>
+                        <version>${doxia.version}</version>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.maven.doxia</groupId>
+                        <artifactId>doxia-core</artifactId>
+                        <version>${doxia.version}</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>site</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <generateProjectInfo>false</generateProjectInfo>
+                    <generateReports>false</generateReports>
+                </configuration>
+            </plugin>
+
+            <!-- Declared without any executions. -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.2.1</version>
+                <inherited>false</inherited>
+                <executions/>
+            </plugin>
+
+            <!-- The antrun "run" goal is bound to post-integration-test; no target is configured here. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>post-integration-test</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
new file mode 100644
index 000000000..3168d7bfa
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import org.apache.atlas.AtlasClientV2;
+
+import java.util.Map;
+
+/**
+ * Static accessors for the Atlas connection settings used by the Couchbase bridge.
+ * Each setting is read from an environment variable and falls back to a built-in default.
+ */
+public class AtlasConfig {
+    private static final Map<String, String> ENV = System.getenv();
+    private static AtlasClientV2 client = null;
+
+    /**
+     * @return a one-element array holding the value of {@code ATLAS_URL},
+     *         or {@code http://localhost:21000} when the variable is not set
+     */
+    public static String[] urls() {
+        final String atlasUrl = setting("ATLAS_URL", "http://localhost:21000");
+
+        return new String[] {atlasUrl};
+    }
+
+    /**
+     * @return the value of {@code ATLAS_USERNAME}, or {@code admin} when the variable is not set
+     */
+    public static String username() {
+        return setting("ATLAS_USERNAME", "admin");
+    }
+
+    /**
+     * @return the value of {@code ATLAS_PASSWORD}, or {@code admin} when the variable is not set
+     */
+    public static String password() {
+        return setting("ATLAS_PASSWORD", "admin");
+    }
+
+    /**
+     * @return a two-element array: {@link #username()} followed by {@link #password()}
+     */
+    public static String[] auth() {
+        final String[] credentials = new String[2];
+
+        credentials[0] = username();
+        credentials[1] = password();
+
+        return credentials;
+    }
+
+    /**
+     * Returns the shared Atlas client, creating it from {@link #urls()} and {@link #auth()}
+     * on first use unless one was already supplied through {@link #client(AtlasClientV2)}.
+     *
+     * @return the shared Atlas client
+     */
+    public static AtlasClientV2 client() {
+        if (client != null) {
+            return client;
+        }
+
+        client = new AtlasClientV2(urls(), auth());
+
+        return client;
+    }
+
+    /**
+     * Replaces the shared Atlas client.
+     *
+     * @param client the client that {@link #client()} will return from now on
+     */
+    public static void client(AtlasClientV2 client) {
+        AtlasConfig.client = client;
+    }
+
+    /** Reads an environment variable, substituting {@code fallback} when it is not set. */
+    private static String setting(String key, String fallback) {
+        return ENV.getOrDefault(key, fallback);
+    }
+}
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
new file mode 100644
index 000000000..70e9dc37c
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import com.couchbase.client.dcp.Client;
+import com.couchbase.client.dcp.SecurityConfig;
+import com.couchbase.client.java.Cluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Static accessors for the Couchbase connection and DCP sampling settings.
+ * Every setting is read from an environment variable and falls back to a built-in default.
+ */
+public class CBConfig {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CBConfig.class);
+
+    private static final Map<String, String> ENV = System.getenv();
+    // Numeric settings are parsed once, when the class is initialized.
+    private static final Integer DCP_FIELD_MIN_OCCUR = Integer.valueOf(ENV.getOrDefault("DCP_FIELD_MIN_OCCUR", "0"));
+    private static final Float DCP_SAMPLE_RATIO = Float.valueOf(ENV.getOrDefault("DCP_SAMPLE_RATIO", "1"));
+    private static final Short DCP_FIELD_THRESHOLD = Short.valueOf(ENV.getOrDefault("DCP_FIELD_THRESHOLD", "0"));
+    private static Client mockDcpClient;
+    private static Cluster cluster;
+
+    /**
+     * @return the value of {@code CB_CLUSTER}, or {@code couchbase://localhost} when not set
+     */
+    public static String address() {
+        return ENV.getOrDefault("CB_CLUSTER", "couchbase://localhost");
+    }
+
+    /**
+     * @return the value of {@code CB_USERNAME}, or {@code Administrator} when not set
+     */
+    public static String username() {
+        return ENV.getOrDefault("CB_USERNAME", "Administrator");
+    }
+
+    /**
+     * @return the value of {@code CB_PASSWORD}, or {@code password} when not set
+     */
+    public static String password() {
+        return ENV.getOrDefault("CB_PASSWORD", "password");
+    }
+
+    /**
+     * @return the value of {@code CB_BUCKET}, or {@code default} when not set
+     */
+    public static String bucket() {
+        return ENV.getOrDefault("CB_BUCKET", "default");
+    }
+
+    /**
+     * Parses the comma-separated {@code CB_COLLECTIONS} variable.
+     *
+     * @return the trimmed, non-blank collection names; an empty list when the variable
+     *         holds no names; {@code null} when the variable is not set
+     */
+    public static Collection<String> collections() {
+        String collections = ENV.get("CB_COLLECTIONS");
+
+        if (collections == null) {
+            return null;
+        }
+
+        return Arrays.stream(collections.split(","))
+                .map(String::trim)
+                // an empty variable or a stray comma must not produce an empty collection name
+                .filter(name -> !name.isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * @return the value of {@code DCP_PORT}, or {@code 11210} when not set
+     */
+    public static String dcpPort() {
+        return ENV.getOrDefault("DCP_PORT", "11210");
+    }
+
+    /**
+     * @return Fraction of DCP messages to be analyzed, as a Float between 0 and 1
+     *         ({@code DCP_SAMPLE_RATIO}, default 1).
+     */
+    public static Float dcpSampleRatio() {
+        return DCP_SAMPLE_RATIO;
+    }
+
+    /**
+     * @return a threshold that indicates in what percentage of analyzed messages per collection
+     *         a field must appear before it is sent to Atlas ({@code DCP_FIELD_THRESHOLD}, default 0)
+     */
+    public static Short dcpFieldThreshold() {
+        return DCP_FIELD_THRESHOLD;
+    }
+
+    /**
+     * @return the value of {@code DCP_FIELD_MIN_OCCUR} (default 0)
+     */
+    public static Integer dcpFieldMinOccurences() {
+        return DCP_FIELD_MIN_OCCUR;
+    }
+
+    /**
+     * Returns the shared cluster connection, connecting with {@link #address()},
+     * {@link #username()} and {@link #password()} on first use.
+     *
+     * @return the shared cluster connection
+     */
+    public static Cluster cluster() {
+        if (cluster == null) {
+            cluster = Cluster.connect(address(), username(), password());
+        }
+
+        return cluster;
+    }
+
+    /**
+     * Builds a DCP client from the configured settings. A client previously supplied through
+     * {@link #dcpClient(Client)} is returned instead when present.
+     *
+     * @return the supplied mock client, or a newly built DCP client
+     */
+    public static Client dcpClient() {
+        if (mockDcpClient != null) {
+            LOGGER.debug("Using mock DCP client");
+
+            return mockDcpClient;
+        }
+
+        Client.Builder builder = Client.builder()
+                .collectionsAware(true)
+                .seedNodes(String.format("%s:%s", address(), dcpPort()))
+                .connectionString(address())
+                .credentials(username(), password());
+
+        String bucket = bucket();
+
+        // "*" or an empty name leaves the builder without a bucket; collection names
+        // are only applied together with a concrete bucket
+        if (!(bucket == null || bucket.equals("*") || bucket.isEmpty())) {
+            LOGGER.debug("Monitoring bucket `{}`", bucket);
+
+            builder.bucket(bucket);
+
+            Collection<String> collections = collections();
+
+            if (collections != null && !collections.isEmpty()) {
+                LOGGER.debug("Monitoring collections: {}", String.join(", ", collections));
+
+                builder.collectionNames(collections);
+            }
+        }
+
+        if (enableTLS()) {
+            LOGGER.debug("Using native TLS");
+
+            builder.securityConfig(SecurityConfig.builder().enableNativeTls(true).build());
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Supplies a client that {@link #dcpClient()} returns instead of building one.
+     *
+     * @param mockDcpClient the client to return, or {@code null} to build real clients again
+     */
+    protected static void dcpClient(Client mockDcpClient) {
+        CBConfig.mockDcpClient = mockDcpClient;
+    }
+
+    /** @return {@code true} when {@code CB_ENABLE_TLS} parses as boolean true (default false) */
+    private static boolean enableTLS() {
+        return Boolean.parseBoolean(ENV.getOrDefault("CB_ENABLE_TLS", "false"));
+    }
+}
\ No newline at end of file
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
new file mode 100644
index 000000000..0a73307c8
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import com.couchbase.atlas.connector.entities.CouchbaseBucket;
+import com.couchbase.atlas.connector.entities.CouchbaseCluster;
+import com.couchbase.atlas.connector.entities.CouchbaseCollection;
+import com.couchbase.atlas.connector.entities.CouchbaseField;
+import com.couchbase.atlas.connector.entities.CouchbaseFieldType;
+import com.couchbase.atlas.connector.entities.CouchbaseScope;
+import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.dcp.Client;
+import com.couchbase.client.dcp.ControlEventHandler;
+import com.couchbase.client.dcp.DataEventHandler;
+import com.couchbase.client.dcp.StreamFrom;
+import com.couchbase.client.dcp.StreamTo;
+import com.couchbase.client.dcp.highlevel.internal.CollectionIdAndKey;
+import
com.couchbase.client.dcp.highlevel.internal.CollectionsManifest.CollectionInfo;
+import com.couchbase.client.dcp.message.DcpMutationMessage;
+import com.couchbase.client.dcp.message.MessageUtil;
+import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
+import com.couchbase.client.java.json.JsonObject;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CouchbaseHook extends AtlasHook implements ControlEventHandler,
DataEventHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(CouchbaseHook.class);
+
+ protected static CouchbaseHook INSTANCE;
+ protected static Client DCP;
+ protected static AtlasClientV2 ATLAS;
+ private static Consumer<List<AtlasEntity>> createInterceptor;
+ private static Consumer<List<AtlasEntity>> updateInterceptor;
+ private static boolean loop = true;
+
+
+ private CouchbaseCluster clusterEntity;
+ private CouchbaseBucket bucketEntity;
+
+ /**
+ * START HERE
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ // create instances of DCP client,
+ DCP = CBConfig.dcpClient();
+
+ // Atlas client,
+ ATLAS = AtlasConfig.client();
+
+ // and DCP handler
+ INSTANCE = new CouchbaseHook();
+
+ // register DCP handler with DCP client
+ DCP.controlEventHandler(INSTANCE);
+ DCP.dataEventHandler(INSTANCE);
+
+ // Connect to the cluster
+ DCP.connect().block();
+
+ LOG.info("DCP client connected.");
+
+ // Ensure the existence of corresponding
+ // CouchbaseCluster, CouchbaseBucket, CouchbaseScope
+ // entities and store them in local cache
+ INSTANCE.initializeAtlasContext();
+
+ // Start listening to DCP
+ DCP.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
+
+ System.out.println("Starting the stream...");
+ DCP.startStreaming().block();
+
+ System.out.println("Started the stream.");
+ // And then just loop the loop
+ try {
+ while (loop) {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+
+ } finally {
+ DCP.disconnect().block();
+ }
+ }
+
+ /**
+ * Ensures the existence of CouchbaseCluster,
+ * CouchbaseBucket and Couchbase scope entities
+ * and stores them into local cache
+ */
+ private void initializeAtlasContext() {
+ LOG.debug("Creating cluster/bucket/scope entities");
+
+ clusterEntity = new CouchbaseCluster()
+ .name(CBConfig.address())
+ .url(CBConfig.address())
+ .get();
+
+ bucketEntity = new CouchbaseBucket()
+ .name(CBConfig.bucket())
+ .cluster(clusterEntity)
+ .get();
+
+ List<AtlasEntity> entitiesToCreate = new ArrayList<>();
+
+ if (!clusterEntity.exists(ATLAS)) {
+ entitiesToCreate.add(clusterEntity.atlasEntity(ATLAS));
+ }
+
+ if (!bucketEntity.exists(ATLAS)) {
+ entitiesToCreate.add(bucketEntity.atlasEntity(ATLAS));
+ }
+
+ if (!entitiesToCreate.isEmpty()) {
+ createEntities(entitiesToCreate);
+ }
+ }
+
+ private void createEntities(List<AtlasEntity> entities) {
+ if (createInterceptor != null) {
+ createInterceptor.accept(entities);
+ return;
+ }
+
+ AtlasEntitiesWithExtInfo entity = new
AtlasEntitiesWithExtInfo(entities);
+ EntityCreateRequestV2 request = new
EntityCreateRequestV2("couchbase", entity);
+
+ notifyEntities(Arrays.asList(request), null);
+ }
+
+ private void updateEntities(List<AtlasEntity> entities) {
+ if (updateInterceptor != null) {
+ updateInterceptor.accept(entities);
+
+ return;
+ }
+
+ AtlasEntitiesWithExtInfo entity = new
AtlasEntitiesWithExtInfo(entities);
+ EntityUpdateRequestV2 request = new
EntityUpdateRequestV2("couchbase", entity);
+
+ notifyEntities(Arrays.asList(request), null);
+ }
+
+ @Override
+ public void onEvent(ChannelFlowController flowController, ByteBuf event) {
+ // Probabilistic sampling
+ if (Math.random() > CBConfig.dcpSampleRatio()) {
+ LOG.debug("Skipping DCP message.");
+ return;
+ }
+
+ if (DcpMutationMessage.is(event)) {
+ try {
+ // Borrowed from Couchbeans :)
+ // Gathering some information about the message.
+ CollectionIdAndKey ckey =
MessageUtil.getCollectionIdAndKey(event, true);
+ CollectionInfo collectionInfo =
collectionInfo(MessageUtil.getVbucket(event), ckey.collectionId());
+ String collectionName = collectionInfo.name();
+ String scopeName =
collectionInfo.scope().name();
+
+ LOG.debug("Received DCP mutation message for scope '{}' and
collection '{}'", scopeName, collectionName);
+
+ CouchbaseScope scopeEntity = bucketEntity.scope(scopeName);
+
+ // Because Atlas doesn't support upserts,
+ // we need to send new entities in a separate message
+ // from already existing ones
+ List<AtlasEntity> toCreate = new ArrayList<>();
+ List<AtlasEntity> toUpdate = new ArrayList<>();
+
+ if (!scopeEntity.exists(ATLAS)) {
+ toCreate.add(scopeEntity.atlasEntity(ATLAS));
+
+ LOG.debug("Creating scope: {}",
scopeEntity.qualifiedName());
+ } else {
+ toUpdate.add(scopeEntity.atlasEntity(ATLAS));
+
+ LOG.debug("Updating scope: {}",
scopeEntity.qualifiedName());
+ }
+
+ CouchbaseCollection collectionEntity =
scopeEntity.collection(collectionName);
+
+ // Let's record this attempt to analyze a collection document
+ // so that we can calculate field frequencies
+ // when filtering them via DCP_FIELD_THRESHOLD
+ collectionEntity.incrementAnalyzedDocuments();
+
+ // and then schedule it to be sent to Atlas
+ if (!collectionEntity.exists(ATLAS)) {
+ toCreate.add(collectionEntity.atlasEntity(ATLAS));
+ } else {
+ toUpdate.add(collectionEntity.atlasEntity(ATLAS));
+ }
+
+ Map<String, Object> document =
JsonObject.fromJson(DcpMutationMessage.contentBytes(event)).toMap();
+
+ System.out.println(String.format("Document keys: %s",
document.keySet()));
+
+ // for each field in the document...
+ document.entrySet().stream()
+ // transform the field into CouchbaseField either by
loading corresponding entity or by creating it
+ .filter(e -> e.getValue() != null)
+ .flatMap(entry -> processField(collectionEntity,
(Collection<String>) Collections.EMPTY_LIST, null, entry.getKey(),
entry.getValue()))
+ // update document counter on the field entity
+ .peek(CouchbaseField::incrementDocumentCount)
+ // Only passes fields that either already in Atlas or
pass DCP_FIELD_THRESHOLD setting
+ .filter(field -> field.exists(ATLAS) ||
field.documentCount() / (float) collectionEntity.documentsAnalyzed() >
CBConfig.dcpFieldThreshold())
+ // Schedule the entity either for creation or to be
updated in Atlas
+ .forEach(field -> {
+ if (field.exists(ATLAS)) {
+ toUpdate.add(field.atlasEntity(ATLAS));
+ } else {
+ toCreate.add(field.atlasEntity(ATLAS));
+ }
+ });
+
+ createEntities(toCreate);
+ updateEntities(toUpdate);
+
+ System.out.println("Notified Atlas");
+ } catch (Exception e) {
+ LOG.error("Failed to process DCP message", e);
+ }
+ }
+ }
+
+ /**
+ * Constructs a {@link CouchbaseField} from field information
+ *
+ * @param collectionEntity the {@link CouchbaseCollection} to which the
field belongs
+ * @param path the path to the field inside the collection
document excluding the field itself
+ * @param parent the parent field, if present or null
+ * @param name the name of the field
+ * @param value the value for the field from received document
+ * @return constructed or loaded from Atlas {@link CouchbaseField}
+ */
+ private static Stream<CouchbaseField> processField(CouchbaseCollection
collectionEntity, Collection<String> path, @Nullable CouchbaseField parent,
String name, Object value) {
+ // Let's figure out what type does this field have
+ CouchbaseFieldType fieldType = CouchbaseFieldType.infer(value);
+
+ // The full field path as it will be stored in Atlas
+ final Collection<String> fieldPath = new ArrayList<>(path);
+
+ fieldPath.add(name);
+
+ // constructing the field entity and loading it from cache or Atlas,
if previously stored there
+ CouchbaseField rootField = new CouchbaseField()
+ .name(name)
+ .fieldPath(fieldPath.stream().collect(Collectors.joining(".")))
+ .fieldType(fieldType)
+ .collection(collectionEntity)
+ .parentField(parent)
+ .get();
+
+ // return value
+ Stream<CouchbaseField> result = Stream.of(rootField);
+
+ // Recursive transformation of embedded object fields
+ if (fieldType == CouchbaseFieldType.OBJECT) {
+ // Normalizing the value
+ if (value instanceof JsonObject) {
+ value = ((JsonObject) value).toMap();
+ }
+
+ if (value instanceof Map) {
+ // Append embedded field entities to the resulting Stream
+ result = Stream.concat(
+ result,
+ ((Map<String, ?>) value).entrySet().stream()
+ // recursion
+ .flatMap(entity ->
processField(collectionEntity, fieldPath, rootField, entity.getKey(),
entity.getValue()))
+ );
+ } else {
+ throw new IllegalArgumentException(String.format("Incorrect
value type '%s' for field type 'object': a Map was expected instead.",
value.getClass()));
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public String getMessageSource() {
+ return "couchbase";
+ }
+
+ /**
+ * Looks up the collection name by its vbucket identifier
+ *
+ * @param vbucket
+ * @param collid
+ * @return the name of the collection
+ */
+ private static CollectionInfo collectionInfo(int vbucket, long collid) {
+ return DCP.sessionState()
+ .get(vbucket)
+ .getCollectionsManifest()
+ .getCollection(collid);
+ }
+
+ protected static void setEntityInterceptors(Consumer<List<AtlasEntity>>
createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) {
+ CouchbaseHook.createInterceptor = createInterceptor;
+ CouchbaseHook.updateInterceptor = updateInterceptor;
+ }
+
+ static void loop(boolean loop) {
+ CouchbaseHook.loop = loop;
+ }
+}
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
new file mode 100644
index 000000000..8bc8e42e9
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Base class for all couchbase atlas models
+ * The class uses "Self-Builder" pattern:
+ * 1. First, create the "builder" instance of the class
+ * 2. Populate the identifying fields of the class (check the `qualifiedName`
method of the entity for the list)
+ * (all setters return the instance just as a Builder would)
+ * 3. Call `get()` method to resolve the instance and replace it with
previously loaded from Atlas data (if present)
+ *
+ * Example:
+ * ```java
+ * clusterEntity = new CouchbaseCluster()
+ * .name(CBConfig.address())
+ * .url(CBConfig.address())
+ * .get();
+ * ```
+ *
+ * @param <E> extending class
+ */
+public abstract class CouchbaseAtlasEntity<E extends CouchbaseAtlasEntity<?>> {
+    // Both caches are keyed by the concrete model class and then by the string form of id()
+    private static final Map<Class, Map<String, AtlasEntity>> ENTITY_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
+    private static final Map<Class, Map<String, CouchbaseAtlasEntity>> MODEL_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
+    private String name;
+
+    public String name() {
+        return name;
+    }
+
+    public E name(String name) {
+        this.name = name;
+        return (E) this;
+    }
+
+    /**
+     * Resolves the Atlas entity for this model and writes the model values into it.
+     * A cached entity is used as-is unless its guid starts with '-'; otherwise the entity is
+     * loaded from Atlas, falling back to the cached entity or to a new entity of
+     * {@link #atlasTypeName()}, and the result is cached.
+     *
+     * @param atlas Atlas client used when the entity has to be loaded
+     * @return the entity with "name", "qualifiedName" and model-specific values applied
+     */
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity atlasEntity = atlasEntity()
+                .filter(entity -> entity.getGuid().charAt(0) != '-')
+                .orElseGet(() ->
+                        cache(load(atlas)
+                                .orElseGet(() ->
+                                        atlasEntity()
+                                                .orElseGet(() -> new AtlasEntity(atlasTypeName())))
+                        )
+                );
+
+        atlasEntity.setAttribute("name", name);
+        atlasEntity.setAttribute("qualifiedName", qualifiedName());
+
+        updateAtlasEntity(atlasEntity);
+
+        return atlasEntity;
+    }
+
+    protected abstract String qualifiedName();
+
+    /**
+     * Looks up precreated atlas entity in the entity cache and,
+     * when found, refreshes it with the values of this model
+     * @return Optional of the cached entity
+     */
+    public Optional<AtlasEntity> atlasEntity() {
+        return cachedEntity().map(atlasEntity -> {
+            updateAtlasEntity(atlasEntity);
+            return atlasEntity;
+        });
+    }
+
+    /**
+     * Checks whether the model has the Atlas Entity created for it
+     * by looking it up in the entity cache.
+     * NOTE: this method does not check if the entity has been saved in Atlas so,
+     *       it will return true when the entity is already created and cached but is yet to be sent to Atlas
+     *
+     * This method is _mostly_ used in related objects when setting relationship field to ensure that related
+     * model has an AtlasEntity that can be referenced when storing relationship information.
+     *
+     * @return true if the entity found
+     */
+    protected boolean exists() {
+        return cachedEntity().isPresent();
+    }
+
+    public abstract String atlasTypeName();
+
+    public abstract UUID id();
+
+    /**
+     * Invoked when the entity needs to be updated with values from the model
+     * @param entity the entity to write the values into
+     */
+    protected void updateAtlasEntity(AtlasEntity entity) {
+        // override me
+    }
+
+    /**
+     * Invoked when the model needs to be updated with values from the entity
+     * @param entity the entity to read the values from
+     */
+    protected void updateJavaModel(AtlasEntity entity) {
+        // override me
+    }
+
+    /**
+     * Loads the entity for this model from Atlas by its qualified name, stores it in the entity
+     * cache and copies its values into this model
+     * @param client Atlas client to use
+     * @return loaded entity, or empty when Atlas answered 404 or returned no entity
+     * @throws RuntimeException wrapping any Atlas failure other than a 404 response
+     */
+    private Optional<AtlasEntity> load(AtlasClientV2 client) {
+        try {
+            Map<String, String> query = new HashMap<>();
+            query.put("qualifiedName", qualifiedName());
+            AtlasEntity atlasEntity = client.getEntityByAttribute(atlasTypeName(), query).getEntity();
+
+            if (atlasEntity != null) {
+                cache(atlasEntity);
+                if (atlasEntity.hasAttribute("name")) {
+                    this.name = (String) atlasEntity.getAttribute("name");
+                }
+                updateJavaModel(atlasEntity);
+                return Optional.of(atlasEntity);
+            }
+        } catch (AtlasServiceException e) {
+            // Only a 404 means "not in Atlas yet". An exception without a status is a
+            // different failure and must surface as such, not as a NullPointerException.
+            if (e.getStatus() == null || e.getStatus().getStatusCode() != 404) {
+                throw new RuntimeException(e);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Puts an entity into the entity cache
+     * @param atlasEntity the entity to cache
+     * @return the same entity
+     */
+    private AtlasEntity cache(AtlasEntity atlasEntity) {
+        // computeIfAbsent creates the per-type map atomically; the inner map is
+        // synchronized like the one used by the model cache
+        ENTITY_BY_TYPE_AND_ID
+                .computeIfAbsent(getClass(), type -> Collections.synchronizedMap(new HashMap<>()))
+                .put(id().toString(), atlasEntity);
+        return atlasEntity;
+    }
+
+    /**
+     * Looks up the entity in the cache
+     * @return Optional of cached entity
+     */
+    private Optional<AtlasEntity> cachedEntity() {
+        String id = id().toString();
+        Map<String, AtlasEntity> entitiesById = ENTITY_BY_TYPE_AND_ID.get(getClass());
+
+        if (entitiesById == null) {
+            return Optional.empty();
+        }
+
+        return Optional.ofNullable(entitiesById.get(id));
+    }
+
+    /**
+     * First checks if the entity has been loaded and cached and, if not, then tries to load it from Atlas
+     * @param atlas Atlas client to use
+     * @return true if the entity found either in cache or in Atlas
+     */
+    public boolean exists(AtlasClientV2 atlas) {
+        if (!exists()) {
+            return load(atlas).isPresent();
+        }
+        return true;
+    }
+
+    /**
+     * Returns pre-cached model with provided identifiers or caches this model and returns it
+     *
+     * @return the model
+     */
+    public E get() {
+        String id = id().toString();
+
+        // ensure valid cache structure
+        Map<String, CouchbaseAtlasEntity> modelsById = MODEL_BY_TYPE_AND_ID
+                .computeIfAbsent(getClass(), type -> Collections.synchronizedMap(new HashMap<>()));
+
+        // put the model into the cache, if not already present
+        CouchbaseAtlasEntity existing = modelsById.putIfAbsent(id, this);
+
+        return (E) (existing == null ? this : existing);
+    }
+
+    /**
+     * Empties both the entity cache and the model cache
+     */
+    public static void dropCache() {
+        ENTITY_BY_TYPE_AND_ID.clear();
+        MODEL_BY_TYPE_AND_ID.clear();
+    }
+}
\ No newline at end of file
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
new file mode 100644
index 000000000..2983b2af2
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Model of a Couchbase bucket, mapped to the {@code couchbase_bucket} Atlas type.
+ * A bucket is owned by a {@link CouchbaseCluster} and keeps the {@link CouchbaseScope} models
+ * that were requested through {@link #scope(String)}.
+ */
+public class CouchbaseBucket extends CouchbaseAtlasEntity<CouchbaseBucket> {
+    public static final String TYPE_NAME = "couchbase_bucket";
+    private CouchbaseCluster cluster;
+    // scope models handed out by scope(String), keyed by scope name
+    private transient Map<String, CouchbaseScope> scopes = Collections.synchronizedMap(new HashMap<>());
+
+    /**
+     * Builds the Atlas entity through the superclass and links it to the owning cluster's entity.
+     *
+     * @param atlas Atlas client passed to the superclass and to the cluster
+     * @return the entity with its {@code cluster} relationship attribute set
+     */
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setRelationshipAttribute("cluster", cluster.atlasEntity(atlas));
+        return entity;
+    }
+
+    /**
+     * @return the cluster's qualified name and this bucket's name joined by {@code /}
+     */
+    @Override
+    protected String qualifiedName() {
+        return String.format("%s/%s", cluster.qualifiedName(), name());
+    }
+
+    public CouchbaseBucket() {
+
+    }
+
+    /**
+     * @return the cluster this bucket belongs to
+     */
+    public CouchbaseCluster cluster() {
+        return cluster;
+    }
+
+    /**
+     * Sets the owning cluster.
+     *
+     * @param cluster the cluster this bucket belongs to
+     * @return this bucket, for call chaining
+     */
+    public CouchbaseBucket cluster(CouchbaseCluster cluster) {
+        this.cluster = cluster;
+        return this;
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    /**
+     * @return a name-based UUID derived from the type name, the cluster id and the bucket name
+     */
+    @Override
+    public UUID id() {
+        return UUID.nameUUIDFromBytes(
+                String.format("%s:%s:%s", atlasTypeName(), cluster().id(), name())
+                        .getBytes(Charset.defaultCharset()));
+    }
+
+    /**
+     * Returns the scope model with the given name, creating and remembering it on first request.
+     * The lookup and the insertion happen in one {@code computeIfAbsent} call on the synchronized map,
+     * so concurrent callers asking for the same name cannot interleave between the check and the put.
+     *
+     * @param name scope name
+     * @return the scope model registered under that name
+     */
+    public CouchbaseScope scope(String name) {
+        return scopes.computeIfAbsent(name, scopeName -> new CouchbaseScope()
+                .bucket(this)
+                .name(scopeName)
+                .get());
+    }
+}
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
new file mode 100644
index 000000000..2f4efe865
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Model of a Couchbase cluster, mapped to the {@code couchbase_cluster} Atlas type.
+ * The cluster's URL doubles as its qualified name and is the only input to its id.
+ */
+public class CouchbaseCluster extends CouchbaseAtlasEntity<CouchbaseCluster> {
+    public static final String TYPE_NAME = "couchbase_cluster";
+    private String url;
+
+    /**
+     * @return the cluster URL
+     */
+    public String url() {
+        return url;
+    }
+
+    /**
+     * Sets the cluster URL.
+     *
+     * @param url the cluster URL
+     * @return this cluster, for call chaining
+     */
+    public CouchbaseCluster url(String url) {
+        this.url = url;
+        return this;
+    }
+
+    /**
+     * Builds the Atlas entity definition for the cluster type: a subtype of {@code Asset}
+     * in the {@code couchbase} service with a single {@code url} string attribute.
+     *
+     * @return the entity definition
+     */
+    public static AtlasEntityDef atlasEntityDef() {
+        AtlasEntityDef clusterDef = AtlasTypeUtil.createClassTypeDef(TYPE_NAME, new HashSet<>());
+        clusterDef.getSuperTypes().add("Asset");
+        clusterDef.setServiceType("couchbase");
+        clusterDef.setTypeVersion("0.1");
+
+        // NOTE(review): the positional arguments after the type name are presumably isOptional, cardinality,
+        // valuesMinCount, valuesMaxCount, isUnique, isIndexable, includeInNotification, constraints -
+        // confirm against the AtlasAttributeDef constructor
+        AtlasStructDef.AtlasAttributeDef urlAttribute = new AtlasStructDef.AtlasAttributeDef(
+                "url",
+                "string",
+                false,
+                AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE,
+                1,
+                1,
+                true,
+                true,
+                true,
+                Collections.EMPTY_LIST
+        );
+
+        List<AtlasStructDef.AtlasAttributeDef> attributeDefs = clusterDef.getAttributeDefs();
+        attributeDefs.add(urlAttribute);
+
+        return clusterDef;
+    }
+
+    /**
+     * Builds the relationship definitions owned by the cluster type: a single aggregation
+     * {@code couchbase_cluster_buckets} between a cluster ({@code buckets} end, SET cardinality)
+     * and a bucket ({@code cluster} end, SINGLE cardinality).
+     *
+     * @return the relationship definitions
+     */
+    public static Collection<? extends AtlasRelationshipDef> atlasRelationshipDefs() {
+        AtlasRelationshipEndDef clusterEnd = new AtlasRelationshipEndDef(
+                TYPE_NAME,
+                "buckets",
+                AtlasStructDef.AtlasAttributeDef.Cardinality.SET,
+                true
+        );
+        AtlasRelationshipEndDef bucketEnd = new AtlasRelationshipEndDef(
+                "couchbase_bucket",
+                "cluster",
+                AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE,
+                false
+        );
+
+        AtlasRelationshipDef clusterBuckets = new AtlasRelationshipDef(
+                "couchbase_cluster_buckets",
+                "",
+                "0.1",
+                "couchbase",
+                AtlasRelationshipDef.RelationshipCategory.AGGREGATION,
+                AtlasRelationshipDef.PropagateTags.ONE_TO_TWO,
+                clusterEnd,
+                bucketEnd
+        );
+
+        return Arrays.asList(clusterBuckets);
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    /**
+     * @return a name-based UUID derived from the type name and the cluster URL
+     */
+    @Override
+    public UUID id() {
+        String seed = String.format("%s:%s", atlasTypeName(), url());
+        return UUID.nameUUIDFromBytes(seed.getBytes(Charset.defaultCharset()));
+    }
+
+    /**
+     * Builds the Atlas entity through the superclass and writes the {@code url} attribute onto it.
+     *
+     * @param atlas Atlas client passed to the superclass
+     * @return the entity carrying this cluster's URL
+     */
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity clusterEntity = super.atlasEntity(atlas);
+        clusterEntity.setAttribute("url", url());
+        return clusterEntity;
+    }
+
+    /**
+     * @return the cluster URL, which is used as the qualified name
+     */
+    @Override
+    protected String qualifiedName() {
+        return url();
+    }
+
+    /**
+     * Copies the {@code url} attribute from the given entity into this model, if the entity has one.
+     *
+     * @param entity the Atlas entity to read from
+     */
+    @Override
+    protected void updateJavaModel(AtlasEntity entity) {
+        if (!entity.hasAttribute("url")) {
+            return;
+        }
+        this.url = (String) entity.getAttribute("url");
+    }
+}
\ No newline at end of file
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
new file mode 100644
index 000000000..b7319581d
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Model of a Couchbase collection, mapped to the {@code couchbase_collection} Atlas type.
+ * A collection belongs to a {@link CouchbaseScope} and counts the documents analyzed for it.
+ */
+public class CouchbaseCollection extends CouchbaseAtlasEntity<CouchbaseCollection> {
+
+    public static final String TYPE_NAME = "couchbase_collection";
+
+    private CouchbaseScope scope;
+
+    private long documentsAnalyzed;
+
+    /**
+     * Sets the owning scope.
+     *
+     * @param scope the scope this collection belongs to
+     * @return this collection, for call chaining
+     */
+    public CouchbaseCollection scope(CouchbaseScope scope) {
+        this.scope = scope;
+        return this;
+    }
+
+    /**
+     * Builds the Atlas entity through the superclass and links it to the owning scope's entity.
+     *
+     * @param atlas Atlas client passed to the superclass and to the scope
+     * @return the entity with its {@code scope} relationship attribute set
+     */
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setRelationshipAttribute("scope", scope.atlasEntity(atlas));
+        return entity;
+    }
+
+    /**
+     * Writes the current {@code documentsAnalyzed} counter onto the given entity.
+     *
+     * @param entity the Atlas entity to update
+     */
+    @Override
+    protected void updateAtlasEntity(AtlasEntity entity) {
+        entity.setAttribute("documentsAnalyzed", documentsAnalyzed);
+    }
+
+    /**
+     * Reads the {@code documentsAnalyzed} counter from the given entity.
+     * The attribute may arrive as any {@link Number} (for example Integer or Long), so it is
+     * widened through {@link Number#longValue()} rather than cast to one boxed type. When the
+     * attribute is absent or not numeric the current counter is left unchanged.
+     *
+     * @param entity the Atlas entity to read from
+     */
+    @Override
+    protected void updateJavaModel(AtlasEntity entity) {
+        Object analyzed = entity.getAttribute("documentsAnalyzed");
+        if (analyzed instanceof Number) {
+            documentsAnalyzed = ((Number) analyzed).longValue();
+        }
+    }
+
+    /**
+     * @return number of documents analyzed so far
+     */
+    public long documentsAnalyzed() {
+        return documentsAnalyzed;
+    }
+
+    /**
+     * Adds one to the analyzed-documents counter.
+     *
+     * @return this collection, for call chaining
+     */
+    public CouchbaseCollection incrementAnalyzedDocuments() {
+        this.documentsAnalyzed++;
+        return this;
+    }
+
+    /**
+     * @return the scope's qualified name and this collection's name joined by {@code /}
+     */
+    @Override
+    protected String qualifiedName() {
+        return String.format("%s/%s", scope.qualifiedName(), name());
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    /**
+     * @return a name-based UUID derived from the type name, the scope id and the collection name
+     */
+    @Override
+    public UUID id() {
+        return UUID.nameUUIDFromBytes(
+                String.format("%s:%s:%s", atlasTypeName(), scope().id().toString(), name())
+                        .getBytes(Charset.defaultCharset()));
+    }
+
+    /**
+     * @return the scope this collection belongs to
+     */
+    public CouchbaseScope scope() {
+        return this.scope;
+    }
+}
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
new file mode 100644
index 000000000..5d14f6e20
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+public class CouchbaseField extends CouchbaseAtlasEntity<CouchbaseField> {
+ public static final String TYPE_NAME = "couchbase_field";
+ private CouchbaseFieldType fieldType;
+ private String fieldPath;
+ private long documentCount = 0;
+
+ private CouchbaseField parentField;
+
+ private CouchbaseCollection collection;
+
+ public CouchbaseField() {
+
+ }
+
+ public CouchbaseFieldType fieldType() {
+ return fieldType;
+ }
+
+ public CouchbaseField fieldType(CouchbaseFieldType fieldType) {
+ this.fieldType = fieldType;
+ return this;
+ }
+
+ public String fieldPath() {
+ return fieldPath;
+ }
+
+ public CouchbaseField fieldPath(String fieldPath) {
+ this.fieldPath = fieldPath;
+ return this;
+ }
+
+ public long documentCount() {
+ return documentCount;
+ }
+
+ public CouchbaseField documentCount(long documentCount) {
+ this.documentCount = documentCount;
+ return this;
+ }
+
+ public void incrementDocumentCount() {
+ this.documentCount++;
+ }
+
+ public CouchbaseCollection collection() {
+ return collection;
+ }
+
+ public CouchbaseField collection(CouchbaseCollection collection) {
+ this.collection = collection;
+ return this;
+ }
+
+ @Override
+ public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+ AtlasEntity entity = super.atlasEntity(atlas);
+ entity.setRelationshipAttribute("collection",
collection.atlasEntity(atlas));
+ if (parentField != null) {
+ entity.setRelationshipAttribute("parentField",
parentField.atlasEntity(atlas));
+ }
+ return entity;
+ }
+
+ @Override
+ protected void updateAtlasEntity(AtlasEntity entity) {
+ entity.setAttribute("fieldType", fieldType.toString());
+ entity.setAttribute("fieldPath", fieldPath);
+ entity.setAttribute("documentCount", documentCount);
+ }
+
+ @Override
+ protected String qualifiedName() {
+ return String.format("%s/%s:%s", collection.qualifiedName(),
fieldPath(), fieldType());
+ }
+
+ @Override
+ public String atlasTypeName() {
+ return TYPE_NAME;
+ }
+
+ @Override
+ public UUID id() {
+ return
UUID.nameUUIDFromBytes(qualifiedName().getBytes(Charset.defaultCharset()));
+ }
+
+ public CouchbaseField parentField() {
+ return parentField;
+ }
+
+ public CouchbaseField parentField(CouchbaseField parent) {
+ this.parentField = parent;
+ return this;
+ }
+}
\ No newline at end of file
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
new file mode 100644
index 000000000..8355f60cf
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import com.couchbase.client.core.error.InvalidArgumentException;
+import com.couchbase.client.java.json.JsonObject;
+import org.apache.atlas.model.typedef.AtlasEnumDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public enum CouchbaseFieldType {
+ BOOLEAN,
+ NUMBER,
+ STRING,
+ ARRAY,
+ OBJECT,
+ BINARY;
+
+ public static CouchbaseFieldType infer(@Nonnull Object value) {
+ if (value instanceof Map || value instanceof JsonObject) {
+ return OBJECT;
+ } else if (value instanceof Collection || value.getClass().isArray()) {
+ if (value.getClass().isArray() &&
Byte.class.isAssignableFrom(value.getClass().getComponentType())) {
+ return BINARY;
+ }
+ return ARRAY;
+ } else if (value instanceof Number) {
+ return NUMBER;
+ } else if (value instanceof Boolean) {
+ return BOOLEAN;
+ } else if (value instanceof String) {
+ String sValue = (String) value;
+ if ("true".equalsIgnoreCase(sValue) ||
"false".equalsIgnoreCase(sValue)) {
+ return BOOLEAN;
+ }
+ try {
+ Double.parseDouble(sValue);
+ return NUMBER;
+ } catch (NumberFormatException nfe) {
+ return STRING;
+ }
+ }
+
+ throw new IllegalArgumentException("Failed to infer type");
+ }
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.getDefault());
+ }
+
+}
\ No newline at end of file
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
new file mode 100644
index 000000000..13f4e4852
--- /dev/null
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Model of a Couchbase scope, mapped to the {@code couchbase_scope} Atlas type.
+ * A scope is owned by a {@link CouchbaseBucket} and keeps the {@link CouchbaseCollection} models
+ * that were requested through {@link #collection(String)}.
+ */
+public class CouchbaseScope extends CouchbaseAtlasEntity<CouchbaseScope> {
+
+    public static final String TYPE_NAME = "couchbase_scope";
+    private CouchbaseBucket bucket;
+
+    // collection models handed out by collection(String), keyed by collection name
+    private transient Map<String, CouchbaseCollection> collections = Collections.synchronizedMap(new HashMap<>());
+
+    /**
+     * @return the bucket this scope belongs to
+     */
+    public CouchbaseBucket bucket() {
+        return bucket;
+    }
+
+    /**
+     * Sets the owning bucket.
+     *
+     * @param bucket the bucket this scope belongs to
+     * @return this scope, for call chaining
+     */
+    public CouchbaseScope bucket(CouchbaseBucket bucket) {
+        this.bucket = bucket;
+        return this;
+    }
+
+    /**
+     * @return a name-based UUID derived from the type name, the bucket id and the scope name
+     */
+    @Override
+    public UUID id() {
+        return UUID.nameUUIDFromBytes(
+                String.format(
+                        "%s:%s:%s",
+                        atlasTypeName(),
+                        bucket().id().toString(),
+                        name()
+                ).getBytes(Charset.defaultCharset())
+        );
+    }
+
+    /**
+     * Builds the Atlas entity through the superclass and links it to the owning bucket's entity.
+     *
+     * @param atlas Atlas client passed to the superclass and to the bucket
+     * @return the entity with its {@code bucket} relationship attribute set
+     */
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setRelationshipAttribute("bucket", bucket.atlasEntity(atlas));
+        return entity;
+    }
+
+    /**
+     * @return the bucket's qualified name and this scope's name joined by {@code /}
+     */
+    @Override
+    public String qualifiedName() {
+        return String.format("%s/%s", bucket.qualifiedName(), name());
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    /**
+     * Returns the collection model with the given name, creating and remembering it on first request.
+     * The lookup and the insertion happen in one {@code computeIfAbsent} call on the synchronized map,
+     * so concurrent callers asking for the same name cannot interleave between the check and the put.
+     *
+     * @param name collection name
+     * @return the collection model registered under that name
+     */
+    public CouchbaseCollection collection(String name) {
+        return collections.computeIfAbsent(name, collectionName -> new CouchbaseCollection()
+                .name(collectionName)
+                .scope(this)
+                .get());
+    }
+}
diff --git
a/addons/couchbase-bridge/src/main/resources/atlas-application.properties
b/addons/couchbase-bridge/src/main/resources/atlas-application.properties
new file mode 100644
index 000000000..3df60b02f
--- /dev/null
+++ b/addons/couchbase-bridge/src/main/resources/atlas-application.properties
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
new file mode 100644
index 000000000..36d81a3ac
--- /dev/null
+++
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import com.couchbase.atlas.connector.entities.CouchbaseAtlasEntity;
+import com.couchbase.atlas.connector.entities.CouchbaseBucket;
+import com.couchbase.atlas.connector.entities.CouchbaseCluster;
+import com.couchbase.atlas.connector.entities.CouchbaseScope;
+import com.couchbase.client.dcp.Client;
+import com.couchbase.client.dcp.StreamFrom;
+import com.couchbase.client.dcp.StreamTo;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * Runs {@code CouchbaseHook.main} against mocked DCP and Atlas clients and checks how often
+ * the hook connects, creates entities and looks entities up in Atlas.
+ */
+public class CouchbaseHookTest {
+
+    /**
+     * @return a DCP client mock whose connect, initializeState, startStreaming and disconnect
+     *         calls all complete immediately with an empty Mono
+     */
+    private Client mockDcpClient() {
+        Client mockDcpClient = Mockito.mock(Client.class);
+        Mockito.when(mockDcpClient.connect()).thenReturn(Mono.empty());
+        Mockito.when(mockDcpClient.initializeState(StreamFrom.NOW, StreamTo.INFINITY)).thenReturn(Mono.empty());
+        Mockito.when(mockDcpClient.startStreaming()).thenReturn(Mono.empty());
+        Mockito.when(mockDcpClient.disconnect()).thenReturn(Mono.empty());
+        return mockDcpClient;
+    }
+
+    /**
+     * Creates an Atlas client mock that answers cluster, bucket and scope lookups by attribute.
+     * Every answer first asserts that the queried qualified name is the expected one.
+     *
+     * @param returnEntities when true each lookup yields a mocked entity, otherwise a null entity
+     * @return the configured mock
+     */
+    private AtlasClientV2 mockAtlasClient(boolean returnEntities) throws Exception {
+        AtlasClientV2 mockAtlasClient = Mockito.mock(AtlasClientV2.class);
+        final String clusterName = "couchbase://localhost";
+        final String bucketName = String.format("%s/%s", clusterName, "default");
+        final String scopeName = String.format("%s/%s", bucketName, "_default");
+
+        Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseCluster.TYPE_NAME), Mockito.anyMap())).thenAnswer(iom -> {
+            Map<String, String> query = iom.getArgument(1);
+            // TestNG order is (actual, expected)
+            Assert.assertEquals(query.get("qualifiedName"), clusterName);
+            return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? Mockito.mock(AtlasEntity.class) : null);
+        });
+
+        Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseBucket.TYPE_NAME), Mockito.anyMap())).thenAnswer(iom -> {
+            Map<String, String> query = iom.getArgument(1);
+            Assert.assertEquals(query.get("qualifiedName"), bucketName);
+            return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? Mockito.mock(AtlasEntity.class) : null);
+        });
+
+        Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseScope.TYPE_NAME), Mockito.anyMap())).thenAnswer(iom -> {
+            Map<String, String> query = iom.getArgument(1);
+            Assert.assertEquals(query.get("qualifiedName"), scopeName);
+            return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? Mockito.mock(AtlasEntity.class) : null);
+        });
+
+        return mockAtlasClient;
+    }
+
+    /**
+     * Runs the hook twice: first with an Atlas mock that has no entities (the create interceptor
+     * must fire once with two entities), then with an Atlas mock that returns entities (nothing
+     * further may be created). The update interceptor must never be invoked.
+     */
+    @Test
+    public void testMain() throws Exception {
+        Client mockDcpClient = mockDcpClient();
+        CBConfig.dcpClient(mockDcpClient);
+        AtlasClientV2 mockAtlasClient = mockAtlasClient(false);
+        AtlasConfig.client(mockAtlasClient);
+
+        AtomicInteger createCalled = new AtomicInteger();
+        Consumer<List<AtlasEntity>> createEntitiesInterceptor = ents -> {
+            createCalled.getAndIncrement();
+            Assert.assertEquals(ents.size(), 2);
+        };
+        Consumer<List<AtlasEntity>> updateEntitiesInterceptor = ents -> {
+            Assert.fail("update interceptor must not be invoked by this test");
+        };
+
+        CouchbaseHook.setEntityInterceptors(createEntitiesInterceptor, updateEntitiesInterceptor);
+        CouchbaseHook.loop(false);
+        // AAAAAND, ACTION (missing entities)
+        CouchbaseHook.main(new String[0]);
+
+        Mockito.verify(mockDcpClient, Mockito.times(1)).connect();
+        Assert.assertEquals(createCalled.get(), 1);
+        // while the entities are missing: 3 cluster lookups, 2 bucket lookups, no scope lookup.
+        // Original note: repeated lookups come from exists(ATLAS) plus the later request for the entity itself.
+        validateAtlasInvocations(mockAtlasClient, 3, 2, 0);
+
+        // simulate existing entities situation
+        mockAtlasClient = mockAtlasClient(true);
+        AtlasConfig.client(mockAtlasClient);
+        CouchbaseAtlasEntity.dropCache();
+
+        // ACTION AGAIN, this time with mock entities in mock Atlas
+        CouchbaseHook.main(new String[0]);
+
+        Mockito.verify(mockDcpClient, Mockito.times(2)).connect();
+        // still 1: the second run must not create anything
+        Assert.assertEquals(createCalled.get(), 1);
+        // 1 time and then it should be cached
+        validateAtlasInvocations(mockAtlasClient, 1, 1, 0);
+
+        testEvents(CouchbaseHook.INSTANCE);
+    }
+
+    /**
+     * Placeholder for event-level checks; currently does nothing.
+     *
+     * @param listener the hook instance that would receive events
+     */
+    public void testEvents(CouchbaseHook listener) {
+
+    }
+
+    /**
+     * Verifies how many times the Atlas mock was asked for a cluster, a bucket and a scope.
+     *
+     * @param mockAtlasClient the Atlas mock to inspect
+     * @param cluster expected number of cluster lookups
+     * @param bucket expected number of bucket lookups
+     * @param scope expected number of scope lookups
+     */
+    private void validateAtlasInvocations(AtlasClientV2 mockAtlasClient, int cluster, int bucket, int scope) throws Exception {
+        Mockito.verify(mockAtlasClient, Mockito.times(cluster)).getEntityByAttribute(
+                Mockito.eq(CouchbaseCluster.TYPE_NAME),
+                Mockito.anyMap()
+        );
+        Mockito.verify(mockAtlasClient, Mockito.times(bucket)).getEntityByAttribute(
+                Mockito.eq(CouchbaseBucket.TYPE_NAME),
+                Mockito.anyMap()
+        );
+        Mockito.verify(mockAtlasClient, Mockito.times(scope)).getEntityByAttribute(
+                Mockito.eq(CouchbaseScope.TYPE_NAME),
+                Mockito.anyMap()
+        );
+    }
+}
\ No newline at end of file
diff --git
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
new file mode 100644
index 000000000..5449d6778
--- /dev/null
+++
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests Atlas entity loading and caching in CouchbaseAtlasEntity, using a minimal
+ * TestEntity subclass and a mocked AtlasClientV2.
+ */
+public class CouchbaseAtlasEntityTest {
+ final static String QUALIFIED_NAME = "testEntityQualifiedName";
+ final static String TYPE_NAME = "testEntityTypeName";
+ // random per test run; TestEntity.id() always returns this value
+ final static UUID ID = UUID.randomUUID();
+
+ /**
+  * Minimal model that returns the fixed qualified name, type name and id declared above.
+  */
+ public class TestEntity extends CouchbaseAtlasEntity<TestEntity> {
+
+ @Override
+ protected String qualifiedName() {
+ return QUALIFIED_NAME;
+ }
+
+ @Override
+ public String atlasTypeName() {
+ return TYPE_NAME;
+ }
+
+ @Override
+ public UUID id() {
+ return ID;
+ }
+ }
+
+ /**
+  * Walks one model through: not existing locally, being found through the Atlas mock,
+  * and afterwards being served without further Atlas calls.
+  */
+ @Test
+ public void testEntityLoading() throws Exception {
+ final AtlasClientV2 ac = Mockito.mock(AtlasClientV2.class);
+ final AtlasEntity ae = Mockito.mock(AtlasEntity.class);
+
+ Mockito.when(ae.getAttribute(Mockito.eq("qualifiedName")))
+ .thenReturn(QUALIFIED_NAME);
+
+ // the Atlas mock returns the mocked entity for a lookup by TYPE_NAME,
+ // after asserting that the query carries the expected qualifiedName
+ Mockito.when(
+ ac.getEntityByAttribute(
+ Mockito.eq(TYPE_NAME),
+ Mockito.anyMap()
+ )
+ ).thenAnswer(iom -> {
+ Map<String, String> query = iom.getArgument(1);
+ Assert.assertTrue(query.containsKey("qualifiedName"));
+ Assert.assertEquals(QUALIFIED_NAME, query.get("qualifiedName"));
+ return new AtlasEntity.AtlasEntityWithExtInfo(ae);
+ });
+
+ // spy, so that calls to updateJavaModel / updateAtlasEntity can be counted below
+ TestEntity subject = Mockito.spy(new TestEntity());
+ // exists must return false at this point as we've just created the model
+ // but it doesn't have the corresponding AtlasEntity and the cache should be empty
+ Assert.assertFalse(subject.exists());
+ Assert.assertSame(subject, subject.get());
+ Assert.assertFalse(subject.exists());
+ // ditto
+ Assert.assertTrue(!subject.atlasEntity().isPresent());
+ // Because our client mock should return the mock entity, exists with Atlas check
+ // should find the entity, cache it, and return true
+ Assert.assertTrue(subject.exists(ac));
+ // and call the method to update our model
+ Mockito.verify(subject,
Mockito.times(1)).updateJavaModel(Mockito.eq(ae));
+ // Let's validate that exists with Atlas check did, in fact, query our atlas mock for the entity
+ Mockito.verify(ac,
Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap());
+ // the entity should exist in cache
+ Assert.assertTrue(subject.exists());
+ // and exists with Atlas check should use it
+ Assert.assertTrue(subject.exists(ac));
+ // so, let's verify that the item was pulled not from atlas
+ // (from cache will be the only option left): the lookup count is still 1
+ Mockito.verify(ac,
Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap());
+
+ // This method should return filled Optional with our mocked entity pulled from cache.
+ // And, no matter how many times we call, the result should be the same
+ // (but let's make sure that we call it at least twice).
+ // NOTE(review): the iteration count is random (2..99), so the number of calls differs between runs
+ int timesToLoadEntity = 2 + (int) (Math.random() * 98);
+ for (int i = 0; i < timesToLoadEntity; i++) {
+ Assert.assertSame(ae, subject.atlasEntity().get());
+ }
+ // verify that atlas entity was updated every time we requested it
+ Mockito.verify(subject,
Mockito.times(timesToLoadEntity)).updateAtlasEntity(Mockito.eq(ae));
+ // verify that the model was not updated when we requested the entity a second time
+ Mockito.verify(subject,
Mockito.times(1)).updateJavaModel(Mockito.eq(ae));
+ }
+}
\ No newline at end of file
diff --git a/addons/couchbase-bridge/src/test/resources/log4j.xml
b/addons/couchbase-bridge/src/test/resources/log4j.xml
new file mode 100644
index 000000000..e53c232d0
--- /dev/null
+++ b/addons/couchbase-bridge/src/test/resources/log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <!-- Console appender: writes every event to System.out.
+      Pattern: date, level, [thread:NDC], message, (short class name:line). -->
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m
(%C{1}:%L)%n"/>
+ </layout>
+ </appender>
+
+ <!-- Root logger: DEBUG level, routed to the console appender above. -->
+ <root>
+ <level value="DEBUG"/>
+ <appender-ref ref="console"/>
+ </root>
+
+</log4j:configuration>
diff --git a/addons/models/5000-Couchbase/5020-couchbase_model.json
b/addons/models/5000-Couchbase/5020-couchbase_model.json
new file mode 100644
index 000000000..acd5d798d
--- /dev/null
+++ b/addons/models/5000-Couchbase/5020-couchbase_model.json
@@ -0,0 +1,159 @@
+{
+ "enumDefs": [
+ {
+ "name": "couchbase_field_type",
+ "typeVersion": "1.0",
+ "serviceType": "couchbase",
+ "elementDefs": [
+ { "ordinal": 0, "value": "boolean" },
+ { "ordinal": 1, "value": "number" },
+ { "ordinal": 2, "value": "string" },
+ { "ordinal": 3, "value": "array" },
+ { "ordinal": 4, "value": "object" },
+ { "ordinal": 5, "value": "binary" }
+ ]
+ }
+ ],
+ "structDefs": [],
+ "classificationDefs": [],
+ "entityDefs": [
+ {
+ "name": "couchbase_cluster",
+ "superTypes": [ "Asset" ],
+ "serviceType": "couchbase",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "url",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": true,
+ "isOptional": false,
+ "isUnique": true,
+ "includeInNotification": true
+ }
+ ]
+ },
+ {
+ "name": "couchbase_bucket",
+ "superTypes": [ "Asset" ],
+ "serviceType": "couchbase",
+ "typeVersion": "1.0",
+ "attributeDefs": [ ]
+ },
+ {
+ "name": "couchbase_scope",
+ "superTypes": [ "Asset" ],
+ "serviceType": "couchbase",
+ "typeVersion": "1.0",
+ "attributeDefs": []
+ },
+ {
+ "name": "couchbase_collection",
+ "superTypes": [ "DataSet" ],
+ "serviceType": "couchbase",
+ "typeVersion": "1.0",
+ "options": { "schemaElementsAttribute": "fields" },
+ "attributeDefs": [
+ {
+ "name": "documentsAnalyzed",
+ "typeName": "long",
+ "isOptional": false,
+ "cardinality": "SINGLE",
+ "valuesMinCount": 1,
+ "valuesMaxCount": 1,
+ "isUnique": false,
+ "isIndexable": false,
+ "includeInNotification": false
+ }
+ ]
+ },
+ {
+ "name": "couchbase_field",
+ "superTypes": [ "DataSet" ],
+ "serviceType": "couchbase",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "fieldType",
+ "typeName": "couchbase_field_type",
+ "isOptional": false,
+ "cardinality": "SINGLE",
+ "valuesMinCount": 1,
+ "valuesMaxCount": 1,
+ "isUnique": false,
+ "isIndexable": true,
+ "includeInNotification": false
+ },
+ {
+ "name": "fieldPath",
+ "typeName": "string",
+ "isOptional": false,
+ "cardinality": "SINGLE",
+ "valuesMinCount": 1,
+ "valuesMaxCount": 1,
+ "isUnique": false,
+ "isIndexable": true,
+ "includeInNotification": false
+ },
+ {
+ "name": "documentCount",
+ "typeName": "long",
+ "isOptional": false,
+ "cardinality": "SINGLE",
+ "valuesMinCount": 1,
+ "valuesMaxCount": 1,
+ "isUnique": false,
+ "isIndexable": false
+ }
+ ]
+ }
+ ],
+ "relationshipDefs": [
+ {
+ "name": "couchbase_cluster_buckets",
+ "typeVersion": "1.0",
+ "serviceType": "couchbase",
+ "relationshipCategory": "AGGREGATION",
+ "propagateTags": "ONE_TO_TWO",
+ "endDef1": { "type": "couchbase_cluster", "name": "buckets",
"cardinality": "SET", "isContainer": true },
+ "endDef2": { "type": "couchbase_bucket", "name": "cluster",
"cardinality": "SINGLE", "isContainer": false }
+ },
+ {
+ "name": "couchbase_bucket_scopes",
+ "typeVersion": "1.0",
+ "serviceType": "couchbase",
+ "relationshipCategory": "AGGREGATION",
+ "propagateTags": "ONE_TO_TWO",
+ "endDef1": { "type": "couchbase_bucket", "name": "scopes",
"cardinality": "SET", "isContainer": true },
+ "endDef2": { "type": "couchbase_scope", "name": "bucket",
"cardinality": "SINGLE", "isContainer": false }
+ },
+ {
+ "name": "couchbase_scope_collections",
+ "typeVersion": "1.0",
+ "serviceType": "couchbase",
+ "relationshipCategory": "AGGREGATION",
+ "propagateTags": "ONE_TO_TWO",
+ "endDef1": { "type": "couchbase_scope", "name": "collections",
"cardinality": "SET", "isContainer": true },
+ "endDef2": { "type": "couchbase_collection", "name": "scope",
"cardinality": "SINGLE", "isContainer": false }
+ },
+ {
+ "name": "couchbase_collection_fields",
+ "typeVersion": "1.0",
+ "serviceType": "couchbase",
+ "relationshipCategory": "AGGREGATION",
+ "propagateTags": "ONE_TO_TWO",
+ "endDef1": { "type": "couchbase_collection", "name": "fields",
"cardinality": "SET", "isContainer": true },
+ "endDef2": { "type": "couchbase_field", "name": "collection",
"cardinality": "SINGLE", "isContainer": false }
+ },
+ {
+ "name": "couchbase_field_fields",
+ "typeVersion": "1.0",
+ "serviceType": "couchbase",
+ "relationshipCategory": "AGGREGATION",
+ "propagateTags": "ONE_TO_TWO",
+ "endDef1": { "type": "couchbase_field", "name": "objectFields",
"cardinality": "SET", "isContainer": true },
+ "endDef2": { "type": "couchbase_field", "name": "parentField",
"cardinality": "SINGLE", "isContainer": false }
+ }
+ ]
+}
diff --git a/distro/pom.xml b/distro/pom.xml
index ed477dfbb..9df238eb3 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -129,6 +129,7 @@ atlas.graph.storage.hbase.regions-per-server=1
<descriptor>src/main/assemblies/atlas-sqoop-hook-package.xml</descriptor>
<descriptor>src/main/assemblies/atlas-storm-hook-package.xml</descriptor>
<descriptor>src/main/assemblies/atlas-kafka-hook-package.xml</descriptor>
+
<descriptor>src/main/assemblies/atlas-couchbase-hook-package.xml</descriptor>
<descriptor>src/main/assemblies/atlas-server-package.xml</descriptor>
<descriptor>src/main/assemblies/standalone-package.xml</descriptor>
<descriptor>src/main/assemblies/src-package.xml</descriptor>
diff --git a/distro/src/main/assemblies/atlas-couchbase-hook-package.xml
b/distro/src/main/assemblies/atlas-couchbase-hook-package.xml
new file mode 100644
index 000000000..5022bef51
--- /dev/null
+++ b/distro/src/main/assemblies/atlas-couchbase-hook-package.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <id>couchbase-hook</id>
+
<baseDirectory>apache-atlas-couchbase-hook-${project.version}</baseDirectory>
+ <fileSets>
+ <fileSet>
+
<directory>../addons/couchbase-bridge/target/dependency/hook</directory>
+ <outputDirectory>hook</outputDirectory>
+ </fileSet>
+
+ </fileSets>
+</assembly>
diff --git a/distro/src/main/assemblies/standalone-package.xml
b/distro/src/main/assemblies/standalone-package.xml
index 3e2ca1c39..888d3edf4 100755
--- a/distro/src/main/assemblies/standalone-package.xml
+++ b/distro/src/main/assemblies/standalone-package.xml
@@ -180,6 +180,12 @@
<outputDirectory>hook</outputDirectory>
</fileSet>
+ <!-- addons/couchbase -->
+ <fileSet>
+
<directory>../addons/couchbase-bridge/target/dependency/hook</directory>
+ <outputDirectory>hook</outputDirectory>
+ </fileSet>
+
<!-- for kafka topic setup -->
<fileSet>
<directory>../addons/kafka-bridge/src/bin</directory>
diff --git a/docs/src/documents/Hook/HookCouchbase.md
b/docs/src/documents/Hook/HookCouchbase.md
new file mode 100644
index 000000000..53b9e9cb3
--- /dev/null
+++ b/docs/src/documents/Hook/HookCouchbase.md
@@ -0,0 +1,45 @@
+---
+name: Couchbase
+route: /HookCouchbase
+menu: Documentation
+submenu: Hooks
+---
+
+import themen from 'theme/styles/styled-colors';
+import * as theme from 'react-syntax-highlighter/dist/esm/styles/hljs';
+import SyntaxHighlighter from 'react-syntax-highlighter';
+
+# Apache Atlas Couchbase bridge
+This bridge connects to a Couchbase cluster using the DCP protocol
+and performs real-time analysis and metadata extraction from documents stored on the cluster.
+The extracted metadata is then sent to Atlas via its REST API.
+
+## Configuration
+The bridge uses environment variables for configuration.
+
+### Atlas REST API
+| Environment variable | Description | Default Value |
+|----------------------|-----------------------------------------------------------------------------------------------------------------------------------|--------------------------|
+| ATLAS_URL | Atlas REST API URL | "http://localhost:21000" |
+| ATLAS_USERNAME | Atlas REST API username | "admin" |
+| ATLAS_PASSWORD | Atlas REST API password | "admin" |
+
+### Couchbase DCP connection
+| Environment variable | Description | Default Value |
+|----------------------|-----------------------------------------------------------------------------------------------------------------------------------|--------------------------|
+| CB_CLUSTER | Couchbase Cluster connection string | "couchbase://localhost" |
+| CB_USERNAME | Couchbase Cluster username | "Administrator" |
+| CB_PASSWORD | Couchbase Cluster password | "password" |
+| CB_ENABLE_TLS | Use TLS | false |
+| CB_BUCKET | Couchbase bucket to monitor | "default" |
+| CB_COLLECTIONS | Comma-separated list of collections to monitor, with each collection listed as `<scope>.<collection>` | |
+| DCP_PORT | DCP port to use | 11210 |
+| DCP_FIELD_THRESHOLD | A threshold that indicates in what percentage of analyzed messages per collection a field must appear before it is sent to Atlas | 0 |
+| DCP_SAMPLE_RATIO | Fraction of DCP messages to be analyzed, given as a value between 0 and 1 | 1 |
+
+## Running the bridge
+In the following commands, replace `<VARNAME>` and `<version>` with appropriate values.
+* Set environment variables using `export <VARNAME>=...` or prefix them to the next command.
+* Run `java -cp couchbase-bridge-<version>.jar com.couchbase.atlas.connector.CouchbaseHook`.
+* Verify that the appropriate `couchbase_cluster` and `couchbase_bucket` objects were created on your Atlas cluster.
+* After some documents have been updated on the Couchbase cluster, verify that the appropriate `couchbase_scope`, `couchbase_collection`, and `couchbase_field` objects were created on your Atlas cluster.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6a0e68485..c55bec763 100644
--- a/pom.xml
+++ b/pom.xml
@@ -849,6 +849,7 @@
<module>addons/impala-hook-api</module>
<module>addons/impala-bridge-shim</module>
<module>addons/impala-bridge</module>
+ <module>addons/couchbase-bridge</module>
<module>distro</module>
<module>atlas-examples</module>
@@ -1615,6 +1616,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>couchbase-bridge</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- API documentation -->
<dependency>
<groupId>com.webcohesion.enunciate</groupId>