This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 52aaf934b ATLAS-4738: Dynamic Index Recovery issues and improvements
52aaf934b is described below
commit 52aaf934bc0a3a6d079fe7b1722fe49b16b3f666
Author: radhikakundam <[email protected]>
AuthorDate: Wed Mar 29 11:16:26 2023 -0700
ATLAS-4738: Dynamic Index Recovery issues and improvements
Signed-off-by: radhikakundam <[email protected]>
(cherry picked from commit 882954e2969d6018d2bc8977f9dc18a6e3d1d5ce)
---
.../main/java/org/apache/atlas/AtlasClientV2.java | 18 +++
.../org/apache/atlas/repository/Constants.java | 9 +-
.../graphdb/janus/AtlasJanusGraphDatabase.java | 5 +-
.../graphdb/janus/AtlasJanusGraphManagement.java | 2 +-
.../java/org/apache/atlas/AtlasConfiguration.java | 3 +-
.../repository/graph/IndexRecoveryService.java | 121 +++++++++++++----
.../apache/atlas/web/rest/IndexRecoveryREST.java | 143 +++++++++++++++++++++
.../atlas/web/integration/IndexRecoveryRestIT.java | 54 ++++++++
8 files changed, 318 insertions(+), 37 deletions(-)
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 7c8e875a1..6f97da192 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -83,6 +83,7 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -137,6 +138,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
//Notification APIs
private static final String NOTIFICATION_URI = BASE_URI +
"v2/notification";
+ //IndexRecovery APIs
+ private static final String INDEX_RECOVERY_URI = BASE_URI +
"v2/indexrecovery";
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword)
{
super(baseUrl, basicAuthUserNamePassword);
@@ -1031,6 +1034,18 @@ public class AtlasClientV2 extends AtlasBaseClient {
callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC,
topic), (Class<?>) null, messages);
}
+    /**
+     * Fetches the index recovery data by invoking the GET_INDEX_RECOVERY_DATA API.
+     *
+     * @return the response of the call, as a map
+     * @throws AtlasServiceException propagated from callAPI
+     */
+    public Map<String, String> getIndexRecoveryData() throws AtlasServiceException {
+        final Map<String, String> indexRecoveryData = callAPI(API_V2.GET_INDEX_RECOVERY_DATA, Map.class, null);
+
+        return indexRecoveryData;
+    }
+
+    /**
+     * Invokes the START_INDEX_RECOVERY API with the given instant as the "startTime" query parameter.
+     *
+     * @param startTime instant to send; when null, a null "startTime" value is added to the query parameters
+     * @throws AtlasServiceException propagated from callAPI
+     */
+    public void startIndexRecovery(Instant startTime) throws AtlasServiceException {
+        final String startTimeValue;
+
+        if (startTime == null) {
+            startTimeValue = null;
+        } else {
+            startTimeValue = String.valueOf(startTime);
+        }
+
+        MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+
+        queryParams.add("startTime", startTimeValue);
+
+        callAPI(API_V2.START_INDEX_RECOVERY, (Class<?>) null, queryParams);
+    }
+
@VisibleForTesting
public API formatPathWithParameter(API api, String... params) {
return formatPathParameters(api, params);
@@ -1212,6 +1227,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC = new
API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST,
Response.Status.NO_CONTENT);
+ public static final API_V2 GET_INDEX_RECOVERY_DATA = new
API_V2(INDEX_RECOVERY_URI , HttpMethod.GET, Response.Status.OK);
+ public static final API_V2 START_INDEX_RECOVERY = new
API_V2(INDEX_RECOVERY_URI + "/start", HttpMethod.POST,
Response.Status.NO_CONTENT);
+
// labels APIs
public static final API_V2 ADD_LABELS = new
API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT,
Response.Status.NO_CONTENT);
public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE = new
API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT,
Response.Status.NO_CONTENT);
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java
b/common/src/main/java/org/apache/atlas/repository/Constants.java
index c84e1b2d0..51b093284 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -233,10 +233,11 @@ public final class Constants {
/**
* Index Recovery vertex property keys.
*/
- public static final String INDEX_RECOVERY_PREFIX =
INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
- public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
- public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
- public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
+ public static final String INDEX_RECOVERY_PREFIX =
INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
+ public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
+ public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
+ public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
+ public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime");
public static final String SQOOP_SOURCE = "sqoop";
public static final String FALCON_SOURCE = "falcon";
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index cb3e8d993..115b681cc 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.graphdb.janus;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDatabase;
@@ -77,9 +78,7 @@ public class AtlasJanusGraphDatabase implements
GraphDatabase<AtlasJanusVertex,
public static final String INDEX_BACKEND_ES = "elasticsearch";
public static final String GRAPH_TX_LOG_CONF = "tx.log-tx";
public static final String GRAPH_TX_LOG_VERBOSE_CONF =
"tx.recovery.verbose";
- public static final String SOLR_INDEX_TX_LOG_TTL_CONF =
"write.ahead.log.ttl.in.hours";
public static final String GRAPH_TX_LOG_TTL_CONF = "log.tx.ttl";
- public static final long DEFAULT_GRAPH_TX_LOG_TTL = 72; //Hrs
private static volatile AtlasJanusGraph atlasGraphInstance = null;
private static volatile JanusGraph graphInstance;
@@ -233,7 +232,7 @@ public class AtlasJanusGraphDatabase implements
GraphDatabase<AtlasJanusVertex,
public static void configureTxLogBasedIndexRecovery() {
try {
boolean recoveryEnabled =
ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF,
DEFAULT_INDEX_RECOVERY);
- long ttl =
ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF,
DEFAULT_GRAPH_TX_LOG_TTL);
+ long ttl =
AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong();
Duration txLogTtlSecs =
Duration.ofSeconds(Duration.ofHours(ttl).getSeconds());
Map<String, Object> properties = new HashMap<String, Object>() {{
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index e7de83005..b3807dfc4 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -434,7 +434,7 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
TransactionRecovery txRecovery = (TransactionRecovery)
txRecoveryObject;
StandardJanusGraph janusGraph = (StandardJanusGraph)
this.graph.getGraph();
- LOG.info("stopIndexRecovery: Index Client is unhealthy. Index
recovery: Paused!");
+ LOG.info("stopIndexRecovery: Index recovery: Paused!");
janusGraph.getBackend().getSystemTxLog().close();
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 31ec605f3..58a2fa725 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -92,7 +92,8 @@ public enum AtlasConfiguration {
TASKS_USE_ENABLED("atlas.tasks.enabled", true),
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
- METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336); // 14 days
default
+ METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days
default
+ SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10
days default
private static final Configuration APPLICATION_PROPERTIES;
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index caf31634b..9bcad5475 100644
---
a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -19,6 +19,7 @@ package org.apache.atlas.repository.graph;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -27,6 +28,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
@@ -35,12 +37,16 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.text.SimpleDateFormat;
import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
import static
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
+import static
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME;
import static
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME;
import static
org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME;
import static
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
@@ -52,14 +58,14 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
private static final String DATE_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
private static final String INDEX_HEALTH_MONITOR_THREAD_NAME =
"index-health-monitor";
private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL =
"atlas.graph.index.status.check.frequency";
- private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME =
"atlas.graph.index.recovery.start.time";
+ private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME =
"atlas.index.recovery.start.time";
private static final long SOLR_STATUS_RETRY_DEFAULT_MS =
30000; // 30 secs default
private final Thread indexHealthMonitor;
- private final RecoveryInfoManagement recoveryInfoManagement;
+ public final RecoveryInfoManagement recoveryInfoManagement;
private Configuration configuration;
private boolean isIndexRecoveryEnabled;
- private RecoveryThread recoveryThread;
+ public RecoveryThread recoveryThread;
@Inject
public IndexRecoveryService(Configuration config, AtlasGraph graph) {
@@ -151,7 +157,7 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
}
}
- private static class RecoveryThread implements Runnable {
+ public static class RecoveryThread implements Runnable {
private final AtlasGraph graph;
private final RecoveryInfoManagement recoveryInfoManagement;
private long indexStatusCheckRetryMillis;
@@ -176,7 +182,7 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
while (shouldRun.get()) {
try {
- boolean isIdxHealthy = isIndexBackendHealthy();
+ boolean isIdxHealthy = waitAndCheckIfIndexBackendHealthy();
if (this.txRecoveryObject == null && isIdxHealthy) {
startMonitoring();
@@ -207,42 +213,68 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
}
}
- private boolean isIndexBackendHealthy() throws AtlasException,
InterruptedException {
+ private boolean waitAndCheckIfIndexBackendHealthy() throws
AtlasException, InterruptedException {
Thread.sleep(indexStatusCheckRetryMillis);
+ return isIndexBackendHealthy();
+ }
+
+ public boolean isIndexBackendHealthy() throws AtlasException {
return this.graph.getGraphIndexClient().isHealthy();
}
+ public void startMonitoringByUserRequest(Long startTime) {
+ startMonitoring(startTime);
+ }
+
private void startMonitoring() {
- Long startTime = null;
+ startMonitoring(recoveryInfoManagement.getStartTime());
+ }
+
+ private void startMonitoring(Long startTime) {
+ if (startTime == null || startTime == 0L) {
+ LOG.error("Index Recovery requested without start time");
+ return;
+ }
try {
- startTime = recoveryInfoManagement.getStartTime();
txRecoveryObject =
this.graph.getManagementSystem().startIndexRecovery(startTime);
printIndexRecoveryStats();
- } catch (Exception e) {
- LOG.error("Index Recovery: Start: Error!", e);
- } finally {
+
LOG.info("Index Recovery: Started! Recovery time: {}",
Instant.ofEpochMilli(startTime));
+ } catch (Exception e) {
+ LOG.error("Index Recovery with recovery time: {} failed",
Instant.ofEpochMilli(startTime), e);
}
}
+ public void stopMonitoringByUserRequest() {
+ stopIndexRecovery();
+ LOG.info("Index Recovery: Stopped!");
+ }
+
private void stopMonitoring() {
- Instant newStartTime =
Instant.now().minusMillis(indexStatusCheckRetryMillis);
+ stopIndexRecoveryAndUpdateStartTime();
+ }
+
+ private void stopIndexRecoveryAndUpdateStartTime() {
+ Instant newStartTime = Instant.now().minusMillis(2 *
indexStatusCheckRetryMillis);
+
+ stopIndexRecovery();
+
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
+
+ LOG.info("Index Recovery: Stopped! Recovery time: {}",
newStartTime);
+ }
+ private void stopIndexRecovery() {
try {
this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject);
-
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
-
printIndexRecoveryStats();
} catch (Exception e) {
LOG.info("Index Recovery: Stopped! Error!", e);
} finally {
this.txRecoveryObject = null;
-
- LOG.info("Index Recovery: Stopped! Recovery time: {}",
newStartTime);
}
}
@@ -252,7 +284,7 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
}
@VisibleForTesting
- static class RecoveryInfoManagement {
+ public static class RecoveryInfoManagement {
private static final String INDEX_RECOVERY_TYPE_NAME =
"__solrIndexRecoveryInfo";
private final AtlasGraph graph;
@@ -261,20 +293,46 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
this.graph = graph;
}
- public void updateStartTime(Long startTime) {
+ public void updateStartTime(long time) {
+ updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_START_TIME,
time);
+ }
+
+ public void updateCustomStartTime(long time) {
+ updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME,
time);
+ }
+
+ public void updateIndexRecoveryTime(String timePropertyKey, long time)
{
+ Map<String, String> indexRecoveryData = new HashMap<>();
+ indexRecoveryData.put(timePropertyKey, String.valueOf(time));
+ updateIndexRecoveryData(indexRecoveryData);
+ }
+
+ public void updateIndexRecoveryData(Map<String, String>
indexRecoveryData) {
try {
- Long prevStartTime = null;
- AtlasVertex vertex = findVertex();
+ Long startTime =
NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_START_TIME));
+ Long prevStartTime =
NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME));
+ Long customStartTime =
NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME));
+ boolean isStartTimeUpdated = startTime != null ? true :
false;
+ AtlasVertex vertex = findVertex();
if (vertex == null) {
vertex = graph.addVertex();
+ setEncodedProperty(vertex,
PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
} else {
- prevStartTime = getStartTime(vertex);
+ prevStartTime = isStartTimeUpdated ? getStartTime(vertex)
: prevStartTime;
}
- setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME,
INDEX_RECOVERY_TYPE_NAME);
- setEncodedProperty(vertex,
PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
- setEncodedProperty(vertex,
PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
+ if (startTime != null) {
+ setEncodedProperty(vertex,
PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
+ }
+
+ if (prevStartTime != null) {
+ setEncodedProperty(vertex,
PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
+ }
+
+ if (customStartTime != null) {
+ setEncodedProperty(vertex,
PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, customStartTime);
+ }
} catch (Exception ex) {
LOG.error("Error: Updating: {}!", ex);
@@ -290,10 +348,12 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
}
private Long getStartTime(AtlasVertex vertex) {
+ Long defaultStartTime = getStartTimeByTxLogTTL();
+
if (vertex == null) {
- LOG.warn("Vertex passed is NULL: Returned is 0");
+ LOG.warn("Vertex passed is NULL: Returned is startTime by TTL
{}", Instant.ofEpochMilli(defaultStartTime));
- return 0L;
+ return defaultStartTime;
}
Long startTime = 0L;
@@ -304,10 +364,15 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
LOG.error("Error retrieving startTime", e);
}
- return startTime;
+ return startTime == null || startTime == 0L ? defaultStartTime :
startTime;
+ }
+
+ private Long getStartTimeByTxLogTTL() {
+ long ttl = AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong();
+ return Instant.now().minus(ttl, ChronoUnit.HOURS).toEpochMilli();
}
- private AtlasVertex findVertex() {
+ public AtlasVertex findVertex() {
AtlasGraphQuery query =
graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
Iterator<AtlasVertex> results = query.vertices().iterator();
diff --git
a/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java
b/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java
new file mode 100644
index 000000000..72916d261
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/IndexRecoveryREST.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.rest;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.annotation.Timed;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graph.IndexRecoveryService;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.web.util.DateTimeHelper;
+import org.apache.atlas.web.util.Servlets;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.format.annotation.DateTimeFormat;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.repository.Constants.*;
+
+/**
+ * REST resource exposing the stored index recovery timestamps and allowing an
+ * index recovery run to be restarted from a caller-supplied time.
+ */
+@Path("v2/indexrecovery")
+@Singleton
+@Service
+@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+public class IndexRecoveryREST {
+    private static final Logger LOG      = LoggerFactory.getLogger(IndexRecoveryREST.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.IndexRecoveryREST");
+
+    // Value reported for a timestamp that is not present on the recovery vertex.
+    private static final String NOT_APPLICABLE = "Not applicable";
+
+    private final IndexRecoveryService indexRecoveryService;
+    private final AtlasGraph           graph;
+
+    @Inject
+    IndexRecoveryREST(IndexRecoveryService indexRecoveryService, AtlasGraph graph) {
+        this.indexRecoveryService = indexRecoveryService;
+        this.graph                = graph;
+    }
+
+    /**
+     * Reports the timestamps stored on the index recovery vertex.
+     *
+     * Keys are the recovery property keys with a leading INDEX_RECOVERY_PREFIX removed.
+     * Each value is the ISO-8601 rendering of the stored epoch-millisecond value, or
+     * "Not applicable" when the vertex or the property is absent.
+     *
+     * @return map holding the recovery start time, the previous start time and the custom start time
+     * @HTTP 200 On success; absent values are reported as "Not applicable"
+     */
+    @GET
+    @Timed
+    public Map<String, String> getIndexRecoveryData() {
+        AtlasPerfTracer     perf              = null;
+        Long                startTime         = null;
+        Long                prevTime          = null;
+        Long                customStartTime   = null;
+        Map<String, String> indexRecoveryData = new HashMap<>();
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "IndexRecoveryREST.getIndexRecoveryData()");
+            }
+
+            AtlasVertex indexRecoveryVertex = indexRecoveryService.recoveryInfoManagement.findVertex();
+
+            if (indexRecoveryVertex != null) {
+                startTime       = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, Long.class);
+                prevTime        = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, Long.class);
+                customStartTime = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, Long.class);
+            }
+
+            indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_START_TIME), formatRecoveryTime(startTime));
+            indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME), formatRecoveryTime(prevTime));
+            indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME), formatRecoveryTime(customStartTime));
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        return indexRecoveryData;
+    }
+
+    /**
+     * Stops the current index recovery and restarts it from the supplied time, then
+     * records that time as the custom start time.
+     *
+     * @param startTime instant in the text form accepted by {@link Instant#parse(CharSequence)},
+     *                  for example 2023-03-29T18:16:26Z
+     * @throws AtlasBaseException BAD_REQUEST when startTime is missing or cannot be parsed;
+     *                            INTERNAL_ERROR when the index backend reports itself unhealthy;
+     *                            also whatever the access verification raises
+     * @throws AtlasException     propagated from the index backend health check
+     */
+    @POST
+    @Path("/start")
+    public void startCustomIndexRecovery(@QueryParam("startTime") @DateTimeFormat(pattern = DateTimeHelper.ISO8601_FORMAT)
+                                         final String startTime) throws AtlasBaseException, AtlasException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                // Tag the trace with this method's own name (it previously reused the GET method's tag).
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "IndexRecoveryREST.startCustomIndexRecovery()");
+            }
+
+            AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "to start dynamic index recovery by custom time");
+
+            if (startTime == null) {
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Index Recovery requested without start time");
+            }
+
+            if (!indexRecoveryService.recoveryThread.isIndexBackendHealthy()) {
+                throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Index recovery can not be started - Solr Health: Unhealthy");
+            }
+
+            final long startTimeMilli;
+
+            try {
+                startTimeMilli = Instant.parse(startTime).toEpochMilli();
+            } catch (java.time.format.DateTimeParseException e) {
+                // A malformed value is a client error; without this the unchecked exception escapes the resource.
+                LOG.warn("startCustomIndexRecovery: unparseable startTime '{}'", startTime, e);
+
+                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Index Recovery requested with invalid start time; expected an ISO-8601 instant such as 2023-03-29T18:16:26Z");
+            }
+
+            indexRecoveryService.recoveryThread.stopMonitoringByUserRequest();
+
+            indexRecoveryService.recoveryThread.startMonitoringByUserRequest(startTimeMilli);
+
+            indexRecoveryService.recoveryInfoManagement.updateCustomStartTime(startTimeMilli);
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
+    /**
+     * Removes a leading INDEX_RECOVERY_PREFIX from the given property key.
+     *
+     * @param propertyKey property key to shorten
+     * @return the key without the leading prefix, as returned by StringUtils.removeStart
+     */
+    public static String getPropertyKeyByRemovingPrefix(String propertyKey) {
+        return StringUtils.removeStart(propertyKey, INDEX_RECOVERY_PREFIX);
+    }
+
+    /** Renders an epoch-millisecond value as an ISO-8601 instant, or "Not applicable" when it is null. */
+    private static String formatRecoveryTime(Long epochMilli) {
+        return epochMilli != null ? Instant.ofEpochMilli(epochMilli).toString() : NOT_APPLICABLE;
+    }
+}
\ No newline at end of file
diff --git
a/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java
b/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java
new file mode 100644
index 000000000..1aaa6959d
--- /dev/null
+++
b/webapp/src/test/java/org/apache/atlas/web/integration/IndexRecoveryRestIT.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.integration;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.time.Instant;
+import java.util.Map;
+
+import static org.apache.atlas.repository.Constants.*;
+
+
+/**
+ * Integration test for the index recovery REST endpoints, driven through atlasClientV2.
+ */
+public class IndexRecoveryRestIT extends BaseResourceIT {
+
+    /**
+     * Checks that a start request without a start time is rejected as a bad request, and that a
+     * start request with a valid time is reflected in the custom start time returned afterwards.
+     */
+    @Test
+    public void startIndexRecovery() throws Exception {
+        Map<String, String> indexRecoveryDataBefore = atlasClientV2.getIndexRecoveryData();
+
+        try {
+            atlasClientV2.startIndexRecovery(null);
+
+            // Without this the negative case passes silently when no exception is raised.
+            Assert.fail("Expected startIndexRecovery(null) to be rejected as a bad request");
+        } catch (AtlasServiceException e) {
+            Assert.assertEquals(e.getStatus().getStatusCode(), AtlasErrorCode.BAD_REQUEST.getHttpCode().getStatusCode());
+        }
+
+        long now = System.currentTimeMillis();
+        atlasClientV2.startIndexRecovery(Instant.ofEpochMilli(now));
+
+        Map<String, String> indexRecoveryDataAfter = atlasClientV2.getIndexRecoveryData();
+
+        String customTimeKey = StringUtils.removeStart(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, INDEX_RECOVERY_PREFIX);
+        Assert.assertNotEquals(indexRecoveryDataBefore.get(customTimeKey), indexRecoveryDataAfter.get(customTimeKey));
+        Assert.assertEquals(Instant.ofEpochMilli(now).toString(), indexRecoveryDataAfter.get(customTimeKey));
+
+    }
+}