This is an automated email from the ASF dual-hosted git repository.
mandarambawane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new be7b20872 ATLAS-4975 : kafka-bridge module: update for code
readability improvements (#380)
be7b20872 is described below
commit be7b20872eb1b1b81eb3cfbf7ddd9dbae1ae1df7
Author: mandarambawane <[email protected]>
AuthorDate: Wed Jul 16 20:15:32 2025 +0530
ATLAS-4975 : kafka-bridge module: update for code readability improvements
(#380)
---
addons/kafka-bridge/pom.xml | 5 +
.../org/apache/atlas/kafka/bridge/KafkaBridge.java | 156 ++++++++++-----------
.../kafka/bridge/SchemaRegistryConnector.java | 31 ++--
.../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 11 +-
4 files changed, 94 insertions(+), 109 deletions(-)
diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index a1dd3a666..1f0f54eab 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -32,6 +32,11 @@
<name>Apache Atlas Kafka Bridge</name>
<description>Apache Atlas Kafka Bridge Module</description>
+ <properties>
+ <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+ <checkstyle.skip>false</checkstyle.skip>
+ </properties>
+
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
diff --git
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 0878ec29f..8223ce56c 100644
---
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -29,6 +29,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.utils.AtlasConfigurationUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.KafkaUtils;
+import org.apache.avro.Schema;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -38,7 +39,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
-
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.json.simple.JSONObject;
@@ -49,43 +49,41 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
-import org.apache.avro.Schema;
-import java.io.IOException;
public class KafkaBridge {
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaBridge.class);
- private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE=
System.getenv("KAFKA_SCHEMA_REGISTRY");
- public static String KAFKA_SCHEMA_REGISTRY_HOSTNAME =
"localhost";
- private static final int EXIT_CODE_SUCCESS = 0;
- private static final int EXIT_CODE_FAILED = 1;
- private static final String ATLAS_ENDPOINT =
"atlas.rest.address";
- private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
- private static final String CLUSTER_NAME_KEY =
"atlas.cluster.name";
- private static final String KAFKA_METADATA_NAMESPACE =
"atlas.metadata.namespace";
- private static final String DEFAULT_CLUSTER_NAME = "primary";
- private static final String ATTRIBUTE_QUALIFIED_NAME =
"qualifiedName";
- private static final String DESCRIPTION_ATTR =
"description";
- private static final String PARTITION_COUNT =
"partitionCount";
- private static final String REPLICATION_FACTOR =
"replicationFactor";
- private static final String NAME = "name";
- private static final String URI = "uri";
- private static final String CLUSTERNAME =
"clusterName";
- private static final String TOPIC = "topic";
- private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
- private static final String TYPE = "type";
- private static final String NAMESPACE =
"namespace";
- private static final String FIELDS = "fields";
- private static final String AVRO_SCHEMA =
"avroSchema";
- private static final String SCHEMA_VERSION_ID =
"versionId";
-
- private static final String FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME =
"%s@%s@%s";
- private static final String FORMAT_KAKFA_FIELD_QUALIFIED_NAME =
"%s@%s@%s@%s";
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaBridge.class);
+ private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE =
System.getenv("KAFKA_SCHEMA_REGISTRY");
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final String ATLAS_ENDPOINT =
"atlas.rest.address";
+ private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
+ private static final String CLUSTER_NAME_KEY =
"atlas.cluster.name";
+ private static final String KAFKA_METADATA_NAMESPACE =
"atlas.metadata.namespace";
+ private static final String DEFAULT_CLUSTER_NAME =
"primary";
+ private static final String ATTRIBUTE_QUALIFIED_NAME =
"qualifiedName";
+ private static final String DESCRIPTION_ATTR =
"description";
+ private static final String PARTITION_COUNT =
"partitionCount";
+ private static final String REPLICATION_FACTOR =
"replicationFactor";
+ private static final String NAME = "name";
+ private static final String URI = "uri";
+ private static final String CLUSTERNAME =
"clusterName";
+ private static final String TOPIC = "topic";
+ private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
+ private static final String TYPE = "type";
+ private static final String NAMESPACE =
"namespace";
+ private static final String FIELDS = "fields";
+ private static final String AVRO_SCHEMA =
"avroSchema";
+ private static final String SCHEMA_VERSION_ID =
"versionId";
+
+ private static final String FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME =
"%s@%s@%s";
+ private static final String FORMAT_KAKFA_FIELD_QUALIFIED_NAME =
"%s@%s@%s@%s";
private final List<String> availableTopics;
private final String metadataNamespace;
@@ -93,6 +91,8 @@ public class KafkaBridge {
private final KafkaUtils kafkaUtils;
private final CloseableHttpClient httpClient;
+ public static String kafkaSchemaRegistryHostname =
"localhost";
+
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
@@ -105,7 +105,7 @@ public class KafkaBridge {
try {
Options options = new Options();
- options.addOption("t","topic", true, "topic");
+ options.addOption("t", "topic", true, "topic");
options.addOption("f", "filename", true, "filename");
CommandLineParser parser = new BasicParser();
@@ -116,10 +116,9 @@ public class KafkaBridge {
String[] urls =
atlasConf.getStringArray(ATLAS_ENDPOINT);
if (urls == null || urls.length == 0) {
- urls = new String[] { DEFAULT_ATLAS_URL };
+ urls = new String[] {DEFAULT_ATLAS_URL};
}
-
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
String[] basicAuthUsernamePassword =
AuthenticationUtil.getBasicAuthenticationInput();
@@ -134,8 +133,8 @@ public class KafkaBridge {
KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2,
kafkaUtils);
- if(StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) {
- KAFKA_SCHEMA_REGISTRY_HOSTNAME =
KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE;
+ if (StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) {
+ kafkaSchemaRegistryHostname =
KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE;
}
if (StringUtils.isNotEmpty(fileToImport)) {
@@ -160,11 +159,11 @@ public class KafkaBridge {
exitCode = EXIT_CODE_SUCCESS;
}
- } catch(ParseException e) {
+ } catch (ParseException e) {
LOG.error("Failed to parse arguments. Error: ", e.getMessage());
printUsage();
- } catch(Exception e) {
+ } catch (Exception e) {
System.out.println("ImportKafkaEntities failed. Please check the
log file for the detailed error message");
e.printStackTrace();
@@ -213,15 +212,15 @@ public class KafkaBridge {
List<String> topics = availableTopics;
if (StringUtils.isNotEmpty(topicToImport)) {
- List<String> topics_subset = new ArrayList<>();
+ List<String> topicsSubset = new ArrayList<>();
for (String topic : topics) {
if (Pattern.compile(topicToImport).matcher(topic).matches()) {
- topics_subset.add(topic);
+ topicsSubset.add(topic);
}
}
- topics = topics_subset;
+ topics = topicsSubset;
}
if (CollectionUtils.isNotEmpty(topics)) {
@@ -234,7 +233,7 @@ public class KafkaBridge {
@VisibleForTesting
AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
String topicQualifiedName =
getTopicQualifiedName(metadataNamespace, topic);
- AtlasEntityWithExtInfo topicEntity =
findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(),topicQualifiedName);
+ AtlasEntityWithExtInfo topicEntity =
findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
System.out.print("\n"); // add a new line for each topic
@@ -295,7 +294,7 @@ public class KafkaBridge {
System.out.println("---Adding Avro field " + fullname);
LOG.info("Importing Avro field: {}", fieldQualifiedName);
- AtlasEntity entity = getFieldEntity(field, schemaName, namespace,
version ,null, fullname);
+ AtlasEntity entity = getFieldEntity(field, schemaName, namespace,
version, null, fullname);
fieldEntity = createEntityInAtlas(new
AtlasEntityWithExtInfo(entity));
} else {
@@ -312,7 +311,6 @@ public class KafkaBridge {
return fieldEntity;
}
-
@VisibleForTesting
AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws
Exception {
final AtlasEntity ret;
@@ -329,7 +327,7 @@ public class KafkaBridge {
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setAttribute(CLUSTERNAME, metadataNamespace);
ret.setAttribute(TOPIC, topic);
- ret.setAttribute(NAME,topic);
+ ret.setAttribute(NAME, topic);
ret.setAttribute(DESCRIPTION_ATTR, topic);
ret.setAttribute(URI, topic);
@@ -344,7 +342,7 @@ public class KafkaBridge {
createdSchemas = findOrCreateAtlasSchema(topic);
- if(createdSchemas.size() > 0) {
+ if (createdSchemas.size() > 0) {
ret.setAttribute(AVRO_SCHEMA, createdSchemas);
ret.setRelationshipAttribute(AVRO_SCHEMA, createdSchemas);
}
@@ -357,7 +355,6 @@ public class KafkaBridge {
final AtlasEntity ret;
List<AtlasEntity> createdFields = new ArrayList<>();
-
if (schemaEntity == null) {
ret = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName());
} else {
@@ -375,12 +372,12 @@ public class KafkaBridge {
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setAttribute(TYPE, parsedSchema.getType());
ret.setAttribute(NAMESPACE, namespace);
- ret.setAttribute(NAME,parsedSchema.getName() + "(v" + version + ")");
+ ret.setAttribute(NAME, parsedSchema.getName() + "(v" + version + ")");
ret.setAttribute(SCHEMA_VERSION_ID, version);
createdFields = createNestedFields(parsedSchema, schemaName,
namespace, version, "");
- if(createdFields.size() > 0) {
+ if (createdFields.size() > 0) {
ret.setRelationshipAttribute(FIELDS, createdFields);
}
@@ -392,9 +389,8 @@ public class KafkaBridge {
AtlasEntityWithExtInfo fieldInAtlas;
JSONParser parser = new JSONParser();
- for (Schema.Field field:parsedSchema.getFields()) {
-
- if(field.schema().getType() == Schema.Type.ARRAY){
+ for (Schema.Field field : parsedSchema.getFields()) {
+ if (field.schema().getType() == Schema.Type.ARRAY) {
System.out.println("ARRAY DETECTED");
String subfields = ((JSONObject)
parser.parse(field.schema().toString())).get("items").toString();
Schema parsedSubSchema = new Schema.Parser().parse(subfields);
@@ -402,15 +398,11 @@ public class KafkaBridge {
fullname = concatFullname(field.name(), fullname,
parsedSubSchema.getName());
entityArray.addAll(createNestedFields(parsedSubSchema,
schemaName, namespace, version, fullname));
- }
-
- else if(field.schema().getType() == Schema.Type.RECORD &&
!schemaName.equals(field.name())) {
- System.out.println("NESTED RECORD DETECTED");
- fullname = concatFullname(field.name(), fullname, "");
- entityArray.addAll(createNestedFields(field.schema(),
schemaName, namespace, version, fullname));
- }
-
- else{
+ } else if (field.schema().getType() == Schema.Type.RECORD &&
!schemaName.equals(field.name())) {
+ System.out.println("NESTED RECORD DETECTED");
+ fullname = concatFullname(field.name(), fullname, "");
+ entityArray.addAll(createNestedFields(field.schema(),
schemaName, namespace, version, fullname));
+ } else {
fieldInAtlas = createOrUpdateField(field, schemaName,
namespace, version, fullname);
entityArray.add(fieldInAtlas.getEntity());
@@ -443,7 +435,7 @@ public class KafkaBridge {
String qualifiedName = getFieldQualifiedName(metadataNamespace,
fullname, schemaName + "-value", "v" + version);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
- ret.setAttribute(NAME,fullname + "(v" + version + ")");
+ ret.setAttribute(NAME, fullname + "(v" + version + ")");
//ret.setAttribute(field.schema().getType()); --> does not work, since
type expects array<avro_type>. Instead setting Description
ret.setAttribute(DESCRIPTION_ATTR, field.schema().getType());
return ret;
@@ -461,7 +453,7 @@ public class KafkaBridge {
@VisibleForTesting
static String getFieldQualifiedName(String metadataNamespace, String
field, String schemaName, String version) {
- return String.format(FORMAT_KAKFA_FIELD_QUALIFIED_NAME ,
field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace);
+ return String.format(FORMAT_KAKFA_FIELD_QUALIFIED_NAME,
field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace);
}
@VisibleForTesting
@@ -470,8 +462,7 @@ public class KafkaBridge {
try {
ret = atlasClientV2.getEntityByAttribute(typeName,
Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
- }
- catch (Exception e){
+ } catch (Exception e) {
LOG.info("Exception on finding Atlas Entity: {}", e);
}
@@ -509,12 +500,12 @@ public class KafkaBridge {
LOG.info("Updated {} entity: name={}, guid={} ",
ret.getEntity().getTypeName(),
ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME),
ret.getEntity().getGuid());
} else {
- LOG.info("Entity: name={} ", entity.toString() + " not updated
as it is unchanged from what is in Atlas" );
+ LOG.info("Entity: name={} ", entity.toString() + " not updated
as it is unchanged from what is in Atlas");
ret = entity;
}
} else {
- LOG.info("Entity: name={} ", entity.toString() + " not updated as
it is unchanged from what is in Atlas" );
+ LOG.info("Entity: name={} ", entity.toString() + " not updated as
it is unchanged from what is in Atlas");
ret = entity;
}
@@ -522,17 +513,16 @@ public class KafkaBridge {
return ret;
}
- private static void printUsage(){
+ private static void printUsage() {
System.out.println("Usage 1: import-kafka.sh");
System.out.println("Usage 2: import-kafka.sh [-t <topic regex> OR
--topic <topic regex>]");
- System.out.println("Usage 3: import-kafka.sh [-f <filename>]" );
+ System.out.println("Usage 3: import-kafka.sh [-f <filename>]");
System.out.println(" Format:");
System.out.println(" topic1 OR topic1 regex");
System.out.println(" topic2 OR topic2 regex");
System.out.println(" topic3 OR topic3 regex");
}
-
private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
if (entity != null) {
clearRelationshipAttributes(entity.getEntity());
@@ -560,19 +550,19 @@ public class KafkaBridge {
private List<AtlasEntity> findOrCreateAtlasSchema(String schemaName)
throws Exception {
List<AtlasEntity> entities = new ArrayList<>();
// Handling Schemas
- ArrayList<Integer> versions =
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient,schemaName +
"-value");
+ ArrayList<Integer> versions =
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient, schemaName +
"-value");
- for (int version:versions) {
+ for (int version : versions) {
String kafkaSchema =
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(httpClient, schemaName
+ "-value", version);
- if(kafkaSchema != null) {
+ if (kafkaSchema != null) {
// Schema exists in Kafka Schema Registry
System.out.println("---Found Schema " + schemaName + "-value
in Kafka Schema Registry with Version " + version);
LOG.info("Found Schema {}-value in Kafka Schema Registry with
Version {}", schemaName, version);
AtlasEntityWithExtInfo atlasSchemaEntity =
findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(),
getSchemaQualifiedName(metadataNamespace, schemaName + "-value", "v" +
version));
- if(atlasSchemaEntity != null) {
+ if (atlasSchemaEntity != null) {
// Schema exists in Kafka Schema Registry AND in Atlas
System.out.println("---Found Entity avro_schema " +
schemaName + " in Atlas");
@@ -597,25 +587,21 @@ public class KafkaBridge {
return entities;
}
- private String concatFullname(String fieldName,String fullname, String
subSchemaName){
- if(fullname.isEmpty()){
- if(subSchemaName.isEmpty()) {
+ private String concatFullname(String fieldName, String fullname, String
subSchemaName) {
+ if (fullname.isEmpty()) {
+ if (subSchemaName.isEmpty()) {
fullname = fieldName;
- }
- else {
+ } else {
fullname = fieldName + "." + subSchemaName;
}
-
- }
- else{
- if(subSchemaName.isEmpty()) {
+ } else {
+ if (subSchemaName.isEmpty()) {
fullname = fullname + "." + fieldName;
- }
- else {
+ } else {
fullname = fullname + "." + subSchemaName + "." + fieldName;
}
}
return fullname;
}
-}
\ No newline at end of file
+}
diff --git
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
index d5d85d6b4..958d2a2aa 100644
---
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
+++
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
@@ -36,14 +36,18 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
public class SchemaRegistryConnector {
- private static final String SCHEMA_KEY = "schema";
- private static final Logger LOG =
LoggerFactory.getLogger(SchemaRegistryConnector.class);
+ private static final String SCHEMA_KEY = "schema";
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaRegistryConnector.class);
+
+ private SchemaRegistryConnector() {
+ //private constructor
+ }
static ArrayList<Integer>
getVersionsKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject)
throws IOException {
ArrayList<Integer> list = new ArrayList<>();
JSONParser parser = new JSONParser();
- HttpGet getRequest = new HttpGet("http://" +
KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject +
"/versions/");
+ HttpGet getRequest = new HttpGet("http://" +
KafkaBridge.kafkaSchemaRegistryHostname + "/subjects/" + subject +
"/versions/");
getRequest.addHeader("accept", "application/json");
getRequest.addHeader("Content-Type",
"application/vnd.schemaregistry.v1+json");
@@ -72,7 +76,6 @@ public class SchemaRegistryConnector {
System.out.println("---Error reading versions to schema: "
+ subject + " in Kafka");
LOG.error("Error reading versions to schema: " + subject +
" in Kafka: ", e.getMessage());
}
-
} else if (response.getStatusLine().getStatusCode() ==
HttpStatus.SC_NOT_FOUND) {
// did not find any schema to the topic
System.out.println("---No schema versions found for schema: "
+ subject + " in Schema Registry");
@@ -85,8 +88,7 @@ public class SchemaRegistryConnector {
EntityUtils.consumeQuietly(response.getEntity());
response.close();
- }
- catch(Exception e) {
+ } catch (Exception e) {
System.out.println("---Error getting versions to schema: " +
subject + " from Kafka");
LOG.error("Error getting versions to schema: " + subject + " from
Kafka: ", e);
}
@@ -94,14 +96,14 @@ public class SchemaRegistryConnector {
}
static String getSchemaFromKafkaSchemaRegistry(CloseableHttpClient
httpClient, String subject, int version) throws IOException {
- HttpGet getRequest = new HttpGet("http://" +
KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject +
"/versions/" + version);
+ HttpGet getRequest = new HttpGet("http://" +
KafkaBridge.kafkaSchemaRegistryHostname + "/subjects/" + subject + "/versions/"
+ version);
getRequest.addHeader("accept", "application/json");
getRequest.addHeader("Content-Type",
"application/vnd.schemaregistry.v1+json");
JSONParser parser = new JSONParser();
CloseableHttpResponse response = httpClient.execute(getRequest);
- if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK){
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
//found corresponding Schema in Registry
try {
BufferedReader br = new BufferedReader(
@@ -116,19 +118,14 @@ public class SchemaRegistryConnector {
System.out.println("---Error reading versions to schema: " +
subject + " in Kafka");
LOG.error("Error reading versions to schema: " + subject + "
in Kafka: ", e);
}
-
- }
-
- else if (response.getStatusLine().getStatusCode() == 404) {
+ } else if (response.getStatusLine().getStatusCode() == 404) {
// did not find any schema to the topic
System.out.println("---Cannot find the corresponding schema to: "
+ subject + "in Kafka");
LOG.info("Cannot find the corresponding schema to: {} in Kafka",
subject);
- }
-
- else {
+ } else {
// any other error when connecting to schema registry
- System.out.println("---Cannot connect to schema registry at: " +
KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
- LOG.warn("Cannot connect to schema registry at: {}",
KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
+ System.out.println("---Cannot connect to schema registry at: " +
KafkaBridge.kafkaSchemaRegistryHostname);
+ LOG.warn("Cannot connect to schema registry at: {}",
KafkaBridge.kafkaSchemaRegistryHostname);
}
EntityUtils.consumeQuietly(response.getEntity());
diff --git
a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index adbaddd18..ece25edb7 100644
---
a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++
b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -28,8 +28,8 @@ import org.apache.atlas.utils.KafkaUtils;
import org.apache.avro.Schema;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.mockito.ArgumentCaptor;
@@ -43,7 +43,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -52,7 +51,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class KafkaBridgeTest {
-
private static final String TEST_TOPIC_NAME = "test_topic";
private static final String CLUSTER_NAME = "primary";
private static final String TOPIC_QUALIFIED_NAME =
KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
@@ -309,7 +307,7 @@ public class KafkaBridgeTest {
when(mockResponse.getEntity())
.thenReturn(mock(HttpEntity.class));
when(mockResponse.getEntity().getContent())
- .thenReturn(new ByteArrayInputStream(new
String("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":"+
TEST_SCHEMA +"}").getBytes(StandardCharsets.UTF_8)));
+ .thenReturn(new ByteArrayInputStream(new
String("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":" +
TEST_SCHEMA + "}").getBytes(StandardCharsets.UTF_8)));
CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class);
when(mockHttpClient.execute(any()))
@@ -317,7 +315,7 @@ public class KafkaBridgeTest {
when(mockHttpClient.getConnectionManager())
.thenReturn(mock(ClientConnectionManager.class));
- String ret =
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient,
TEST_SCHEMA_NAME,TEST_SCHEMA_VERSION);
+ String ret =
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient,
TEST_SCHEMA_NAME, TEST_SCHEMA_VERSION);
assertEquals(TEST_SCHEMA, ret);
}
@@ -344,5 +342,4 @@ public class KafkaBridgeTest {
assertEquals(TEST_SCHEMA_VERSION_LIST, ret);
}
-
-}
\ No newline at end of file
+}