This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/atlas-2.5 by this push:
new 79325d43c ATLAS-4937: checkstyle compliance updates - couchbase-bridge module (#282)
79325d43c is described below
commit 79325d43c9d7e23eddd19cc76df08c97f6f75a23
Author: Abhishek Kumar <[email protected]>
AuthorDate: Sun Feb 9 22:30:01 2025 -0800
ATLAS-4937: checkstyle compliance updates - couchbase-bridge module (#282)
(cherry picked from commit 1ec2dad4681aec6725e2a18ae71dd88c9d170da1)
---
addons/couchbase-bridge/pom.xml | 4 +
.../com/couchbase/atlas/connector/AtlasConfig.java | 8 +-
.../com/couchbase/atlas/connector/CBConfig.java | 8 +-
.../couchbase/atlas/connector/CouchbaseHook.java | 216 +++++++++------------
.../connector/entities/CouchbaseAtlasEntity.java | 138 ++++++-------
.../atlas/connector/entities/CouchbaseBucket.java | 45 ++---
.../atlas/connector/entities/CouchbaseCluster.java | 49 ++---
.../connector/entities/CouchbaseCollection.java | 41 ++--
.../atlas/connector/entities/CouchbaseField.java | 35 ++--
.../connector/entities/CouchbaseFieldType.java | 8 +-
.../atlas/connector/entities/CouchbaseScope.java | 43 ++--
.../atlas/connector/CouchbaseHookTest.java | 105 +++++-----
.../entities/CouchbaseAtlasEntityTest.java | 82 ++++----
13 files changed, 348 insertions(+), 434 deletions(-)
diff --git a/addons/couchbase-bridge/pom.xml b/addons/couchbase-bridge/pom.xml
index 22661846d..cdc6ff161 100644
--- a/addons/couchbase-bridge/pom.xml
+++ b/addons/couchbase-bridge/pom.xml
@@ -31,6 +31,10 @@
<name>Apache Atlas Couchbase Bridge</name>
<description>Apache Atlas Couchbase Bridge Module</description>
+ <properties>
+ <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+ <checkstyle.skip>false</checkstyle.skip>
+ </properties>
<dependencies>
<dependency>
diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
index 3168d7bfa..028aa9338 100644
--- a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
+++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
@@ -22,10 +22,10 @@ import java.util.Map;
public class AtlasConfig {
private static final Map<String, String> ENV = System.getenv();
- private static AtlasClientV2 client = null;
+ private static AtlasClientV2 client;
public static String[] urls() {
- return new String[] { ENV.getOrDefault("ATLAS_URL",
"http://localhost:21000") };
+ return new String[] {ENV.getOrDefault("ATLAS_URL",
"http://localhost:21000")};
}
public static String username() {
@@ -40,6 +40,10 @@ public class AtlasConfig {
return new String[] {username(), password()};
}
+ private AtlasConfig() {
+ // to block instantiation
+ }
+
public static AtlasClientV2 client() {
if (client == null) {
client = new AtlasClientV2(urls(), auth());
diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
index 70e9dc37c..339aac84c 100644
--- a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
+++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
@@ -37,7 +37,9 @@ public class CBConfig {
private static Client mockDcpClient;
private static Cluster cluster;
-
+ private CBConfig() {
+ // to block instantiation
+ }
public static String address() {
return ENV.getOrDefault("CB_CLUSTER", "couchbase://localhost");
@@ -106,7 +108,7 @@ public class CBConfig {
Client.Builder builder = Client.builder()
.collectionsAware(true)
- .seedNodes(String.format("%s:%s",address(),dcpPort()))
+ .seedNodes(String.format("%s:%s", address(), dcpPort()))
.connectionString(address())
.credentials(username(), password());
@@ -142,4 +144,4 @@ public class CBConfig {
private static boolean enableTLS() {
return Boolean.parseBoolean(ENV.getOrDefault("CB_ENABLE_TLS",
"false"));
}
-}
\ No newline at end of file
+}
diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
index 0a73307c8..143cba18c 100644
--- a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
+++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -57,13 +58,12 @@ import java.util.stream.Stream;
public class CouchbaseHook extends AtlasHook implements ControlEventHandler,
DataEventHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CouchbaseHook.class);
- protected static CouchbaseHook INSTANCE;
- protected static Client DCP;
- protected static AtlasClientV2 ATLAS;
- private static Consumer<List<AtlasEntity>> createInterceptor;
- private static Consumer<List<AtlasEntity>> updateInterceptor;
- private static boolean loop = true;
-
+ protected static CouchbaseHook instance;
+ protected static Client dcpClient;
+ protected static AtlasClientV2 atlasClient;
+ private static Consumer<List<AtlasEntity>> createInterceptor;
+ private static Consumer<List<AtlasEntity>> updateInterceptor;
+ private static boolean loop = true;
private CouchbaseCluster clusterEntity;
private CouchbaseBucket bucketEntity;
@@ -75,33 +75,31 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
*/
public static void main(String[] args) {
// create instances of DCP client,
- DCP = CBConfig.dcpClient();
+ dcpClient = CBConfig.dcpClient();
// Atlas client,
- ATLAS = AtlasConfig.client();
+ atlasClient = AtlasConfig.client();
// and DCP handler
- INSTANCE = new CouchbaseHook();
+ instance = new CouchbaseHook();
// register DCP handler with DCP client
- DCP.controlEventHandler(INSTANCE);
- DCP.dataEventHandler(INSTANCE);
+ dcpClient.controlEventHandler(instance);
+ dcpClient.dataEventHandler(instance);
// Connect to the cluster
- DCP.connect().block();
+ dcpClient.connect().block();
LOG.info("DCP client connected.");
- // Ensure the existence of corresponding
- // CouchbaseCluster, CouchbaseBucket, CouchbaseScope
- // entities and store them in local cache
- INSTANCE.initializeAtlasContext();
+ // Ensure the existence of corresponding CouchbaseCluster,
CouchbaseBucket, CouchbaseScope entities and store them in local cache
+ instance.initializeAtlasContext();
// Start listening to DCP
- DCP.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
+ dcpClient.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
System.out.println("Starting the stream...");
- DCP.startStreaming().block();
+ dcpClient.startStreaming().block();
System.out.println("Started the stream.");
// And then just loop the loop
@@ -109,69 +107,19 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
while (loop) {
Thread.sleep(1000);
}
- } catch (InterruptedException e) {
-
+ } catch (InterruptedException ignored) {
} finally {
- DCP.disconnect().block();
+ dcpClient.disconnect().block();
}
}
- /**
- * Ensures the existence of CouchbaseCluster,
- * CouchbaseBucket and Couchbase scope entities
- * and stores them into local cache
- */
- private void initializeAtlasContext() {
- LOG.debug("Creating cluster/bucket/scope entities");
-
- clusterEntity = new CouchbaseCluster()
- .name(CBConfig.address())
- .url(CBConfig.address())
- .get();
-
- bucketEntity = new CouchbaseBucket()
- .name(CBConfig.bucket())
- .cluster(clusterEntity)
- .get();
-
- List<AtlasEntity> entitiesToCreate = new ArrayList<>();
-
- if (!clusterEntity.exists(ATLAS)) {
- entitiesToCreate.add(clusterEntity.atlasEntity(ATLAS));
- }
-
- if (!bucketEntity.exists(ATLAS)) {
- entitiesToCreate.add(bucketEntity.atlasEntity(ATLAS));
- }
-
- if (!entitiesToCreate.isEmpty()) {
- createEntities(entitiesToCreate);
- }
- }
-
- private void createEntities(List<AtlasEntity> entities) {
- if (createInterceptor != null) {
- createInterceptor.accept(entities);
- return;
- }
-
- AtlasEntitiesWithExtInfo entity = new
AtlasEntitiesWithExtInfo(entities);
- EntityCreateRequestV2 request = new
EntityCreateRequestV2("couchbase", entity);
-
- notifyEntities(Arrays.asList(request), null);
+ protected static void setEntityInterceptors(Consumer<List<AtlasEntity>>
createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) {
+ CouchbaseHook.createInterceptor = createInterceptor;
+ CouchbaseHook.updateInterceptor = updateInterceptor;
}
- private void updateEntities(List<AtlasEntity> entities) {
- if (updateInterceptor != null) {
- updateInterceptor.accept(entities);
-
- return;
- }
-
- AtlasEntitiesWithExtInfo entity = new
AtlasEntitiesWithExtInfo(entities);
- EntityUpdateRequestV2 request = new
EntityUpdateRequestV2("couchbase", entity);
-
- notifyEntities(Arrays.asList(request), null);
+ static void loop(boolean loop) {
+ CouchbaseHook.loop = loop;
}
@Override
@@ -201,28 +149,26 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
List<AtlasEntity> toCreate = new ArrayList<>();
List<AtlasEntity> toUpdate = new ArrayList<>();
- if (!scopeEntity.exists(ATLAS)) {
- toCreate.add(scopeEntity.atlasEntity(ATLAS));
+ if (!scopeEntity.exists(atlasClient)) {
+ toCreate.add(scopeEntity.atlasEntity(atlasClient));
LOG.debug("Creating scope: {}",
scopeEntity.qualifiedName());
} else {
- toUpdate.add(scopeEntity.atlasEntity(ATLAS));
+ toUpdate.add(scopeEntity.atlasEntity(atlasClient));
LOG.debug("Updating scope: {}",
scopeEntity.qualifiedName());
}
CouchbaseCollection collectionEntity =
scopeEntity.collection(collectionName);
- // Let's record this attempt to analyze a collection document
- // so that we can calculate field frequencies
- // when filtering them via DCP_FIELD_THRESHOLD
+ // Let's record this attempt to analyze a collection document
so that we can calculate field frequencies when filtering them via
DCP_FIELD_THRESHOLD
collectionEntity.incrementAnalyzedDocuments();
// and then schedule it to be sent to Atlas
- if (!collectionEntity.exists(ATLAS)) {
- toCreate.add(collectionEntity.atlasEntity(ATLAS));
+ if (!collectionEntity.exists(atlasClient)) {
+ toCreate.add(collectionEntity.atlasEntity(atlasClient));
} else {
- toUpdate.add(collectionEntity.atlasEntity(ATLAS));
+ toUpdate.add(collectionEntity.atlasEntity(atlasClient));
}
Map<String, Object> document =
JsonObject.fromJson(DcpMutationMessage.contentBytes(event)).toMap();
@@ -237,13 +183,13 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
// update document counter on the field entity
.peek(CouchbaseField::incrementDocumentCount)
// Only passes fields that either already in Atlas or
pass DCP_FIELD_THRESHOLD setting
- .filter(field -> field.exists(ATLAS) ||
field.documentCount() / (float) collectionEntity.documentsAnalyzed() >
CBConfig.dcpFieldThreshold())
+ .filter(field -> field.exists(atlasClient) ||
field.documentCount() / (float) collectionEntity.documentsAnalyzed() >
CBConfig.dcpFieldThreshold())
// Schedule the entity either for creation or to be
updated in Atlas
.forEach(field -> {
- if (field.exists(ATLAS)) {
- toUpdate.add(field.atlasEntity(ATLAS));
+ if (field.exists(atlasClient)) {
+ toUpdate.add(field.atlasEntity(atlasClient));
} else {
- toCreate.add(field.atlasEntity(ATLAS));
+ toCreate.add(field.atlasEntity(atlasClient));
}
});
@@ -257,14 +203,68 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
}
}
+ @Override
+ public String getMessageSource() {
+ return "couchbase";
+ }
+
+ /**
+ * Ensures the existence of CouchbaseCluster, CouchbaseBucket and
Couchbase scope entities and stores them into local cache
+ */
+ private void initializeAtlasContext() {
+ LOG.debug("Creating cluster/bucket/scope entities");
+
+ clusterEntity = new
CouchbaseCluster().name(CBConfig.address()).url(CBConfig.address()).get();
+ bucketEntity = new
CouchbaseBucket().name(CBConfig.bucket()).cluster(clusterEntity).get();
+
+ List<AtlasEntity> entitiesToCreate = new ArrayList<>();
+
+ if (!clusterEntity.exists(atlasClient)) {
+ entitiesToCreate.add(clusterEntity.atlasEntity(atlasClient));
+ }
+
+ if (!bucketEntity.exists(atlasClient)) {
+ entitiesToCreate.add(bucketEntity.atlasEntity(atlasClient));
+ }
+
+ if (!entitiesToCreate.isEmpty()) {
+ createEntities(entitiesToCreate);
+ }
+ }
+
+ private void createEntities(List<AtlasEntity> entities) {
+ if (createInterceptor != null) {
+ createInterceptor.accept(entities);
+ return;
+ }
+
+ AtlasEntitiesWithExtInfo entity = new
AtlasEntitiesWithExtInfo(entities);
+ EntityCreateRequestV2 request = new
EntityCreateRequestV2("couchbase", entity);
+
+ notifyEntities(Arrays.asList(request), null);
+ }
+
+ private void updateEntities(List<AtlasEntity> entities) {
+ if (updateInterceptor != null) {
+ updateInterceptor.accept(entities);
+
+ return;
+ }
+
+ AtlasEntitiesWithExtInfo entity = new
AtlasEntitiesWithExtInfo(entities);
+ EntityUpdateRequestV2 request = new
EntityUpdateRequestV2("couchbase", entity);
+
+ notifyEntities(Arrays.asList(request), null);
+ }
+
/**
* Constructs a {@link CouchbaseField} from field information
*
* @param collectionEntity the {@link CouchbaseCollection} to which the
field belongs
- * @param path the path to the field inside the collection
document excluding the field itself
- * @param parent the parent field, if present or null
- * @param name the name of the field
- * @param value the value for the field from received document
+ * @param path the path to the field inside the collection document
excluding the field itself
+ * @param parent the parent field, if present or null
+ * @param name the name of the field
+ * @param value the value for the field from received document
* @return constructed or loaded from Atlas {@link CouchbaseField}
*/
private static Stream<CouchbaseField> processField(CouchbaseCollection
collectionEntity, Collection<String> path, @Nullable CouchbaseField parent,
String name, Object value) {
@@ -277,13 +277,7 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
fieldPath.add(name);
// constructing the field entity and loading it from cache or Atlas,
if previously stored there
- CouchbaseField rootField = new CouchbaseField()
- .name(name)
- .fieldPath(fieldPath.stream().collect(Collectors.joining(".")))
- .fieldType(fieldType)
- .collection(collectionEntity)
- .parentField(parent)
- .get();
+ CouchbaseField rootField = new
CouchbaseField().name(name).fieldPath(fieldPath.stream().collect(Collectors.joining("."))).fieldType(fieldType).collection(collectionEntity).parentField(parent).get();
// return value
Stream<CouchbaseField> result = Stream.of(rootField);
@@ -301,8 +295,7 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
result,
((Map<String, ?>) value).entrySet().stream()
// recursion
- .flatMap(entity ->
processField(collectionEntity, fieldPath, rootField, entity.getKey(),
entity.getValue()))
- );
+ .flatMap(entity ->
processField(collectionEntity, fieldPath, rootField, entity.getKey(),
entity.getValue())));
} else {
throw new IllegalArgumentException(String.format("Incorrect
value type '%s' for field type 'object': a Map was expected instead.",
value.getClass()));
}
@@ -311,11 +304,6 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
return result;
}
- @Override
- public String getMessageSource() {
- return "couchbase";
- }
-
/**
* Looks up the collection name by its vbucket identifier
*
@@ -324,18 +312,6 @@ public class CouchbaseHook extends AtlasHook implements
ControlEventHandler, Dat
* @return the name of the collection
*/
private static CollectionInfo collectionInfo(int vbucket, long collid) {
- return DCP.sessionState()
- .get(vbucket)
- .getCollectionsManifest()
- .getCollection(collid);
- }
-
- protected static void setEntityInterceptors(Consumer<List<AtlasEntity>>
createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) {
- CouchbaseHook.createInterceptor = createInterceptor;
- CouchbaseHook.updateInterceptor = updateInterceptor;
- }
-
- static void loop(boolean loop) {
- CouchbaseHook.loop = loop;
+ return
dcpClient.sessionState().get(vbucket).getCollectionsManifest().getCollection(collid);
}
}
diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
index 8bc8e42e9..07e48e4f7 100644
--- a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
+++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
@@ -19,7 +19,6 @@ package com.couchbase.atlas.connector.entities;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.typedef.AtlasStructDef;
import java.util.Collections;
import java.util.HashMap;
@@ -32,23 +31,28 @@ import java.util.UUID;
* The class uses "Self-Builder" pattern:
* 1. First, create the "builder" instance of the class
* 2. Populate the identifying fields of the class (check the `qualifiedName`
method of the entity for the list)
- * (all setters return the instance just as a Builder would)
+ * (all setters return the instance just as a Builder would)
* 3. Call `get()` method to resolve the instance and replace it with
previously loaded from Atlas data (if present)
- *
+ * <p>
* Example:
* ```java
- * clusterEntity = new CouchbaseCluster()
- * .name(CBConfig.address())
- * .url(CBConfig.address())
- * .get();
+ * clusterEntity = new CouchbaseCluster()
+ * .name(CBConfig.address())
+ * .url(CBConfig.address())
+ * .get();
* ```
*
* @param <E> extending class
*/
public abstract class CouchbaseAtlasEntity<E extends CouchbaseAtlasEntity<?>> {
- private static final Map<Class, Map<String, AtlasEntity>>
ENTITY_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
- private static final Map<Class, Map<String, CouchbaseAtlasEntity>>
MODEL_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
- private String name;
+ private static final Map<Class, Map<String, AtlasEntity>>
ENTITY_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
+ private static final Map<Class, Map<String, CouchbaseAtlasEntity>>
MODEL_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
+ private String name;
+
+ public static void dropCache() {
+ ENTITY_BY_TYPE_AND_ID.clear();
+ MODEL_BY_TYPE_AND_ID.clear();
+ }
public String name() {
return name;
@@ -72,9 +76,7 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
cache(load(atlas)
.orElseGet(() ->
atlasEntity()
- .orElseGet(() -> new
AtlasEntity(atlasTypeName())))
- )
- );
+ .orElseGet(() -> new
AtlasEntity(atlasTypeName())))));
atlasEntity.setAttribute("name", name);
atlasEntity.setAttribute("qualifiedName", qualifiedName());
@@ -84,10 +86,9 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
return atlasEntity;
}
- protected abstract String qualifiedName();
-
/**
* Looks up precreated atlas entity in the entity cache
+ *
* @return Optional of the cached entity
*/
public Optional<AtlasEntity> atlasEntity() {
@@ -97,14 +98,60 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
});
}
+ public abstract String atlasTypeName();
+
+ public abstract UUID id();
+
+ /**
+ * First checks if the entity has been loaded and cached and, if not, then
tries to load it from Atlas
+ *
+ * @param atlas Atlas client to use
+ * @return true if the entity found either in cache or in Atlas
+ */
+ public boolean exists(AtlasClientV2 atlas) {
+ if (!exists()) {
+ return load(atlas).isPresent();
+ }
+ return true;
+ }
+
+ /**
+ * Returns pre-cached model with provided identifiers or caches this model
and returns it
+ *
+ * @return the model
+ */
+ public E get() {
+ Class<E> type = (Class<E>) getClass();
+ String id = id().toString();
+
+ // ensure valid cache structure
+ if (!MODEL_BY_TYPE_AND_ID.containsKey(type)) {
+ MODEL_BY_TYPE_AND_ID.put(type, Collections.synchronizedMap(new
HashMap<>()));
+ }
+
+ // put the model into the cache, if not already present
+ Map<String, CouchbaseAtlasEntity> modelsById =
MODEL_BY_TYPE_AND_ID.get(type);
+ if (!modelsById.containsKey(id)) {
+ try {
+ modelsById.put(id, this);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return (E) modelsById.get(id);
+ }
+
+ protected abstract String qualifiedName();
+
/**
* Checks whether the model has the Atlas Entity created for it
* by looking it up in the entity cache.
* NOTE: this method does not check if the entity has been saved in Atlas
so,
- * it will return true when the entity is already created and cached
but is yet to be sent to Atlas
- *
+ * it will return true when the entity is already created and cached but
is yet to be sent to Atlas
+ * <p>
* This method is _mostly_ used in related objects when setting
relationship field to ensure that related
- * model has an AtlasEntity that can be referenced when storing
relationship information.
+ * model has an AtlasEntity that can be referenced when storing
relationship information.
*
* @return true if the entity found
*/
@@ -112,12 +159,9 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
return cachedEntity().isPresent();
}
- public abstract String atlasTypeName();
-
- public abstract UUID id();
-
/**
* Invoked when the entity needs to be updated with values from the model
+ *
* @param entity the entity to write the values into
*/
protected void updateAtlasEntity(AtlasEntity entity) {
@@ -126,6 +170,7 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
/**
* Invoked when the model needs to be updated with values from the entity
+ *
* @param entity the entity to read the values from
*/
protected void updateJavaModel(AtlasEntity entity) {
@@ -134,6 +179,7 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
/**
* Loads the entity for this model from Atlas and stores it in the entity
cache
+ *
* @param client Atlas client to use
* @return loaded entity
*/
@@ -161,6 +207,7 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
/**
* Puts an entity into the entity cache
+ *
* @param atlasEntity the entity to cache
* @return the same entity
*/
@@ -175,53 +222,10 @@ public abstract class CouchbaseAtlasEntity<E extends
CouchbaseAtlasEntity<?>> {
/**
* Looks up the entity in the cache
+ *
* @return Optional of cached entity
*/
private Optional<AtlasEntity> cachedEntity() {
return
Optional.ofNullable(ENTITY_BY_TYPE_AND_ID.getOrDefault(getClass(), (Map<String,
AtlasEntity>) Collections.EMPTY_MAP).getOrDefault(id().toString(), null));
}
-
- /**
- * First checks if the entity has been loaded and cached and, if not, then
tries to load it from Atlas
- * @param atlas Atlas client to use
- * @return true if the entity found either in cache or in Atlas
- */
- public boolean exists(AtlasClientV2 atlas) {
- if (!exists()) {
- return load(atlas).isPresent();
- }
- return true;
- }
-
- /**
- * Returns pre-cached model with provided identifiers or caches this model
and returns it
- *
- * @return the model
- */
- public E get() {
- Class<E> type = (Class<E>) getClass();
- String id = id().toString();
-
- // ensure valid cache structure
- if (!MODEL_BY_TYPE_AND_ID.containsKey(type)) {
- MODEL_BY_TYPE_AND_ID.put(type, Collections.synchronizedMap(new
HashMap<>()));
- }
-
- // put the model into the cache, if not already present
- Map<String, CouchbaseAtlasEntity> modelsById =
MODEL_BY_TYPE_AND_ID.get(type);
- if (!modelsById.containsKey(id)) {
- try {
- modelsById.put(id, this);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- return (E) modelsById.get(id);
- }
-
- public static void dropCache() {
- ENTITY_BY_TYPE_AND_ID.clear();
- MODEL_BY_TYPE_AND_ID.clear();
- }
-}
\ No newline at end of file
+}
diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
index 2983b2af2..9e7db36fd 100644
--- a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
+++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
@@ -18,27 +18,22 @@ package com.couchbase.atlas.connector.entities;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
-import org.apache.atlas.model.typedef.AtlasStructDef;
-import org.apache.atlas.type.AtlasTypeUtil;
import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
public class CouchbaseBucket extends CouchbaseAtlasEntity<CouchbaseBucket> {
- public static final String TYPE_NAME = "couchbase_bucket";
- private CouchbaseCluster cluster;
- private transient Map<String, CouchbaseScope> scopes =
Collections.synchronizedMap(new HashMap<>());
+ public static final String TYPE_NAME =
"couchbase_bucket";
+
+ private final transient Map<String, CouchbaseScope> scopes =
Collections.synchronizedMap(new HashMap<>());
+
+ private CouchbaseCluster cluster;
+
+ public CouchbaseBucket() {
+ }
@Override
public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
@@ -52,8 +47,14 @@ public class CouchbaseBucket extends
CouchbaseAtlasEntity<CouchbaseBucket> {
return String.format("%s/%s", cluster.qualifiedName(), name());
}
- public CouchbaseBucket() {
+ @Override
+ public String atlasTypeName() {
+ return TYPE_NAME;
+ }
+ @Override
+ public UUID id() {
+ return UUID.nameUUIDFromBytes(String.format("%s:%s:%s",
atlasTypeName(), cluster().id(), name()).getBytes(Charset.defaultCharset()));
}
public CouchbaseCluster cluster() {
@@ -65,23 +66,9 @@ public class CouchbaseBucket extends
CouchbaseAtlasEntity<CouchbaseBucket> {
return this;
}
- @Override
- public String atlasTypeName() {
- return TYPE_NAME;
- }
-
- @Override
- public UUID id() {
- return UUID.nameUUIDFromBytes(String.format("%s:%s:%s",
atlasTypeName(), cluster().id(), name()).getBytes(Charset.defaultCharset()));
- }
-
public CouchbaseScope scope(String name) {
if (!scopes.containsKey(name)) {
- scopes.put(name, new CouchbaseScope()
- .bucket(this)
- .name(name)
- .get()
- );
+ scopes.put(name, new
CouchbaseScope().bucket(this).name(name).get());
}
return scopes.get(name);
diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
index 2f4efe865..ef0c399d0 100644
--- a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
+++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
@@ -30,27 +30,16 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.UUID;
public class CouchbaseCluster extends CouchbaseAtlasEntity<CouchbaseCluster> {
public static final String TYPE_NAME = "couchbase_cluster";
- private String url;
-
- public String url() {
- return url;
- }
-
- public CouchbaseCluster url(String url) {
- this.url = url;
- return this;
- }
+ private String url;
public static AtlasEntityDef atlasEntityDef() {
AtlasEntityDef definition = AtlasTypeUtil.createClassTypeDef(
"couchbase_cluster",
- new HashSet<>()
- );
+ new HashSet<>());
definition.getSuperTypes().add("Asset");
definition.setServiceType("couchbase");
@@ -68,8 +57,7 @@ public class CouchbaseCluster extends
CouchbaseAtlasEntity<CouchbaseCluster> {
true,
true,
true,
- Collections.EMPTY_LIST
- ));
+ Collections.EMPTY_LIST));
return definition;
}
@@ -87,26 +75,21 @@ public class CouchbaseCluster extends
CouchbaseAtlasEntity<CouchbaseCluster> {
"couchbase_cluster",
"buckets",
AtlasStructDef.AtlasAttributeDef.Cardinality.SET,
- true
- ),
+ true),
new AtlasRelationshipEndDef(
"couchbase_bucket",
"cluster",
AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE,
- false
- )
- )
- );
+ false)));
}
- @Override
- public String atlasTypeName() {
- return TYPE_NAME;
+ public String url() {
+ return url;
}
- @Override
- public UUID id() {
- return UUID.nameUUIDFromBytes(String.format("%s:%s", atlasTypeName(),
url()).getBytes(Charset.defaultCharset()));
+ public CouchbaseCluster url(String url) {
+ this.url = url;
+ return this;
}
@Override
@@ -121,10 +104,20 @@ public class CouchbaseCluster extends
CouchbaseAtlasEntity<CouchbaseCluster> {
return url();
}
+ @Override
+ public String atlasTypeName() {
+ return TYPE_NAME;
+ }
+
+ @Override
+ public UUID id() {
+ return UUID.nameUUIDFromBytes(String.format("%s:%s", atlasTypeName(),
url()).getBytes(Charset.defaultCharset()));
+ }
+
@Override
protected void updateJavaModel(AtlasEntity entity) {
if (entity.hasAttribute("url")) {
this.url = (String) entity.getAttribute("url");
}
}
-}
\ No newline at end of file
+}
diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
index b7319581d..174891100 100644
--- a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
+++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
@@ -18,22 +18,11 @@ package com.couchbase.atlas.connector.entities;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
-import org.apache.atlas.model.typedef.AtlasStructDef;
-import org.apache.atlas.type.AtlasTypeUtil;
import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
import java.util.UUID;
public class CouchbaseCollection extends
CouchbaseAtlasEntity<CouchbaseCollection> {
-
private CouchbaseScope scope;
private long documentsAnalyzed;
@@ -50,6 +39,21 @@ public class CouchbaseCollection extends
CouchbaseAtlasEntity<CouchbaseCollectio
return entity;
}
+ @Override
+ protected String qualifiedName() {
+ return String.format("%s/%s", scope.qualifiedName(), name());
+ }
+
+ @Override
+ public String atlasTypeName() {
+ return "couchbase_collection";
+ }
+
+ @Override
+ public UUID id() {
+ return UUID.nameUUIDFromBytes(String.format("%s:%s:%s",
atlasTypeName(), scope().id().toString(),
name()).getBytes(Charset.defaultCharset()));
+ }
+
@Override
protected void updateAtlasEntity(AtlasEntity entity) {
entity.setAttribute("documentsAnalyzed", documentsAnalyzed);
@@ -69,21 +73,6 @@ public class CouchbaseCollection extends
CouchbaseAtlasEntity<CouchbaseCollectio
return this;
}
- @Override
- protected String qualifiedName() {
- return String.format("%s/%s", scope.qualifiedName(), name());
- }
-
- @Override
- public String atlasTypeName() {
- return "couchbase_collection";
- }
-
- @Override
- public UUID id() {
- return UUID.nameUUIDFromBytes(String.format("%s:%s:%s",
atlasTypeName(), scope().id().toString(),
name()).getBytes(Charset.defaultCharset()));
- }
-
public CouchbaseScope scope() {
return this.scope;
}
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
index 5d14f6e20..de4f436c7 100644
---
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
@@ -18,32 +18,21 @@ package com.couchbase.atlas.connector.entities;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
-import org.apache.atlas.model.typedef.AtlasStructDef;
-import org.apache.atlas.type.AtlasTypeUtil;
import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
import java.util.UUID;
public class CouchbaseField extends CouchbaseAtlasEntity<CouchbaseField> {
- public static final String TYPE_NAME = "couchbase_field";
- private CouchbaseFieldType fieldType;
- private String fieldPath;
- private long documentCount = 0;
+ public static final String TYPE_NAME = "couchbase_field";
+ private CouchbaseFieldType fieldType;
+ private String fieldPath;
+ private long documentCount;
private CouchbaseField parentField;
private CouchbaseCollection collection;
public CouchbaseField() {
-
}
public CouchbaseFieldType fieldType() {
@@ -96,13 +85,6 @@ public class CouchbaseField extends
CouchbaseAtlasEntity<CouchbaseField> {
return entity;
}
- @Override
- protected void updateAtlasEntity(AtlasEntity entity) {
- entity.setAttribute("fieldType", fieldType.toString());
- entity.setAttribute("fieldPath", fieldPath);
- entity.setAttribute("documentCount", documentCount);
- }
-
@Override
protected String qualifiedName() {
return String.format("%s/%s:%s", collection.qualifiedName(),
fieldPath(), fieldType());
@@ -118,6 +100,13 @@ public class CouchbaseField extends
CouchbaseAtlasEntity<CouchbaseField> {
return
UUID.nameUUIDFromBytes(qualifiedName().getBytes(Charset.defaultCharset()));
}
+ @Override
+ protected void updateAtlasEntity(AtlasEntity entity) {
+ entity.setAttribute("fieldType", fieldType.toString());
+ entity.setAttribute("fieldPath", fieldPath);
+ entity.setAttribute("documentCount", documentCount);
+ }
+
public CouchbaseField parentField() {
return parentField;
}
@@ -126,4 +115,4 @@ public class CouchbaseField extends
CouchbaseAtlasEntity<CouchbaseField> {
this.parentField = parent;
return this;
}
-}
\ No newline at end of file
+}
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
index 8355f60cf..67d1c5a6e 100644
---
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
@@ -16,14 +16,11 @@
package com.couchbase.atlas.connector.entities;
-import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.java.json.JsonObject;
-import org.apache.atlas.model.typedef.AtlasEnumDef;
-import org.apache.atlas.type.AtlasTypeUtil;
import javax.annotation.Nonnull;
+
import java.util.Collection;
-import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -67,5 +64,4 @@ public enum CouchbaseFieldType {
public String toString() {
return super.toString().toLowerCase(Locale.getDefault());
}
-
-}
\ No newline at end of file
+}
diff --git
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
index 13f4e4852..c4a39bcc0 100644
---
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
+++
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
@@ -18,28 +18,19 @@ package com.couchbase.atlas.connector.entities;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipDef;
-import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
-import org.apache.atlas.model.typedef.AtlasStructDef;
-import org.apache.atlas.type.AtlasTypeUtil;
import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
public class CouchbaseScope extends CouchbaseAtlasEntity<CouchbaseScope> {
+ public static final String TYPE_NAME = "couchbase_scope";
- public static final String TYPE_NAME = "couchbase_scope";
- private CouchbaseBucket bucket;
+ private final transient Map<String, CouchbaseCollection> collections =
Collections.synchronizedMap(new HashMap<>());
- private transient Map<String, CouchbaseCollection> collections =
Collections.synchronizedMap(new HashMap<>());
+ private CouchbaseBucket bucket;
public CouchbaseBucket bucket() {
return bucket;
@@ -50,18 +41,6 @@ public class CouchbaseScope extends
CouchbaseAtlasEntity<CouchbaseScope> {
return this;
}
- @Override
- public UUID id() {
- return UUID.nameUUIDFromBytes(
- String.format(
- "%s:%s:%s",
- atlasTypeName(),
- bucket().id().toString(),
- name()
- ).getBytes(Charset.defaultCharset())
- );
- }
-
@Override
public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
AtlasEntity entity = super.atlasEntity(atlas);
@@ -79,13 +58,19 @@ public class CouchbaseScope extends
CouchbaseAtlasEntity<CouchbaseScope> {
return TYPE_NAME;
}
+ @Override
+ public UUID id() {
+ return UUID.nameUUIDFromBytes(
+ String.format("%s:%s:%s",
+ atlasTypeName(),
+ bucket().id().toString(),
+ name()
+ ).getBytes(Charset.defaultCharset()));
+ }
+
public CouchbaseCollection collection(String name) {
if (!collections.containsKey(name)) {
- collections.put(name, new CouchbaseCollection()
- .name(name)
- .scope(this)
- .get()
- );
+ collections.put(name, new
CouchbaseCollection().name(name).scope(this).get());
}
return collections.get(name);
diff --git
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
index 36d81a3ac..2fd49877e 100644
---
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
+++
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
@@ -26,7 +26,6 @@ import com.couchbase.client.dcp.StreamTo;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
@@ -35,44 +34,10 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
-public class CouchbaseHookTest {
-
- private Client mockDcpClient() {
- Client mockDcpClient = Mockito.mock(Client.class);
- Mockito.when(mockDcpClient.connect()).thenReturn(Mono.empty());
- Mockito.when(mockDcpClient.initializeState(StreamFrom.NOW,
StreamTo.INFINITY)).thenReturn(Mono.empty());
- Mockito.when(mockDcpClient.startStreaming()).thenReturn(Mono.empty());
- Mockito.when(mockDcpClient.disconnect()).thenReturn(Mono.empty());
- return mockDcpClient;
- }
-
- private AtlasClientV2 mockAtlasClient(boolean returnEntities) throws
Exception {
- AtlasClientV2 mockAtlasClient = Mockito.mock(AtlasClientV2.class);
- final String clusterName = "couchbase://localhost";
- final String bucketName = String.format("%s/%s", clusterName,
"default");
- final String scopeName = String.format("%s/%s", bucketName,
"_default");
-
-
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseCluster.TYPE_NAME),
Mockito.anyMap())).thenAnswer(iom -> {
- Map<String, String> query = iom.getArgument(1);
- Assert.assertEquals(clusterName, query.get("qualifiedName"));
- return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ?
Mockito.mock(AtlasEntity.class) : null);
- });
-
-
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseBucket.TYPE_NAME),
Mockito.anyMap())).thenAnswer(iom -> {
- Map<String, String> query = iom.getArgument(1);
- Assert.assertEquals(bucketName, query.get("qualifiedName"));
- return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ?
Mockito.mock(AtlasEntity.class) : null);
- });
-
-
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseScope.TYPE_NAME),
Mockito.anyMap())).thenAnswer(iom -> {
- Map<String, String> query = iom.getArgument(1);
- Assert.assertEquals(scopeName, query.get("qualifiedName"));
- return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ?
Mockito.mock(AtlasEntity.class) : null);
- });
-
- return mockAtlasClient;
- }
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+public class CouchbaseHookTest {
@Test
public void testMain() throws Exception {
Client mockDcpClient = mockDcpClient();
@@ -83,11 +48,9 @@ public class CouchbaseHookTest {
AtomicInteger createCalled = new AtomicInteger();
Consumer<List<AtlasEntity>> createEntitiesInterceptor = ents -> {
createCalled.getAndIncrement();
- Assert.assertEquals(ents.size(), 2);
- };
- Consumer<List<AtlasEntity>> updateEntitiesInterceptor = ents -> {
- Assert.assertTrue(false);
+ assertEquals(ents.size(), 2);
};
+ Consumer<List<AtlasEntity>> updateEntitiesInterceptor = ents -> fail();
CouchbaseHook.setEntityInterceptors(createEntitiesInterceptor,
updateEntitiesInterceptor);
CouchbaseHook.loop(false);
@@ -95,7 +58,7 @@ public class CouchbaseHookTest {
CouchbaseHook.main(new String[0]);
Mockito.verify(mockDcpClient, Mockito.times(1)).connect();
- Assert.assertEquals(1, createCalled.get());
+ assertEquals(createCalled.get(), 1);
// 2 times: 1 time when we call exists(ATLAS) and second time when we
request the entity
validateAtlasInvocations(mockAtlasClient, 3, 2, 0);
@@ -108,29 +71,55 @@ public class CouchbaseHookTest {
CouchbaseHook.main(new String[0]);
Mockito.verify(mockDcpClient, Mockito.times(2)).connect();
- Assert.assertEquals(1, createCalled.get());
+ assertEquals(createCalled.get(), 1);
// 1 time and then it should be cached
validateAtlasInvocations(mockAtlasClient, 1, 1, 0);
- testEvents(CouchbaseHook.INSTANCE);
+ testEvents(CouchbaseHook.instance);
}
public void testEvents(CouchbaseHook listener) {
+ }
+
+ private Client mockDcpClient() {
+ Client mockDcpClient = Mockito.mock(Client.class);
+ Mockito.when(mockDcpClient.connect()).thenReturn(Mono.empty());
+ Mockito.when(mockDcpClient.initializeState(StreamFrom.NOW,
StreamTo.INFINITY)).thenReturn(Mono.empty());
+ Mockito.when(mockDcpClient.startStreaming()).thenReturn(Mono.empty());
+ Mockito.when(mockDcpClient.disconnect()).thenReturn(Mono.empty());
+ return mockDcpClient;
+ }
+ private AtlasClientV2 mockAtlasClient(boolean returnEntities) throws
Exception {
+ AtlasClientV2 mockAtlasClient = Mockito.mock(AtlasClientV2.class);
+ final String clusterName = "couchbase://localhost";
+ final String bucketName = String.format("%s/%s", clusterName,
"default");
+ final String scopeName = String.format("%s/%s", bucketName,
"_default");
+
+
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseCluster.TYPE_NAME),
Mockito.anyMap())).thenAnswer(iom -> {
+ Map<String, String> query = iom.getArgument(1);
+ assertEquals(query.get("qualifiedName"), clusterName);
+ return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ?
Mockito.mock(AtlasEntity.class) : null);
+ });
+
+
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseBucket.TYPE_NAME),
Mockito.anyMap())).thenAnswer(iom -> {
+ Map<String, String> query = iom.getArgument(1);
+ assertEquals(query.get("qualifiedName"), bucketName);
+ return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ?
Mockito.mock(AtlasEntity.class) : null);
+ });
+
+
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseScope.TYPE_NAME),
Mockito.anyMap())).thenAnswer(iom -> {
+ Map<String, String> query = iom.getArgument(1);
+ assertEquals(query.get("qualifiedName"), scopeName);
+ return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ?
Mockito.mock(AtlasEntity.class) : null);
+ });
+
+ return mockAtlasClient;
}
private void validateAtlasInvocations(AtlasClientV2 mockAtlasClient, int
cluster, int bucket, int scope) throws Exception {
- Mockito.verify(mockAtlasClient,
Mockito.times(cluster)).getEntityByAttribute(
- Mockito.eq(CouchbaseCluster.TYPE_NAME),
- Mockito.anyMap()
- );
- Mockito.verify(mockAtlasClient,
Mockito.times(bucket)).getEntityByAttribute(
- Mockito.eq(CouchbaseBucket.TYPE_NAME),
- Mockito.anyMap()
- );
- Mockito.verify(mockAtlasClient,
Mockito.times(scope)).getEntityByAttribute(
- Mockito.eq(CouchbaseScope.TYPE_NAME),
- Mockito.anyMap()
- );
+ Mockito.verify(mockAtlasClient,
Mockito.times(cluster)).getEntityByAttribute(Mockito.eq(CouchbaseCluster.TYPE_NAME),
Mockito.anyMap());
+ Mockito.verify(mockAtlasClient,
Mockito.times(bucket)).getEntityByAttribute(Mockito.eq(CouchbaseBucket.TYPE_NAME),
Mockito.anyMap());
+ Mockito.verify(mockAtlasClient,
Mockito.times(scope)).getEntityByAttribute(Mockito.eq(CouchbaseScope.TYPE_NAME),
Mockito.anyMap());
}
-}
\ No newline at end of file
+}
diff --git
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
index 5449d6778..6f5e1c753 100644
---
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
+++
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
@@ -19,78 +19,57 @@ package com.couchbase.atlas.connector.entities;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.Test;
-import java.nio.charset.Charset;
import java.util.Map;
import java.util.UUID;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
/**
* Tests atlas entity loading and caching
*/
public class CouchbaseAtlasEntityTest {
- final static String QUALIFIED_NAME = "testEntityQualifiedName";
- final static String TYPE_NAME = "testEntityTypeName";
- final static UUID ID = UUID.randomUUID();
-
- public class TestEntity extends CouchbaseAtlasEntity<TestEntity> {
-
- @Override
- protected String qualifiedName() {
- return QUALIFIED_NAME;
- }
-
- @Override
- public String atlasTypeName() {
- return TYPE_NAME;
- }
-
- @Override
- public UUID id() {
- return ID;
- }
- }
+ static final String QUALIFIED_NAME = "testEntityQualifiedName";
+ static final String TYPE_NAME = "testEntityTypeName";
+ static final UUID ID = UUID.randomUUID();
@Test
public void testEntityLoading() throws Exception {
final AtlasClientV2 ac = Mockito.mock(AtlasClientV2.class);
- final AtlasEntity ae = Mockito.mock(AtlasEntity.class);
+ final AtlasEntity ae = Mockito.mock(AtlasEntity.class);
- Mockito.when(ae.getAttribute(Mockito.eq("qualifiedName")))
- .thenReturn(QUALIFIED_NAME);
+
Mockito.when(ae.getAttribute(Mockito.eq("qualifiedName"))).thenReturn(QUALIFIED_NAME);
- Mockito.when(
- ac.getEntityByAttribute(
- Mockito.eq(TYPE_NAME),
- Mockito.anyMap()
- )
- ).thenAnswer(iom -> {
+ Mockito.when(ac.getEntityByAttribute(Mockito.eq(TYPE_NAME),
Mockito.anyMap())).thenAnswer(iom -> {
Map<String, String> query = iom.getArgument(1);
- Assert.assertTrue(query.containsKey("qualifiedName"));
- Assert.assertEquals(QUALIFIED_NAME, query.get("qualifiedName"));
+ assertTrue(query.containsKey("qualifiedName"));
+ assertEquals(query.get("qualifiedName"), QUALIFIED_NAME);
return new AtlasEntity.AtlasEntityWithExtInfo(ae);
});
TestEntity subject = Mockito.spy(new TestEntity());
// exists must return false at this point as we've just created the
model but it doesn't have the corresponding AtlasEntity
// and the cache should be empty
- Assert.assertFalse(subject.exists());
- Assert.assertSame(subject, subject.get());
- Assert.assertFalse(subject.exists());
+ assertFalse(subject.exists());
+ assertSame(subject, subject.get());
+ assertFalse(subject.exists());
// ditto
- Assert.assertTrue(!subject.atlasEntity().isPresent());
+ assertFalse(subject.atlasEntity().isPresent());
// Because our client mock should return the mock entity, exists with
Atlas check should find the entity,
// cache it, and return true
- Assert.assertTrue(subject.exists(ac));
+ assertTrue(subject.exists(ac));
// and call the method to update our model
Mockito.verify(subject,
Mockito.times(1)).updateJavaModel(Mockito.eq(ae));
// Let's validate that exists with Atlas check did, in fact, query our
atlas mock for the entity
Mockito.verify(ac,
Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap());
// the entity should exist in cache
- Assert.assertTrue(subject.exists());
+ assertTrue(subject.exists());
// and exists with Atlas check should use it
- Assert.assertTrue(subject.exists(ac));
+ assertTrue(subject.exists(ac));
// so, let's verify that the item was pulled not from atlas (from
cache will be the only option left)
Mockito.verify(ac,
Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap());
@@ -98,11 +77,28 @@ public class CouchbaseAtlasEntityTest {
// And, no matter how many times we call, the result should be the
same (but let's make sure that we call it at least twice)
int timesToLoadEntity = 2 + (int) (Math.random() * 98);
for (int i = 0; i < timesToLoadEntity; i++) {
- Assert.assertSame(ae, subject.atlasEntity().get());
+ assertSame(ae, subject.atlasEntity().get());
}
// verify that atlas entity was updated every time we requested it
Mockito.verify(subject,
Mockito.times(timesToLoadEntity)).updateAtlasEntity(Mockito.eq(ae));
// verify that the model was not updated when we requested the entity
second time
Mockito.verify(subject,
Mockito.times(1)).updateJavaModel(Mockito.eq(ae));
}
-}
\ No newline at end of file
+
+ public class TestEntity extends CouchbaseAtlasEntity<TestEntity> {
+ @Override
+ public String atlasTypeName() {
+ return TYPE_NAME;
+ }
+
+ @Override
+ public UUID id() {
+ return ID;
+ }
+
+ @Override
+ protected String qualifiedName() {
+ return QUALIFIED_NAME;
+ }
+ }
+}