This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/atlas-2.5 by this push:
new c423f297e ATLAS-4971: falcon-bridge, falcon-bridge-shim modules:
update for cod… (#296)
c423f297e is described below
commit c423f297e805c96fc421d5ae53f7a156b857b881
Author: sheetalshah1007 <[email protected]>
AuthorDate: Thu Feb 20 10:03:47 2025 +0530
ATLAS-4971: falcon-bridge, falcon-bridge-shim modules: update for cod…
(#296)
(cherry picked from commit 0901f6d816b7a5e2944a5ab52ec6cf61be5b4a46)
---
addons/falcon-bridge-shim/pom.xml | 4 +
.../apache/atlas/falcon/service/AtlasService.java | 81 ++----
addons/falcon-bridge/pom.xml | 5 +
.../apache/atlas/falcon/bridge/FalconBridge.java | 238 ++++++++--------
.../org/apache/atlas/falcon/event/FalconEvent.java | 32 +--
.../org/apache/atlas/falcon/hook/FalconHook.java | 108 ++++----
.../apache/atlas/falcon/model/FalconDataTypes.java | 7 +-
.../falcon/publisher/FalconEventPublisher.java | 11 +-
.../apache/atlas/falcon/service/AtlasService.java | 77 +++---
.../atlas/falcon/{Util => util}/EventUtil.java | 19 +-
.../org/apache/atlas/falcon/hook/FalconHookIT.java | 307 +++++++++++----------
.../test/resources/atlas-application.properties | 43 +--
.../src/test/resources/atlas-logback.xml | 221 ++++++++-------
.../falcon-bridge/src/test/resources/cluster.xml | 2 +-
.../falcon-bridge/src/test/resources/feed-hdfs.xml | 12 +-
.../src/test/resources/feed-replication.xml | 14 +-
addons/falcon-bridge/src/test/resources/feed.xml | 10 +-
.../falcon-bridge/src/test/resources/process.xml | 14 +-
.../src/test/resources/startup.properties | 1 -
19 files changed, 592 insertions(+), 614 deletions(-)
diff --git a/addons/falcon-bridge-shim/pom.xml
b/addons/falcon-bridge-shim/pom.xml
index 1ca461514..df0456f01 100755
--- a/addons/falcon-bridge-shim/pom.xml
+++ b/addons/falcon-bridge-shim/pom.xml
@@ -32,6 +32,10 @@
<name>Apache Atlas Falcon Bridge Shim</name>
<description>Apache Atlas Falcon Bridge Shim Module</description>
+ <properties>
+ <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+ <checkstyle.skip>false</checkstyle.skip>
+ </properties>
<dependencies>
<!-- Logging -->
<dependency>
diff --git
a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
index 2b756de0e..7b464ab33 100755
---
a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
+++
b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,7 +18,6 @@
package org.apache.atlas.falcon.service;
-
import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -34,12 +33,12 @@ import org.slf4j.LoggerFactory;
public class AtlasService implements FalconService,
ConfigurationChangeListener {
private static final Logger LOG =
LoggerFactory.getLogger(AtlasService.class);
- private static final String ATLAS_PLUGIN_TYPE = "falcon";
+ private static final String ATLAS_PLUGIN_TYPE = "falcon";
private static final String ATLAS_FALCON_HOOK_IMPL_CLASSNAME =
"org.apache.atlas.falcon.service.AtlasService";
- private AtlasPluginClassLoader atlasPluginClassLoader = null;
- private FalconService falconServiceImpl = null;
- private ConfigurationChangeListener configChangeListenerImpl = null;
+ private AtlasPluginClassLoader atlasPluginClassLoader;
+ private FalconService falconServiceImpl;
+ private ConfigurationChangeListener configChangeListenerImpl;
public AtlasService() {
this.initialize();
@@ -47,9 +46,7 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
@Override
public String getName() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.getName()");
- }
+ LOG.debug("==> AtlasService.getName()");
String ret = null;
@@ -60,18 +57,14 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.getName()");
- }
+ LOG.debug("<== AtlasService.getName()");
return ret;
}
@Override
public void init() throws FalconException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.init()");
- }
+ LOG.debug("==> AtlasService.init()");
try {
activatePluginClassLoader();
@@ -83,16 +76,12 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.init()");
- }
+ LOG.debug("<== AtlasService.init()");
}
@Override
public void destroy() throws FalconException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.destroy()");
- }
+ LOG.debug("==> AtlasService.destroy()");
try {
activatePluginClassLoader();
@@ -104,16 +93,12 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.destroy()");
- }
+ LOG.debug("<== AtlasService.destroy()");
}
@Override
public void onAdd(Entity entity) throws FalconException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.onAdd({})", entity);
- }
+ LOG.debug("==> AtlasService.onAdd({})", entity);
try {
activatePluginClassLoader();
@@ -122,16 +107,12 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.onAdd({})", entity);
- }
+ LOG.debug("<== AtlasService.onAdd({})", entity);
}
@Override
public void onRemove(Entity entity) throws FalconException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.onRemove({})", entity);
- }
+ LOG.debug("==> AtlasService.onRemove({})", entity);
try {
activatePluginClassLoader();
@@ -140,16 +121,12 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.onRemove({})", entity);
- }
+ LOG.debug("<== AtlasService.onRemove({})", entity);
}
@Override
public void onChange(Entity entity, Entity entity1) throws FalconException
{
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1);
- }
+ LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1);
try {
activatePluginClassLoader();
@@ -158,16 +135,12 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1);
- }
+ LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1);
}
@Override
public void onReload(Entity entity) throws FalconException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.onReload({})", entity);
- }
+ LOG.debug("==> AtlasService.onReload({})", entity);
try {
activatePluginClassLoader();
@@ -176,15 +149,11 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.onReload({})", entity);
- }
+ LOG.debug("<== AtlasService.onReload({})", entity);
}
private void initialize() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasService.initialize()");
- }
+ LOG.debug("==> AtlasService.initialize()");
try {
atlasPluginClassLoader =
AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass());
@@ -195,7 +164,7 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
Object atlasService = cls.newInstance();
- falconServiceImpl = (FalconService) atlasService;
+ falconServiceImpl = (FalconService) atlasService;
configChangeListenerImpl = (ConfigurationChangeListener)
atlasService;
} catch (Exception excp) {
LOG.error("Error instantiating Atlas hook implementation", excp);
@@ -203,9 +172,7 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
deactivatePluginClassLoader();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasService.initialize()");
- }
+ LOG.debug("<== AtlasService.initialize()");
}
private void activatePluginClassLoader() {
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index b29837613..eca232e4d 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -32,6 +32,11 @@
<name>Apache Atlas Falcon Bridge</name>
<description>Apache Atlas Falcon Bridge Module</description>
+ <properties>
+ <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+ <checkstyle.skip>false</checkstyle.skip>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
diff --git
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
index cbf002f4f..f21dd17c3 100644
---
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
+++
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
@@ -20,8 +20,8 @@ package org.apache.atlas.falcon.bridge;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.falcon.Util.EventUtil;
import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.falcon.util.EventUtil;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.v1.model.instance.Referenceable;
@@ -60,15 +60,18 @@ import java.util.Map;
public class FalconBridge {
private static final Logger LOG =
LoggerFactory.getLogger(FalconBridge.class);
- public static final String COLO = "colo";
- public static final String TAGS = "tags";
- public static final String GROUPS = "groups";
- public static final String PIPELINES = "pipelines";
- public static final String WFPROPERTIES = "workflow-properties";
- public static final String RUNSON = "runs-on";
- public static final String STOREDIN = "stored-in";
- public static final String FREQUENCY = "frequency";
- public static final String ATTRIBUTE_DB = "db";
+ public static final String COLO = "colo";
+ public static final String TAGS = "tags";
+ public static final String GROUPS = "groups";
+ public static final String PIPELINES = "pipelines";
+ public static final String WFPROPERTIES = "workflow-properties";
+ public static final String RUNSON = "runs-on";
+ public static final String STOREDIN = "stored-in";
+ public static final String FREQUENCY = "frequency";
+ public static final String ATTRIBUTE_DB = "db";
+
+ private FalconBridge() {
+ }
/**
* Creates cluster entity
@@ -92,75 +95,49 @@ public class FalconBridge {
}
if (StringUtils.isNotEmpty(cluster.getTags())) {
- clusterRef.set(FalconBridge.TAGS,
- EventUtil.convertKeyValueStringToMap(cluster.getTags()));
+ clusterRef.set(FalconBridge.TAGS,
EventUtil.convertKeyValueStringToMap(cluster.getTags()));
}
return clusterRef;
}
- private static Referenceable createFeedEntity(Feed feed, Referenceable
clusterReferenceable) {
- LOG.info("Creating feed dataset: {}", feed.getName());
-
- Referenceable feedEntity = new
Referenceable(FalconDataTypes.FALCON_FEED.getName());
- feedEntity.set(AtlasClient.NAME, feed.getName());
- feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription());
- String feedQualifiedName =
- getFeedQualifiedName(feed.getName(), (String)
clusterReferenceable.get(AtlasClient.NAME));
- feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
feedQualifiedName);
- feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString());
- feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
- if (feed.getACL() != null) {
- feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner());
- }
-
- if (StringUtils.isNotEmpty(feed.getTags())) {
- feedEntity.set(FalconBridge.TAGS,
- EventUtil.convertKeyValueStringToMap(feed.getTags()));
- }
-
- if (feed.getGroups() != null) {
- feedEntity.set(FalconBridge.GROUPS, feed.getGroups());
- }
-
- return feedEntity;
- }
-
public static List<Referenceable> createFeedCreationEntity(Feed feed,
ConfigurationStore falconStore) throws FalconException, URISyntaxException {
LOG.info("Creating feed : {}", feed.getName());
List<Referenceable> entities = new ArrayList<>();
if (feed.getClusters() != null) {
- List<Referenceable> replicationInputs = new ArrayList<>();
+ List<Referenceable> replicationInputs = new ArrayList<>();
List<Referenceable> replicationOutputs = new ArrayList<>();
for (org.apache.falcon.entity.v0.feed.Cluster feedCluster :
feed.getClusters().getClusters()) {
- org.apache.falcon.entity.v0.cluster.Cluster cluster =
falconStore.get(EntityType.CLUSTER,
- feedCluster.getName());
+ org.apache.falcon.entity.v0.cluster.Cluster cluster =
falconStore.get(EntityType.CLUSTER, feedCluster.getName());
// set cluster
Referenceable clusterReferenceable =
getClusterEntityReference(cluster.getName(), cluster.getColo());
+
entities.add(clusterReferenceable);
// input as hive_table or hdfs_path, output as falcon_feed
dataset
- List<Referenceable> inputs = new ArrayList<>();
+ List<Referenceable> inputs = new ArrayList<>();
List<Referenceable> inputReferenceables =
getInputEntities(cluster, feed);
+
if (inputReferenceables != null) {
entities.addAll(inputReferenceables);
inputs.add(inputReferenceables.get(inputReferenceables.size() - 1));
}
- List<Referenceable> outputs = new ArrayList<>();
- Referenceable feedEntity = createFeedEntity(feed,
clusterReferenceable);
+ List<Referenceable> outputs = new ArrayList<>();
+ Referenceable feedEntity = createFeedEntity(feed,
clusterReferenceable);
+
if (feedEntity != null) {
entities.add(feedEntity);
outputs.add(feedEntity);
}
if (!inputs.isEmpty() || !outputs.isEmpty()) {
- Referenceable feedCreateEntity = new
Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
- String feedQualifiedName =
getFeedQualifiedName(feed.getName(), cluster.getName());
+ Referenceable feedCreateEntity = new
Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
+ String feedQualifiedName =
getFeedQualifiedName(feed.getName(), cluster.getName());
feedCreateEntity.set(AtlasClient.NAME, feed.getName());
feedCreateEntity.set(AtlasClient.DESCRIPTION, "Feed
creation - " + feed.getName());
@@ -169,6 +146,7 @@ public class FalconBridge {
if (!inputs.isEmpty()) {
feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs);
}
+
if (!outputs.isEmpty()) {
feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs);
}
@@ -185,32 +163,29 @@ public class FalconBridge {
}
if (!replicationInputs.isEmpty() && !replicationInputs.isEmpty()) {
- Referenceable feedReplicationEntity = new
Referenceable(FalconDataTypes
- .FALCON_FEED_REPLICATION.getName());
+ Referenceable feedReplicationEntity = new
Referenceable(FalconDataTypes.FALCON_FEED_REPLICATION.getName());
feedReplicationEntity.set(AtlasClient.NAME, feed.getName());
feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
feed.getName());
-
feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS,
replicationInputs);
feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS,
replicationOutputs);
+
entities.add(feedReplicationEntity);
}
-
}
+
return entities;
}
/**
* Creates process entity
- *
+ *
* @param process process entity
* @param falconStore config store
* @return process instance reference
- *
* @throws FalconException if retrieving from the configuration store fail
*/
- public static List<Referenceable>
createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
- ConfigurationStore
falconStore) throws FalconException {
+ public static List<Referenceable>
createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
ConfigurationStore falconStore) throws FalconException {
LOG.info("Creating process Entity : {}", process.getName());
// The requirement is for each cluster, create a process entity with
name
@@ -218,44 +193,47 @@ public class FalconBridge {
List<Referenceable> entities = new ArrayList<>();
if (process.getClusters() != null) {
-
for (Cluster processCluster : process.getClusters().getClusters())
{
- org.apache.falcon.entity.v0.cluster.Cluster cluster =
- falconStore.get(EntityType.CLUSTER,
processCluster.getName());
- Referenceable clusterReferenceable =
getClusterEntityReference(cluster.getName(), cluster.getColo());
+ org.apache.falcon.entity.v0.cluster.Cluster cluster
= falconStore.get(EntityType.CLUSTER, processCluster.getName());
+ Referenceable
clusterReferenceable = getClusterEntityReference(cluster.getName(),
cluster.getColo());
+
entities.add(clusterReferenceable);
List<Referenceable> inputs = new ArrayList<>();
+
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
- Feed feed = falconStore.get(EntityType.FEED,
input.getFeed());
+ Feed feed =
falconStore.get(EntityType.FEED, input.getFeed());
Referenceable inputReferenceable =
getFeedDataSetReference(feed, clusterReferenceable);
+
entities.add(inputReferenceable);
inputs.add(inputReferenceable);
}
}
List<Referenceable> outputs = new ArrayList<>();
+
if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
- Feed feed = falconStore.get(EntityType.FEED,
output.getFeed());
+ Feed feed =
falconStore.get(EntityType.FEED, output.getFeed());
Referenceable outputReferenceable =
getFeedDataSetReference(feed, clusterReferenceable);
+
entities.add(outputReferenceable);
outputs.add(outputReferenceable);
}
}
if (!inputs.isEmpty() || !outputs.isEmpty()) {
-
Referenceable processEntity = new
Referenceable(FalconDataTypes.FALCON_PROCESS.getName());
+
processEntity.set(AtlasClient.NAME, process.getName());
- processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- getProcessQualifiedName(process.getName(),
cluster.getName()));
+
processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getProcessQualifiedName(process.getName(), cluster.getName()));
processEntity.set(FalconBridge.FREQUENCY,
process.getFrequency().toString());
if (!inputs.isEmpty()) {
processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs);
}
+
if (!outputs.isEmpty()) {
processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs);
}
@@ -269,43 +247,89 @@ public class FalconBridge {
}
if (StringUtils.isNotEmpty(process.getTags())) {
- processEntity.set(FalconBridge.TAGS,
-
EventUtil.convertKeyValueStringToMap(process.getTags()));
+ processEntity.set(FalconBridge.TAGS,
EventUtil.convertKeyValueStringToMap(process.getTags()));
}
if (process.getPipelines() != null) {
processEntity.set(FalconBridge.PIPELINES,
process.getPipelines());
}
- processEntity.set(FalconBridge.WFPROPERTIES,
- getProcessEntityWFProperties(process.getWorkflow(),
- process.getName()));
+ processEntity.set(FalconBridge.WFPROPERTIES,
getProcessEntityWFProperties(process.getWorkflow(), process.getName()));
entities.add(processEntity);
}
-
}
}
+
return entities;
}
- private static List<Referenceable>
getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster,
- Feed feed) throws
URISyntaxException {
+ public static String getFeedQualifiedName(final String feedName, final
String clusterName) {
+ return String.format("%s@%s", feedName, clusterName);
+ }
+
+ public static String getProcessQualifiedName(final String processName,
final String clusterName) {
+ return String.format("%s@%s", processName, clusterName);
+ }
+
+ public static String normalize(final String str) {
+ if (StringUtils.isBlank(str)) {
+ return null;
+ }
+
+ return str.toLowerCase().trim();
+ }
+
+ private static Referenceable createFeedEntity(Feed feed, Referenceable
clusterReferenceable) {
+ LOG.info("Creating feed dataset: {}", feed.getName());
+
+ Referenceable feedEntity = new
Referenceable(FalconDataTypes.FALCON_FEED.getName());
+
+ feedEntity.set(AtlasClient.NAME, feed.getName());
+ feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription());
+
+ String feedQualifiedName = getFeedQualifiedName(feed.getName(),
(String) clusterReferenceable.get(AtlasClient.NAME));
+
+ feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
feedQualifiedName);
+ feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString());
+ feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
+
+ if (feed.getACL() != null) {
+ feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner());
+ }
+
+ if (StringUtils.isNotEmpty(feed.getTags())) {
+ feedEntity.set(FalconBridge.TAGS,
EventUtil.convertKeyValueStringToMap(feed.getTags()));
+ }
+
+ if (feed.getGroups() != null) {
+ feedEntity.set(FalconBridge.GROUPS, feed.getGroups());
+ }
+
+ return feedEntity;
+ }
+
+ private static List<Referenceable>
getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster, Feed
feed) throws URISyntaxException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
FeedHelper.getCluster(feed, cluster.getName());
- if(feedCluster != null) {
+ if (feedCluster != null) {
final CatalogTable table = getTable(feedCluster, feed);
+
if (table != null) {
CatalogStorage storage = new CatalogStorage(cluster, table);
- return createHiveTableInstance(cluster.getName(),
storage.getDatabase().toLowerCase(),
- storage.getTable().toLowerCase());
+
+ return createHiveTableInstance(cluster.getName(),
storage.getDatabase().toLowerCase(), storage.getTable().toLowerCase());
} else {
List<Location> locations =
FeedHelper.getLocations(feedCluster, feed);
+
if (CollectionUtils.isNotEmpty(locations)) {
Location dataLocation =
FileSystemStorage.getLocation(locations, LocationType.DATA);
+
if (dataLocation != null) {
final String pathUri =
normalize(dataLocation.getPath());
+
LOG.info("Registering DFS Path {} ", pathUri);
+
return fillHDFSDataSet(pathUri, cluster.getName());
}
}
@@ -326,91 +350,83 @@ public class FalconBridge {
private static List<Referenceable> fillHDFSDataSet(final String pathUri,
final String clusterName) {
List<Referenceable> entities = new ArrayList<>();
- Referenceable ref = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
+ Referenceable ref = new
Referenceable(HiveMetaStoreBridge.HDFS_PATH);
+
ref.set("path", pathUri);
+
// Path path = new Path(pathUri);
// ref.set("name", path.getName());
//TODO - Fix after ATLAS-542 to shorter Name
Path path = new Path(pathUri);
+
ref.set(AtlasClient.NAME,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+
entities.add(ref);
+
return entities;
}
private static Referenceable createHiveDatabaseInstance(String
clusterName, String dbName) {
Referenceable dbRef = new
Referenceable(HiveDataTypes.HIVE_DB.getName());
+
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(AtlasClient.NAME, dbName);
- dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+ dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+
return dbRef;
}
- private static List<Referenceable> createHiveTableInstance(String
clusterName, String dbName,
- String
tableName) {
+ private static List<Referenceable> createHiveTableInstance(String
clusterName, String dbName, String tableName) {
List<Referenceable> entities = new ArrayList<>();
- Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
+ Referenceable dbRef = createHiveDatabaseInstance(clusterName,
dbName);
+
entities.add(dbRef);
Referenceable tableRef = new
Referenceable(HiveDataTypes.HIVE_TABLE.getName());
- tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName,
tableName));
+
+ tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
tableRef.set(ATTRIBUTE_DB, dbRef);
+
entities.add(tableRef);
return entities;
}
- private static Referenceable getClusterEntityReference(final String
clusterName,
- final String colo) {
+ private static Referenceable getClusterEntityReference(final String
clusterName, final String colo) {
LOG.info("Getting reference for entity {}", clusterName);
+
Referenceable clusterRef = new
Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
+
clusterRef.set(AtlasClient.NAME, String.format("%s", clusterName));
clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
clusterRef.set(FalconBridge.COLO, colo);
+
return clusterRef;
}
-
private static Referenceable getFeedDataSetReference(Feed feed,
Referenceable clusterReference) {
LOG.info("Getting reference for entity {}", feed.getName());
+
Referenceable feedDatasetRef = new
Referenceable(FalconDataTypes.FALCON_FEED.getName());
+
feedDatasetRef.set(AtlasClient.NAME, feed.getName());
- feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getFeedQualifiedName(feed.getName(),
- (String)
clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)));
+ feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getFeedQualifiedName(feed.getName(), (String)
clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)));
feedDatasetRef.set(FalconBridge.STOREDIN, clusterReference);
feedDatasetRef.set(FalconBridge.FREQUENCY, feed.getFrequency());
+
return feedDatasetRef;
}
- private static Map<String, String> getProcessEntityWFProperties(final
Workflow workflow,
- final
String processName) {
+ private static Map<String, String> getProcessEntityWFProperties(final
Workflow workflow, final String processName) {
Map<String, String> wfProperties = new HashMap<>();
- wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
- ProcessHelper.getProcessWorkflowName(workflow.getName(),
processName));
- wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
- workflow.getVersion());
- wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
- workflow.getEngine().value());
- return wfProperties;
- }
-
- public static String getFeedQualifiedName(final String feedName, final
String clusterName) {
- return String.format("%s@%s", feedName, clusterName);
- }
-
- public static String getProcessQualifiedName(final String processName,
final String clusterName) {
- return String.format("%s@%s", processName, clusterName);
- }
+ wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
+
wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
workflow.getVersion());
+ wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
workflow.getEngine().value());
- public static String normalize(final String str) {
- if (StringUtils.isBlank(str)) {
- return null;
- }
- return str.toLowerCase().trim();
+ return wfProperties;
}
}
diff --git
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
index 51db894ab..37dda6ed0 100644
---
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
+++
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,23 +24,14 @@ import org.apache.falcon.entity.v0.Entity;
* Falcon event to interface with Atlas Service.
*/
public class FalconEvent {
- protected String user;
+ protected String user;
protected OPERATION operation;
- protected Entity entity;
+ protected Entity entity;
public FalconEvent(String doAsUser, OPERATION falconOperation, Entity
entity) {
- this.user = doAsUser;
+ this.user = doAsUser;
this.operation = falconOperation;
- this.entity = entity;
- }
-
- public enum OPERATION {
- ADD_CLUSTER,
- UPDATE_CLUSTER,
- ADD_FEED,
- UPDATE_FEED,
- ADD_PROCESS,
- UPDATE_PROCESS,
+ this.entity = entity;
}
public String getUser() {
@@ -54,4 +45,13 @@ public class FalconEvent {
public Entity getEntity() {
return entity;
}
+
+ public enum OPERATION {
+ ADD_CLUSTER,
+ UPDATE_CLUSTER,
+ ADD_FEED,
+ UPDATE_FEED,
+ ADD_PROCESS,
+ UPDATE_PROCESS,
+ }
}
diff --git
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index b8a73cbe6..3a0f35d8e 100644
---
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+
import static org.apache.atlas.repository.Constants.FALCON_SOURCE;
/**
@@ -43,31 +44,17 @@ import static
org.apache.atlas.repository.Constants.FALCON_SOURCE;
public class FalconHook extends AtlasHook implements FalconEventPublisher {
private static final Logger LOG =
LoggerFactory.getLogger(FalconHook.class);
- private static ConfigurationStore STORE;
+ private static ConfigurationStore store;
@Override
public String getMessageSource() {
return FALCON_SOURCE;
}
- private enum Operation {
- ADD,
- UPDATE
- }
-
- static {
- try {
- STORE = ConfigurationStore.get();
- } catch (Exception e) {
- LOG.error("Caught exception initializing the falcon hook.", e);
- }
-
- LOG.info("Created Atlas Hook for Falcon");
- }
-
@Override
public void publish(final Data data) {
final FalconEvent event = data.getEvent();
+
try {
fireAndForget(event);
} catch (Throwable t) {
@@ -77,17 +64,19 @@ public class FalconHook extends AtlasHook implements
FalconEventPublisher {
private void fireAndForget(FalconEvent event) throws FalconException,
URISyntaxException {
LOG.info("Entered Atlas hook for Falcon hook operation {}",
event.getOperation());
+
List<HookNotification> messages = new ArrayList<>();
+ Operation op = getOperation(event.getOperation());
+ String user = getUser(event.getUser());
- Operation op = getOperation(event.getOperation());
- String user = getUser(event.getUser());
LOG.info("fireAndForget user:{}", user);
- switch (op) {
- case ADD:
- messages.add(new EntityCreateRequest(user, createEntities(event,
user)));
- break;
+ switch (op) {
+ case ADD:
+ messages.add(new EntityCreateRequest(user,
createEntities(event, user)));
+ break;
}
+
notifyEntities(messages, null);
}
@@ -95,24 +84,23 @@ public class FalconHook extends AtlasHook implements
FalconEventPublisher {
List<Referenceable> entities = new ArrayList<>();
switch (event.getOperation()) {
- case ADD_CLUSTER:
- entities.add(FalconBridge
-
.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster)
event.getEntity()));
- break;
-
- case ADD_PROCESS:
- entities.addAll(FalconBridge.createProcessEntity((Process)
event.getEntity(), STORE));
- break;
-
- case ADD_FEED:
- entities.addAll(FalconBridge.createFeedCreationEntity((Feed)
event.getEntity(), STORE));
- break;
-
- case UPDATE_CLUSTER:
- case UPDATE_FEED:
- case UPDATE_PROCESS:
- default:
- LOG.info("Falcon operation {} is not valid or supported",
event.getOperation());
+ case ADD_CLUSTER:
+
entities.add(FalconBridge.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster)
event.getEntity()));
+ break;
+
+ case ADD_PROCESS:
+ entities.addAll(FalconBridge.createProcessEntity((Process)
event.getEntity(), store));
+ break;
+
+ case ADD_FEED:
+ entities.addAll(FalconBridge.createFeedCreationEntity((Feed)
event.getEntity(), store));
+ break;
+
+ case UPDATE_CLUSTER:
+ case UPDATE_FEED:
+ case UPDATE_PROCESS:
+ default:
+ LOG.info("Falcon operation {} is not valid or supported",
event.getOperation());
}
return entities;
@@ -120,19 +108,33 @@ public class FalconHook extends AtlasHook implements
FalconEventPublisher {
private static Operation getOperation(final FalconEvent.OPERATION op)
throws FalconException {
switch (op) {
- case ADD_CLUSTER:
- case ADD_FEED:
- case ADD_PROCESS:
- return Operation.ADD;
-
- case UPDATE_CLUSTER:
- case UPDATE_FEED:
- case UPDATE_PROCESS:
- return Operation.UPDATE;
-
- default:
- throw new FalconException("Falcon operation " + op + " is not
valid or supported");
+ case ADD_CLUSTER:
+ case ADD_FEED:
+ case ADD_PROCESS:
+ return Operation.ADD;
+
+ case UPDATE_CLUSTER:
+ case UPDATE_FEED:
+ case UPDATE_PROCESS:
+ return Operation.UPDATE;
+
+ default:
+ throw new FalconException("Falcon operation " + op + " is not
valid or supported");
}
}
-}
+ private enum Operation {
+ ADD,
+ UPDATE
+ }
+
+ static {
+ try {
+ store = ConfigurationStore.get();
+ } catch (Exception e) {
+ LOG.error("Caught exception initializing the falcon hook.", e);
+ }
+
+ LOG.info("Created Atlas Hook for Falcon");
+ }
+}
diff --git
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
index e36ff23af..ca1032ddc 100644
---
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
+++
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,5 +32,4 @@ public enum FalconDataTypes {
public String getName() {
return name().toLowerCase();
}
-
}
diff --git
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
index a01ec14be..a21244304 100644
---
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
+++
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,14 @@
package org.apache.atlas.falcon.publisher;
-
import org.apache.atlas.falcon.event.FalconEvent;
/**
* Falcon publisher for Atlas
*/
public interface FalconEventPublisher {
+ void publish(Data data);
+
class Data {
private FalconEvent event;
@@ -36,6 +37,4 @@ public interface FalconEventPublisher {
return event;
}
}
-
- void publish(final Data data);
}
diff --git
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
index 7482ba7b8..e3014a408 100644
---
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
+++
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
@@ -18,10 +18,10 @@
package org.apache.atlas.falcon.service;
-import org.apache.atlas.falcon.Util.EventUtil;
import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.hook.FalconHook;
import org.apache.atlas.falcon.publisher.FalconEventPublisher;
+import org.apache.atlas.falcon.util.EventUtil;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -30,20 +30,19 @@ import org.apache.falcon.service.FalconService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Atlas service to publish Falcon events
*/
public class AtlasService implements FalconService,
ConfigurationChangeListener {
-
private static final Logger LOG =
LoggerFactory.getLogger(AtlasService.class);
- private FalconEventPublisher publisher;
/**
* Constant for the service name.
*/
public static final String SERVICE_NAME =
AtlasService.class.getSimpleName();
+ private FalconEventPublisher publisher;
+
@Override
public String getName() {
return SERVICE_NAME;
@@ -63,22 +62,22 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
try {
EntityType entityType = entity.getEntityType();
switch (entityType) {
- case CLUSTER:
- addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
- break;
+ case CLUSTER:
+ addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
+ break;
- case PROCESS:
- addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
- break;
+ case PROCESS:
+ addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
+ break;
- case FEED:
- addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
- break;
+ case FEED:
+ addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
+ break;
- default:
- LOG.debug("Entity type not processed {}", entityType);
+ default:
+ LOG.debug("Entity type not processed {}", entityType);
}
- } catch(Throwable t) {
+ } catch (Throwable t) {
LOG.warn("Error handling entity {}", entity, t);
}
}
@@ -91,26 +90,26 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
public void onChange(Entity oldEntity, Entity newEntity) throws
FalconException {
/**
* Skipping update for now - update uses full update currently and
this might result in all attributes wiped for hive entities
- EntityType entityType = newEntity.getEntityType();
- switch (entityType) {
- case CLUSTER:
- addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
- break;
-
- case PROCESS:
- addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
- break;
-
- case FEED:
- FalconEvent.OPERATION operation = isReplicationFeed((Feed)
newEntity) ?
- FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
- FalconEvent.OPERATION.UPDATE_FEED;
- addEntity(newEntity, operation);
- break;
-
- default:
- LOG.debug("Entity type not processed {}", entityType);
- }
+ EntityType entityType = newEntity.getEntityType();
+ switch (entityType) {
+ case CLUSTER:
+ addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
+ break;
+
+ case PROCESS:
+ addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
+ break;
+
+ case FEED:
+ FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity)
?
+ FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
+ FalconEvent.OPERATION.UPDATE_FEED;
+ addEntity(newEntity, operation);
+ break;
+
+ default:
+ LOG.debug("Entity type not processed {}", entityType);
+ }
**/
}
@@ -124,9 +123,9 @@ public class AtlasService implements FalconService,
ConfigurationChangeListener
LOG.info("Adding {} entity to Atlas: {}",
entity.getEntityType().name(), entity.getName());
try {
- FalconEvent event =
- new FalconEvent(EventUtil.getUser(), operation, entity);
- FalconEventPublisher.Data data = new
FalconEventPublisher.Data(event);
+ FalconEvent event = new
FalconEvent(EventUtil.getUser(), operation, entity);
+ FalconEventPublisher.Data data = new
FalconEventPublisher.Data(event);
+
publisher.publish(data);
} catch (Exception ex) {
throw new FalconException("Unable to publish data to publisher " +
ex.getMessage(), ex);
diff --git
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
similarity index 86%
rename from
addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
rename to
addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
index ef5634009..bcf838ca2 100644
---
a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
+++
b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.atlas.falcon.Util;
+package org.apache.atlas.falcon.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
@@ -29,24 +29,24 @@ import java.util.Map;
* Falcon event util
*/
public final class EventUtil {
-
private EventUtil() {}
-
public static Map<String, String> convertKeyValueStringToMap(final String
keyValueString) {
if (StringUtils.isBlank(keyValueString)) {
return null;
}
Map<String, String> keyValueMap = new HashMap<>();
+ String[] tags = keyValueString.split(",");
- String[] tags = keyValueString.split(",");
for (String tag : tags) {
- int index = tag.indexOf("=");
- String tagKey = tag.substring(0, index).trim();
+ int index = tag.indexOf("=");
+ String tagKey = tag.substring(0, index).trim();
String tagValue = tag.substring(index + 1, tag.length()).trim();
+
keyValueMap.put(tagKey, tagValue);
}
+
return keyValueMap;
}
@@ -56,6 +56,7 @@ public final class EventUtil {
} catch (Exception ioe) {
//Ignore is failed to get user, uses login user
}
+
return null;
}
}
diff --git
a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index e77f4c96d..c96479dad 100644
---
a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++
b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,17 +23,17 @@ import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.falcon.service.AtlasService;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.type.AtlasTypeUtil;
-import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker;
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
-import org.apache.atlas.falcon.service.AtlasService;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FileSystemStorage;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -50,6 +50,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBException;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -61,33 +62,145 @@ import static org.testng.Assert.fail;
public class FalconHookIT {
public static final Logger LOG =
org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
- public static final String CLUSTER_RESOURCE = "/cluster.xml";
- public static final String FEED_RESOURCE = "/feed.xml";
- public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
+ public static final String CLUSTER_RESOURCE = "/cluster.xml";
+ public static final String FEED_RESOURCE = "/feed.xml";
+ public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
public static final String FEED_REPLICATION_RESOURCE =
"/feed-replication.xml";
- public static final String PROCESS_RESOURCE = "/process.xml";
-
- private AtlasClientV2 atlasClient;
+ public static final String PROCESS_RESOURCE = "/process.xml";
private static final ConfigurationStore STORE = ConfigurationStore.get();
+ private AtlasClientV2 atlasClient;
+
@BeforeClass
public void setUp() throws Exception {
Configuration atlasProperties = ApplicationProperties.get();
+
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
- atlasClient = new
AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT),
new String[]{"admin", "admin"});
+ atlasClient = new
AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT),
new String[] {"admin", "admin"});
} else {
atlasClient = new
AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
}
AtlasService service = new AtlasService();
+
service.init();
STORE.registerListener(service);
CurrentUser.authenticate(System.getProperty("user.name"));
}
+ @Test
+ public void testCreateProcess() throws Exception {
+ Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE,
"cluster" + random());
+
+ STORE.publish(EntityType.CLUSTER, cluster);
+
+ assertClusterIsRegistered(cluster);
+
+ Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(),
null);
+ String infeedId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(infeed.getName(),
cluster.getName()))).getGuid();
+ Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+ String outFeedId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(outfeed.getName(),
cluster.getName()))).getGuid();
+ Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE,
"process" + random());
+
+ process.getClusters().getClusters().get(0).setName(cluster.getName());
+ process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+ process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+
+ STORE.publish(EntityType.PROCESS, process);
+
+ String pid = assertProcessIsRegistered(process,
cluster.getName());
+ AtlasEntity processEntity =
atlasClient.getEntityByGuid(pid).getEntity();
+
+ assertNotNull(processEntity);
+ assertEquals(processEntity.getAttribute(AtlasClient.NAME),
process.getName());
+ assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("inputs")).get(0)), infeedId);
+ assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("outputs")).get(0)), outFeedId);
+ }
+
+ @Test
+ public void testReplicationFeed() throws Exception {
+ Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE,
"cluster" + random());
+
+ STORE.publish(EntityType.CLUSTER, srcCluster);
+
+ assertClusterIsRegistered(srcCluster);
+
+ Cluster targetCluster = loadEntity(EntityType.CLUSTER,
CLUSTER_RESOURCE, "cluster" + random());
+
+ STORE.publish(EntityType.CLUSTER, targetCluster);
+
+ assertClusterIsRegistered(targetCluster);
+
+ Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE,
srcCluster.getName(), targetCluster.getName());
+ String inId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(),
srcCluster.getName()))).getGuid();
+ String outId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(),
targetCluster.getName()))).getGuid();
+ String processId =
assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(),
AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName());
+ AtlasEntity process =
atlasClient.getEntityByGuid(processId).getEntity();
+
+ assertEquals(getGuidFromObjectId(((List<?>)
process.getAttribute("inputs")).get(0)), inId);
+ assertEquals(getGuidFromObjectId(((List<?>)
process.getAttribute("outputs")).get(0)), outId);
+ }
+
+ @Test
+ public void testCreateProcessWithHDFSFeed() throws Exception {
+ Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE,
"cluster" + random());
+
+ STORE.publish(EntityType.CLUSTER, cluster);
+
+ TypesUtil.Pair<String, Feed> result =
getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
+ Feed infeed = result.right;
+ String infeedId = result.left;
+
+ Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+ String outfeedId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(outfeed.getName(),
cluster.getName()))).getGuid();
+ Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE,
"process" + random());
+
+ process.getClusters().getClusters().get(0).setName(cluster.getName());
+ process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+ process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+
+ STORE.publish(EntityType.PROCESS, process);
+
+ String pid = assertProcessIsRegistered(process,
cluster.getName());
+ AtlasEntity processEntity =
atlasClient.getEntityByGuid(pid).getEntity();
+
+ assertEquals(processEntity.getAttribute(AtlasClient.NAME),
process.getName());
+
assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
+ assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("inputs")).get(0)), infeedId);
+ assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("outputs")).get(0)), outfeedId);
+ }
+
+ /**
+ * Wait for a condition, expressed via a {@link Predicate} to become true.
+ *
+ * @param timeout maximum time in milliseconds to wait for the predicate
to become true.
+ * @param predicate predicate waiting on.
+ */
+ protected void waitFor(int timeout, Predicate predicate) throws Exception {
+ ParamChecker.notNull(predicate, "predicate");
+
+ long mustEnd = System.currentTimeMillis() + timeout;
+
+ while (true) {
+ try {
+ predicate.evaluate();
+ return;
+ } catch (Error | Exception e) {
+ if (System.currentTimeMillis() >= mustEnd) {
+ fail("Assertions failed. Failing after waiting for timeout
" + timeout + " msecs", e);
+ }
+
+ LOG.debug("Waiting up to {} msec as assertion failed", mustEnd
- System.currentTimeMillis(), e);
+
+ Thread.sleep(400);
+ }
+ }
+ }
+
private <T extends Entity> T loadEntity(EntityType type, String resource,
String name) throws JAXBException {
Entity entity = (Entity)
type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
+
switch (entity.getEntityType()) {
case CLUSTER:
((Cluster) entity).setName(name);
@@ -101,7 +214,8 @@ public class FalconHookIT {
((Process) entity).setName(name);
break;
}
- return (T)entity;
+
+ return (T) entity;
}
private String random() {
@@ -112,67 +226,39 @@ public class FalconHookIT {
return
String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName,
tableName);
}
- @Test
- public void testCreateProcess() throws Exception {
- Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE,
"cluster" + random());
- STORE.publish(EntityType.CLUSTER, cluster);
- assertClusterIsRegistered(cluster);
-
- Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
- String infeedId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getFeedQualifiedName(infeed.getName(),
cluster.getName()))).getGuid();
-
- Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
- String outFeedId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getFeedQualifiedName(outfeed.getName(),
cluster.getName()))).getGuid();
-
- Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE,
"process" + random());
- process.getClusters().getClusters().get(0).setName(cluster.getName());
- process.getInputs().getInputs().get(0).setFeed(infeed.getName());
- process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
- STORE.publish(EntityType.PROCESS, process);
-
- String pid =
assertProcessIsRegistered(process, cluster.getName());
- AtlasEntity processEntity =
atlasClient.getEntityByGuid(pid).getEntity();
- assertNotNull(processEntity);
- assertEquals(processEntity.getAttribute(AtlasClient.NAME),
process.getName());
-
assertEquals(getGuidFromObjectId(((List<?>)processEntity.getAttribute("inputs")).get(0)),
infeedId);
- assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("outputs")).get(0)), outFeedId);
- }
-
private String assertProcessIsRegistered(Process process, String
clusterName) throws Exception {
- return
assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(),
- AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getProcessQualifiedName(process.getName(),
clusterName));
+ return
assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(),
AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
}
private String assertClusterIsRegistered(Cluster cluster) throws Exception
{
- return
assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(),
- AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName());
+ return
assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(),
AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName());
}
private TypesUtil.Pair<String, Feed> getHDFSFeed(String feedResource,
String clusterName) throws Exception {
- Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" +
random());
+ Feed feed =
loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
feed.getClusters().getClusters().get(0);
+
feedCluster.setName(clusterName);
STORE.publish(EntityType.FEED, feed);
+
String feedId = assertFeedIsRegistered(feed, clusterName);
+
assertFeedAttributes(feedId);
- String processId =
assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
- AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getFeedQualifiedName(feed.getName(),
clusterName));
+ String processId =
assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
AtlasEntity processEntity =
atlasClient.getEntityByGuid(processId).getEntity();
+
assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("outputs")).get(0)), feedId);
- String inputId = getGuidFromObjectId(((List<?>)
processEntity.getAttribute("inputs")).get(0));
+ String inputId = getGuidFromObjectId(((List<?>)
processEntity.getAttribute("inputs")).get(0));
AtlasEntity pathEntity =
atlasClient.getEntityByGuid(inputId).getEntity();
+
assertEquals(pathEntity.getTypeName(), HiveMetaStoreBridge.HDFS_PATH);
- List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
- Location dataLocation = FileSystemStorage.getLocation(locations,
LocationType.DATA);
-
assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
- FalconBridge.normalize(dataLocation.getPath()));
+ List<Location> locations = FeedHelper.getLocations(feedCluster,
feed);
+ Location dataLocation = FileSystemStorage.getLocation(locations,
LocationType.DATA);
+
+
assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
FalconBridge.normalize(dataLocation.getPath()));
return TypesUtil.Pair.of(feedId, feed);
}
@@ -182,114 +268,67 @@ public class FalconHookIT {
}
private Feed getTableFeed(String feedResource, String clusterName, String
secondClusterName) throws Exception {
- Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" +
random());
+ Feed feed =
loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
feed.getClusters().getClusters().get(0);
+
feedCluster.setName(clusterName);
- String dbName = "db" + random();
+
+ String dbName = "db" + random();
String tableName = "table" + random();
+
feedCluster.getTable().setUri(getTableUri(dbName, tableName));
- String dbName2 = "db" + random();
+ String dbName2 = "db" + random();
String tableName2 = "table" + random();
if (secondClusterName != null) {
org.apache.falcon.entity.v0.feed.Cluster feedCluster2 =
feed.getClusters().getClusters().get(1);
+
feedCluster2.setName(secondClusterName);
feedCluster2.getTable().setUri(getTableUri(dbName2, tableName2));
}
STORE.publish(EntityType.FEED, feed);
+
String feedId = assertFeedIsRegistered(feed, clusterName);
+
assertFeedAttributes(feedId);
verifyFeedLineage(feed.getName(), clusterName, feedId, dbName,
tableName);
if (secondClusterName != null) {
String feedId2 = assertFeedIsRegistered(feed, secondClusterName);
+
assertFeedAttributes(feedId2);
verifyFeedLineage(feed.getName(), secondClusterName, feedId2,
dbName2, tableName2);
}
+
return feed;
}
private void assertFeedAttributes(String feedId) throws Exception {
AtlasEntity feedEntity =
atlasClient.getEntityByGuid(feedId).getEntity();
+
assertEquals(feedEntity.getAttribute(AtlasClient.OWNER), "testuser");
assertEquals(feedEntity.getAttribute(FalconBridge.FREQUENCY),
"hours(1)");
assertEquals(feedEntity.getAttribute(AtlasClient.DESCRIPTION), "test
input");
}
- private void verifyFeedLineage(String feedName, String clusterName, String
feedId, String dbName, String tableName)
- throws Exception{
+ private void verifyFeedLineage(String feedName, String clusterName, String
feedId, String dbName, String tableName) throws Exception {
//verify that lineage from hive table to falcon feed is created
- String processId =
assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
- AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getFeedQualifiedName(feedName, clusterName));
+ String processId =
assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(feedName, clusterName));
AtlasEntity processEntity =
atlasClient.getEntityByGuid(processId).getEntity();
+
assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("outputs")).get(0)), feedId);
- String inputId = getGuidFromObjectId(((List<?>)
processEntity.getAttribute("inputs")).get(0));
+ String inputId = getGuidFromObjectId(((List<?>)
processEntity.getAttribute("inputs")).get(0));
AtlasEntity tableEntity =
atlasClient.getEntityByGuid(inputId).getEntity();
- assertEquals(tableEntity.getTypeName(),
HiveDataTypes.HIVE_TABLE.getName());
-
assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
- HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName,
tableName));
+ assertEquals(tableEntity.getTypeName(),
HiveDataTypes.HIVE_TABLE.getName());
+
assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
}
private String assertFeedIsRegistered(Feed feed, String clusterName)
throws Exception {
- return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(),
AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getFeedQualifiedName(feed.getName(),
clusterName));
- }
-
- @Test
- public void testReplicationFeed() throws Exception {
- Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE,
"cluster" + random());
- STORE.publish(EntityType.CLUSTER, srcCluster);
- assertClusterIsRegistered(srcCluster);
-
- Cluster targetCluster = loadEntity(EntityType.CLUSTER,
CLUSTER_RESOURCE, "cluster" + random());
- STORE.publish(EntityType.CLUSTER, targetCluster);
- assertClusterIsRegistered(targetCluster);
-
- Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE,
srcCluster.getName(), targetCluster.getName());
- String inId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-
FalconBridge.getFeedQualifiedName(feed.getName(),
srcCluster.getName()))).getGuid();
- String outId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getFeedQualifiedName(feed.getName(),
targetCluster.getName()))).getGuid();
-
-
- String processId =
assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(),
- AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName());
- AtlasEntity process =
atlasClient.getEntityByGuid(processId).getEntity();
- assertEquals(getGuidFromObjectId(((List<?>)
process.getAttribute("inputs")).get(0)), inId);
- assertEquals(getGuidFromObjectId(((List<?>)
process.getAttribute("outputs")).get(0)), outId);
- }
-
- @Test
- public void testCreateProcessWithHDFSFeed() throws Exception {
- Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE,
"cluster" + random());
- STORE.publish(EntityType.CLUSTER, cluster);
-
- TypesUtil.Pair<String, Feed> result = getHDFSFeed(FEED_HDFS_RESOURCE,
cluster.getName());
- Feed infeed = result.right;
- String infeedId = result.left;
-
- Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
- String outfeedId =
atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(),
Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
- FalconBridge.getFeedQualifiedName(outfeed.getName(),
cluster.getName()))).getGuid();
-
- Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE,
"process" + random());
- process.getClusters().getClusters().get(0).setName(cluster.getName());
- process.getInputs().getInputs().get(0).setFeed(infeed.getName());
- process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
- STORE.publish(EntityType.PROCESS, process);
-
- String pid = assertProcessIsRegistered(process, cluster.getName());
- AtlasEntity processEntity =
atlasClient.getEntityByGuid(pid).getEntity();
- assertEquals(processEntity.getAttribute(AtlasClient.NAME),
process.getName());
-
assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
- FalconBridge.getProcessQualifiedName(process.getName(),
cluster.getName()));
- assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("inputs")).get(0)), infeedId);
- assertEquals(getGuidFromObjectId(((List<?>)
processEntity.getAttribute("outputs")).get(0)), outfeedId);
+ return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(),
AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
}
private String assertEntityIsRegistered(final String typeName, final
String property, final String value) throws Exception {
@@ -297,10 +336,12 @@ public class FalconHookIT {
@Override
public void evaluate() throws Exception {
AtlasEntity.AtlasEntityWithExtInfo entity =
atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property,
value));
+
assertNotNull(entity);
assertNotNull(entity.getEntity());
}
});
+
return atlasClient.getEntityHeaderByAttribute(typeName,
Collections.singletonMap(property, value)).getGuid();
}
@@ -323,28 +364,4 @@ public class FalconHookIT {
*/
void evaluate() throws Exception;
}
-
- /**
- * Wait for a condition, expressed via a {@link Predicate} to become true.
- *
- * @param timeout maximum time in milliseconds to wait for the predicate
to become true.
- * @param predicate predicate waiting on.
- */
- protected void waitFor(int timeout, Predicate predicate) throws Exception {
- ParamChecker.notNull(predicate, "predicate");
- long mustEnd = System.currentTimeMillis() + timeout;
-
- while (true) {
- try {
- predicate.evaluate();
- return;
- } catch(Error | Exception e) {
- if (System.currentTimeMillis() >= mustEnd) {
- fail("Assertions failed. Failing after waiting for timeout
" + timeout + " msecs", e);
- }
- LOG.debug("Waiting up to {} msec as assertion failed", mustEnd
- System.currentTimeMillis(), e);
- Thread.sleep(400);
- }
- }
- }
}
diff --git
a/addons/falcon-bridge/src/test/resources/atlas-application.properties
b/addons/falcon-bridge/src/test/resources/atlas-application.properties
index 0ce0f46c9..94a75aab4 100644
--- a/addons/falcon-bridge/src/test/resources/atlas-application.properties
+++ b/addons/falcon-bridge/src/test/resources/atlas-application.properties
@@ -15,54 +15,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
######### Atlas Server Configs #########
atlas.rest.address=http://localhost:31000
-
######### Graph Database Configs #########
-
-
# Graph database implementation. Value inserted by maven.
atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
atlas.graph.index.search.solr.wait-searcher=true
-
# Graph Storage
atlas.graph.storage.backend=berkeleyje
-
# Entity repository implementation
atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository
-
# Graph Search Index Backend
atlas.graph.index.search.backend=solr
-
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
atlas.graph.storage.transactions=true
-
#hbase
#For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here
-
atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
-
#ElasticSearch
atlas.graph.index.search.directory=${sys:atlas.data}/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
-
# Solr cloud mode properties
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
atlas.graph.index.search.solr.embedded=true
atlas.graph.index.search.max-result-set-size=150
-
-
######### Notification Configs #########
atlas.notification.embedded=true
-
atlas.kafka.zookeeper.connect=localhost:19026
atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka
@@ -73,52 +58,38 @@ atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
#atlas.kafka.auto.commit.enable=false
-
atlas.kafka.enable.auto.commit=false
atlas.kafka.auto.offset.reset=earliest
atlas.kafka.session.timeout.ms=30000
atlas.kafka.offsets.topic.replication.factor=1
-
-
-
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
atlas.audit.zookeeper.session.timeout.ms=1000
atlas.audit.hbase.zookeeper.quorum=localhost
atlas.audit.hbase.zookeeper.property.clientPort=19026
-
######### Security Properties #########
-
# SSL config
atlas.enableTLS=false
atlas.server.https.port=31443
-
######### Security Properties #########
-
hbase.security.authentication=simple
-
atlas.hook.falcon.synchronous=true
-
######### JAAS Configuration ########
-
-atlas.jaas.KafkaClient.loginModuleName =
com.sun.security.auth.module.Krb5LoginModule
-atlas.jaas.KafkaClient.loginModuleControlFlag = required
-atlas.jaas.KafkaClient.option.useKeyTab = true
-atlas.jaas.KafkaClient.option.storeKey = true
-atlas.jaas.KafkaClient.option.serviceName = kafka
-atlas.jaas.KafkaClient.option.keyTab =
/etc/security/keytabs/atlas.service.keytab
-atlas.jaas.KafkaClient.option.principal = atlas/_HOST@EXAMPLE.COM
-
+atlas.jaas.KafkaClient.loginModuleName=com.sun.security.auth.module.Krb5LoginModule
+atlas.jaas.KafkaClient.loginModuleControlFlag=required
+atlas.jaas.KafkaClient.option.useKeyTab=true
+atlas.jaas.KafkaClient.option.storeKey=true
+atlas.jaas.KafkaClient.option.serviceName=kafka
+atlas.jaas.KafkaClient.option.keyTab=/etc/security/keytabs/atlas.service.keytab
+atlas.jaas.KafkaClient.option.principal=atlas/_HOST@EXAMPLE.COM
######### High Availability Configuration ########
atlas.server.ha.enabled=false
#atlas.server.ids=id1
#atlas.server.address.id1=localhost:21000
-
######### Atlas Authorization #########
atlas.authorizer.impl=none
# atlas.authorizer.impl=simple
# atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json
-
######### Atlas Authentication #########
atlas.authentication.method.file=true
atlas.authentication.method.ldap.type=none
diff --git a/addons/falcon-bridge/src/test/resources/atlas-logback.xml
b/addons/falcon-bridge/src/test/resources/atlas-logback.xml
index 78fd420dc..991cb621d 100755
--- a/addons/falcon-bridge/src/test/resources/atlas-logback.xml
+++ b/addons/falcon-bridge/src/test/resources/atlas-logback.xml
@@ -18,115 +18,114 @@
-->
<configuration>
- <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
- <param name="Target" value="System.out"/>
- <encoder>
- <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
- </encoder>
- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
- <level>INFO</level>
- </filter>
- </appender>
-
- <appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${atlas.log.dir}/${atlas.log.file}</file>
- <append>true</append>
- <encoder>
- <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
- </encoder>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
- <maxHistory>20</maxHistory>
- <cleanHistoryOnStart>true</cleanHistoryOnStart>
- </rollingPolicy>
- </appender>
-
- <appender name="AUDIT"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${atlas.log.dir}/audit.log</file>
- <append>true</append>
- <encoder>
- <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
- </encoder>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${atlas.log.dir}/audit-%d.log</fileNamePattern>
- <maxHistory>20</maxHistory>
- <cleanHistoryOnStart>false</cleanHistoryOnStart>
- </rollingPolicy>
- </appender>
-
- <appender name="METRICS"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${atlas.log.dir}/metrics.log</file>
- <append>true</append>
- <encoder>
- <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
- </encoder>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${atlas.log.dir}/metrics-%d.log</fileNamePattern>
- <maxHistory>20</maxHistory>
- <cleanHistoryOnStart>false</cleanHistoryOnStart>
- </rollingPolicy>
- </appender>
-
- <appender name="FAILED"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${atlas.log.dir}/failed.log</file>
- <append>true</append>
- <encoder>
- <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
- </encoder>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${atlas.log.dir}/failed-%d.log</fileNamePattern>
- <maxHistory>20</maxHistory>
- <cleanHistoryOnStart>false</cleanHistoryOnStart>
- </rollingPolicy>
- </appender>
-
- <logger name="org.apache.atlas" additivity="false" level="info">
- <appender-ref ref="FILE"/>
- </logger>
-
- <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false"
level="debug">
- <appender-ref ref="FILE"/>
- </logger>
-
- <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook"
additivity="false" level="debug">
- <appender-ref ref="FILE"/>
- </logger>
-
- <logger name="org.janusgraph" additivity="false" level="warn">
- <appender-ref ref="FILE"/>
- </logger>
-
- <logger name="org.springframework" additivity="false" level="warn">
- <appender-ref ref="console"/>
- </logger>
-
- <logger name="org.eclipse" additivity="false" level="warn">
- <appender-ref ref="console"/>
- </logger>
-
- <logger name="com.sun.jersey" additivity="false" level="warn">
- <appender-ref ref="console"/>
- </logger>
-
-
- <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was
supplied but isn't a known config -->
- <logger name="org.apache.kafka.common.config.AbstractConfig"
additivity="false" level="error">
- <appender-ref ref="FILE"/>
- </logger>
-
- <logger name="METRICS" additivity="false" level="debug">
- <appender-ref ref="METRICS"/>
- </logger>
-
- <logger name="FAILED" additivity="false" level="info">
- <appender-ref ref="FAILED"/>
- </logger>
-
- <logger name="AUDIT" additivity="false" level="info">
- <appender-ref ref="AUDIT"/>
- </logger>
-
- <root level="warn">
- <appender-ref ref="FILE"/>
- </root>
+ <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+ <param name="Target" value="System.out" />
+ <encoder>
+ <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
+
+ <appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${atlas.log.dir}/${atlas.log.file}</file>
+ <append>true</append>
+ <encoder>
+ <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+
<fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
+ <maxHistory>20</maxHistory>
+ <cleanHistoryOnStart>true</cleanHistoryOnStart>
+ </rollingPolicy>
+ </appender>
+
+ <appender name="AUDIT"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${atlas.log.dir}/audit.log</file>
+ <append>true</append>
+ <encoder>
+ <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${atlas.log.dir}/audit-%d.log</fileNamePattern>
+ <maxHistory>20</maxHistory>
+ <cleanHistoryOnStart>false</cleanHistoryOnStart>
+ </rollingPolicy>
+ </appender>
+
+ <appender name="METRICS"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${atlas.log.dir}/metrics.log</file>
+ <append>true</append>
+ <encoder>
+ <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${atlas.log.dir}/metrics-%d.log</fileNamePattern>
+ <maxHistory>20</maxHistory>
+ <cleanHistoryOnStart>false</cleanHistoryOnStart>
+ </rollingPolicy>
+ </appender>
+
+ <appender name="FAILED"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${atlas.log.dir}/failed.log</file>
+ <append>true</append>
+ <encoder>
+ <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${atlas.log.dir}/failed-%d.log</fileNamePattern>
+ <maxHistory>20</maxHistory>
+ <cleanHistoryOnStart>false</cleanHistoryOnStart>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.apache.atlas" additivity="false" level="info">
+ <appender-ref ref="FILE" />
+ </logger>
+
+ <logger name="org.apache.atlas.impala.ImpalaLineageTool"
additivity="false" level="debug">
+ <appender-ref ref="FILE" />
+ </logger>
+
+ <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook"
additivity="false" level="debug">
+ <appender-ref ref="FILE" />
+ </logger>
+
+ <logger name="org.janusgraph" additivity="false" level="warn">
+ <appender-ref ref="FILE" />
+ </logger>
+
+ <logger name="org.springframework" additivity="false" level="warn">
+ <appender-ref ref="console" />
+ </logger>
+
+ <logger name="org.eclipse" additivity="false" level="warn">
+ <appender-ref ref="console" />
+ </logger>
+
+ <logger name="com.sun.jersey" additivity="false" level="warn">
+ <appender-ref ref="console" />
+ </logger>
+
+ <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was
supplied but isn't a known config -->
+ <logger name="org.apache.kafka.common.config.AbstractConfig"
additivity="false" level="error">
+ <appender-ref ref="FILE" />
+ </logger>
+
+ <logger name="METRICS" additivity="false" level="debug">
+ <appender-ref ref="METRICS" />
+ </logger>
+
+ <logger name="FAILED" additivity="false" level="info">
+ <appender-ref ref="FAILED" />
+ </logger>
+
+ <logger name="AUDIT" additivity="false" level="info">
+ <appender-ref ref="AUDIT" />
+ </logger>
+
+ <root level="warn">
+ <appender-ref ref="FILE" />
+ </root>
</configuration>
diff --git a/addons/falcon-bridge/src/test/resources/cluster.xml
b/addons/falcon-bridge/src/test/resources/cluster.xml
index b183847db..b7bbda74c 100644
--- a/addons/falcon-bridge/src/test/resources/cluster.xml
+++ b/addons/falcon-bridge/src/test/resources/cluster.xml
@@ -20,7 +20,7 @@
Primary cluster configuration for demo vm
-->
<cluster colo="west-coast" description="Primary Cluster" name="testcluster"
- xmlns="uri:falcon:cluster:0.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ xmlns="uri:falcon:cluster:0.1">
<interfaces>
<interface type="readonly" endpoint="hftp://localhost:10070"
version="1.1.1" />
diff --git a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
index 435db0745..582998a3c 100644
--- a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
+++ b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
@@ -21,19 +21,19 @@
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>
- <late-arrival cut-off="hours(3)"/>
+ <late-arrival cut-off="hours(3)" />
<clusters>
<cluster name="testcluster" type="source">
- <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
- <retention limit="hours(24)" action="delete"/>
+ <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+ <retention limit="hours(24)" action="delete" />
</cluster>
</clusters>
<locations>
- <location type="data"
path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
+ <location type="data"
path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</locations>
- <ACL owner="testuser" group="group" permission="0x755"/>
- <schema location="hcat" provider="hcat"/>
+ <ACL owner="testuser" group="group" permission="0x755" />
+ <schema location="hcat" provider="hcat" />
</feed>
diff --git a/addons/falcon-bridge/src/test/resources/feed-replication.xml
b/addons/falcon-bridge/src/test/resources/feed-replication.xml
index dcd427b18..42c4f863f 100644
--- a/addons/falcon-bridge/src/test/resources/feed-replication.xml
+++ b/addons/falcon-bridge/src/test/resources/feed-replication.xml
@@ -21,23 +21,23 @@
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>
- <late-arrival cut-off="hours(3)"/>
+ <late-arrival cut-off="hours(3)" />
<clusters>
<cluster name="testcluster" type="source">
- <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
- <retention limit="hours(24)" action="delete"/>
+ <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+ <retention limit="hours(24)" action="delete" />
<table
uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</cluster>
<cluster name="testcluster" type="target">
- <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
- <retention limit="hours(24)" action="delete"/>
+ <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+ <retention limit="hours(24)" action="delete" />
<table
uri="catalog:outdb:outtable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</cluster>
</clusters>
<table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
- <ACL owner="testuser" group="group" permission="0x755"/>
- <schema location="hcat" provider="hcat"/>
+ <ACL owner="testuser" group="group" permission="0x755" />
+ <schema location="hcat" provider="hcat" />
</feed>
diff --git a/addons/falcon-bridge/src/test/resources/feed.xml
b/addons/falcon-bridge/src/test/resources/feed.xml
index 473c745ce..f58316b77 100644
--- a/addons/falcon-bridge/src/test/resources/feed.xml
+++ b/addons/falcon-bridge/src/test/resources/feed.xml
@@ -21,18 +21,18 @@
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>
- <late-arrival cut-off="hours(3)"/>
+ <late-arrival cut-off="hours(3)" />
<clusters>
<cluster name="testcluster" type="source">
- <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
- <retention limit="hours(24)" action="delete"/>
+ <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+ <retention limit="hours(24)" action="delete" />
<table
uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</cluster>
</clusters>
<table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
- <ACL owner="testuser" group="group" permission="0x755"/>
- <schema location="hcat" provider="hcat"/>
+ <ACL owner="testuser" group="group" permission="0x755" />
+ <schema location="hcat" provider="hcat" />
</feed>
diff --git a/addons/falcon-bridge/src/test/resources/process.xml
b/addons/falcon-bridge/src/test/resources/process.xml
index b94d0a847..62e7542f1 100644
--- a/addons/falcon-bridge/src/test/resources/process.xml
+++ b/addons/falcon-bridge/src/test/resources/process.xml
@@ -22,7 +22,7 @@
<clusters>
<cluster name="testcluster">
- <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/>
+ <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z" />
</cluster>
</clusters>
@@ -32,22 +32,22 @@
<timezone>UTC</timezone>
<inputs>
- <input end="today(0,0)" start="today(0,0)" feed="testinput"
name="input"/>
+ <input end="today(0,0)" start="today(0,0)" feed="testinput"
name="input" />
</inputs>
<outputs>
- <output instance="now(0,0)" feed="testoutput" name="output"/>
+ <output instance="now(0,0)" feed="testoutput" name="output" />
</outputs>
<properties>
- <property name="blah" value="blah"/>
+ <property name="blah" value="blah" />
</properties>
- <workflow engine="hive" path="/falcon/test/apps/hive/script.hql"/>
+ <workflow engine="hive" path="/falcon/test/apps/hive/script.hql" />
- <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+ <retry policy="periodic" delay="minutes(10)" attempts="3" />
<late-process policy="exp-backoff" delay="hours(2)">
- <late-input input="input" workflow-path="/falcon/test/workflow"/>
+ <late-input input="input" workflow-path="/falcon/test/workflow" />
</late-process>
</process>
diff --git a/addons/falcon-bridge/src/test/resources/startup.properties
b/addons/falcon-bridge/src/test/resources/startup.properties
index 962347039..661a2be89 100644
--- a/addons/falcon-bridge/src/test/resources/startup.properties
+++ b/addons/falcon-bridge/src/test/resources/startup.properties
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
*.domain=debug
*.config.store.persist=false
*.config.store.uri=target/config_store
\ No newline at end of file