[ 
https://issues.apache.org/jira/browse/HIVE-23516?focusedWorklogId=442455&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-442455
 ]

ASF GitHub Bot logged work on HIVE-23516:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jun/20 20:35
            Start Date: 07/Jun/20 20:35
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #1044:
URL: https://github.com/apache/hive/pull/1044#discussion_r436272172



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.metric;
+
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+
+import java.util.Map;
+
+/**
+ * Abstract class for Replication Metric Collection.
+ */
+public abstract class ReplicationMetricCollector {

Review comment:
       MetricCollector and ReplicationMetricCollector can be merged. 
MetricCollector can't be used in any context other than replication, as it 
has a dependency on ReplicationMetric.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.metric;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * MetricCollector.
+ * In memory collection of metrics
+ */
+public final class MetricCollector {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricCollector.class);
+  private Map<Long, ReplicationMetric> metricMap = new ConcurrentHashMap<>();
+  private long maxSize = (long) MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getDefaultVal();

Review comment:
       We can default it to 0; init() will override the value anyway.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.metric.event;
+
+/**
+ * Class for defining the metadata info for replication metrics.
+ */
+public class Metadata {
+  /**
+   * Type of replication.
+   */
+  public enum ReplicationType {
+    BOOTSTRAP,
+    INCREMENTAL
+  }
+  private String dbName;
+  private ReplicationType replicationType;
+  private String stagingDir;

Review comment:
       nit: These three can be marked as final, though since there are no 
setter methods for them, no one can change them anyway.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.metric;
+
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+
+import java.util.Map;
+
+/**
+ * Abstract class for Replication Metric Collection.
+ */
+public abstract class ReplicationMetricCollector {
+  private ReplicationMetric replicationMetric;

Review comment:
       Can you please check? There seems to be a concurrency issue w.r.t. 
replicationMetric: the object can be used by two threads at the same time.
   While persisting, the object's state could be inconsistent and get 
persisted that way.
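
One common way to address this kind of concern is to have writers mutate the shared object under a lock and have the persisting thread take a consistent copy under the same lock. The sketch below only illustrates that idea; `MetricState` and `MetricSnapshot` are hypothetical stand-ins, not classes from the pull request, and the PR author may prefer a different approach.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ReplicationMetric; not the class from the PR.
final class MetricState {
  private final List<String> stages = new ArrayList<>();
  private long lastReplId;

  // Writers update both fields while holding the object's monitor.
  synchronized void endStage(String stageName, long replId) {
    stages.add(stageName);
    lastReplId = replId;
  }

  // A persisting thread takes a copy under the same monitor, so it can
  // never observe a stage list and a lastReplId from different updates.
  synchronized MetricSnapshot snapshot() {
    return new MetricSnapshot(new ArrayList<>(stages), lastReplId);
  }
}

// Immutable view that is safe to hand to another thread.
final class MetricSnapshot {
  final List<String> stages;
  final long lastReplId;

  MetricSnapshot(List<String> stages, long lastReplId) {
    this.stages = Collections.unmodifiableList(stages);
    this.lastReplId = lastReplId;
  }
}
```

Later updates to `MetricState` do not affect a snapshot that was already taken, which is the property the persisting side needs.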

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -134,6 +135,13 @@
     LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
   }
 
+  /**
+   * Replication Metrics.
+   */
+  public enum MetricName {
+    TABLES, FUNCTIONS, EVENTS, POLICIES, TAGS

Review comment:
       Add Constraints also?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.metric;
+
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+
+import java.util.Map;
+
+/**
+ * Abstract class for Replication Metric Collection.
+ */
+public abstract class ReplicationMetricCollector {
+  private ReplicationMetric replicationMetric;
+  private MetricCollector metricCollector;
+  private boolean isEnabled;
+
+  public ReplicationMetricCollector(String dbName, Metadata.ReplicationType 
replicationType,
+                             String stagingDir, long dumpExecutionId, HiveConf 
conf) {
+    String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME);
+    long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+    if (!StringUtils.isEmpty(policy) && executionId > 0) {
+      isEnabled = true;
+      metricCollector = MetricCollector.getInstance().init(conf);
+      MetricSink.getInstance().init(conf);
+      Metadata metadata = new Metadata(dbName, replicationType, stagingDir);
+      replicationMetric = new ReplicationMetric(executionId, policy, 
dumpExecutionId, metadata);
+    }
+  }
+
+  public void reportStageStart(String stageName, Map<String, Long> metricMap) 
throws SemanticException {
+    if (isEnabled) {
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = new Stage(stageName, Status.IN_PROGRESS, 
System.currentTimeMillis());
+      for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
+        stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
+      }
+      progress.addStage(stage);
+      replicationMetric.setProgress(progress);
+      metricCollector.addMetric(replicationMetric);
+    }
+  }
+
+
+  public void reportStageEnd(String stageName, Status status, long lastReplId) 
throws SemanticException {
+    if (isEnabled) {
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = progress.getStageByName(stageName);
+      stage.setStatus(status);
+      stage.setEndTime(System.currentTimeMillis());
+      progress.addStage(stage);
+      replicationMetric.setProgress(progress);
+      Metadata metadata = replicationMetric.getMetadata();
+      metadata.setLastReplId(lastReplId);
+      replicationMetric.setMetadata(metadata);
+      metricCollector.addMetric(replicationMetric);
+    }
+  }
+
+  public void reportStageEnd(String stageName, Status status) throws 
SemanticException {
+    if (isEnabled) {
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = progress.getStageByName(stageName);
+      stage.setStatus(status);
+      stage.setEndTime(System.currentTimeMillis());
+      progress.addStage(stage);
+      replicationMetric.setProgress(progress);
+      metricCollector.addMetric(replicationMetric);
+    }
+  }
+
+  public void reportStageProgress(String stageName, String metricName, long 
count) throws SemanticException {
+    if (isEnabled) {
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = progress.getStageByName(stageName);
+      Metric metric = stage.getMetricByName(metricName);
+      metric.setCurrentCount(metric.getCurrentCount() + count);
+      if (metric.getCurrentCount() > metric.getTotalCount()) {
+        metric.setTotalCount(metric.getCurrentCount());
+      }
+      stage.addMetric(metric);
+      progress.addStage(stage);

Review comment:
       progress.addStage() is not required, progress already has it.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -518,27 +528,23 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData 
dmd, Path cmRoot, Hive
       dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb);
       Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf);
     }
-
     replLogger.endLog(lastReplId.toString());
-
     LOG.info("Done dumping events, preparing to return {},{}", 
dumpRoot.toUri(), lastReplId);
-    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
-
+    long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);

Review comment:
       nit: name the variable as executionId

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.metric;
+
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+
+import java.util.Map;
+
+/**
+ * Abstract class for Replication Metric Collection.
+ */
+public abstract class ReplicationMetricCollector {
+  private ReplicationMetric replicationMetric;
+  private MetricCollector metricCollector;
+  private boolean isEnabled;
+
+  public ReplicationMetricCollector(String dbName, Metadata.ReplicationType 
replicationType,
+                             String stagingDir, long dumpExecutionId, HiveConf 
conf) {
+    String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME);
+    long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+    if (!StringUtils.isEmpty(policy) && executionId > 0) {

Review comment:
       Metric collection shouldn't be optional in these cases; it should throw 
an exception if a required parameter is not set. If we want to make it 
optional, then a config for enabling metrics can be introduced, and this 
optional feature should be controlled only by that config.
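
A minimal sketch of the behaviour suggested here, assuming a boolean flag decides whether metrics are collected and missing parameters are treated as an error once the flag is on. The config key and class name are invented for illustration and are not part of Hive; `java.util.Properties` stands in for `HiveConf`.

```java
import java.util.Properties;

// Hypothetical sketch: names below are illustrative, not Hive's API.
final class MetricGateSketch {
  // Assumed config key, invented for this example.
  static final String METRICS_ENABLED_KEY = "example.repl.metrics.enabled";

  static boolean shouldCollect(Properties conf, String policy, long executionId) {
    boolean enabled = Boolean.parseBoolean(conf.getProperty(METRICS_ENABLED_KEY, "false"));
    if (!enabled) {
      // The flag alone decides whether the feature is on.
      return false;
    }
    if (policy == null || policy.isEmpty() || executionId <= 0) {
      // Enabled but misconfigured: fail loudly instead of silently skipping.
      throw new IllegalStateException(
          "Metrics are enabled but the policy name or execution id is missing");
    }
    return true;
  }
}
```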

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
##########
@@ -96,7 +102,9 @@ public IncrementalLoadTasksBuilder(String dbName, String 
loadPath,
     this.log = log;
     numIteration++;
     this.log.debug("Iteration num " + numIteration);
-
+    Map<String, Long> metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) iterator.getNumEvents());

Review comment:
       During checkpointing, iterator.getNumEvents() will be the total number 
of events, which will also include already replayed events.
   If we set it as the max number of events, it will never show that the load 
completed with all events, because the metric counter is updated only for 
events that have not been replayed already. In that case the final metric 
would read something like 80/100 if 20 events were replayed in the last 
iteration and it failed to complete the last repl load run. This might confuse 
the customer. Or would they be fine seeing 20/100 with a final status of 
FAILED for the previous run and 80/100 with a final status of SUCCESS for the 
current run?
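
The arithmetic in this comment can be sketched as follows. The class and method names are hypothetical; the numbers are the ones from the comment (100 events in the dump, 20 already replayed by the earlier run).

```java
// Hypothetical helper to illustrate the two ways of choosing the total.
final class EventProgressSketch {
  // Formats progress as "current/total".
  static String render(long current, long total) {
    return current + "/" + total;
  }

  // Total that excludes events replayed by an earlier, failed run.
  static long remainingTotal(long eventsInDump, long alreadyReplayed) {
    return eventsInDump - alreadyReplayed;
  }
}
```

With the full dump size as the total, a successful resumed run reads `render(80, 100)`, i.e. `80/100`. With `remainingTotal(100, 20)` as the total, the same run reads `80/80`.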

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.metric;
+
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+
+import java.util.Map;
+
+/**
+ * Abstract class for Replication Metric Collection.
+ */
+public abstract class ReplicationMetricCollector {
+  private ReplicationMetric replicationMetric;
+  private MetricCollector metricCollector;
+  private boolean isEnabled;
+
+  public ReplicationMetricCollector(String dbName, Metadata.ReplicationType 
replicationType,
+                             String stagingDir, long dumpExecutionId, HiveConf 
conf) {
+    String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME);
+    long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+    if (!StringUtils.isEmpty(policy) && executionId > 0) {
+      isEnabled = true;
+      metricCollector = MetricCollector.getInstance().init(conf);
+      MetricSink.getInstance().init(conf);
+      Metadata metadata = new Metadata(dbName, replicationType, stagingDir);
+      replicationMetric = new ReplicationMetric(executionId, policy, 
dumpExecutionId, metadata);
+    }
+  }
+
+  public void reportStageStart(String stageName, Map<String, Long> metricMap) 
throws SemanticException {
+    if (isEnabled) {
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = new Stage(stageName, Status.IN_PROGRESS, 
System.currentTimeMillis());
+      for (Map.Entry<String, Long> metric : metricMap.entrySet()) {
+        stage.addMetric(new Metric(metric.getKey(), metric.getValue()));
+      }
+      progress.addStage(stage);
+      replicationMetric.setProgress(progress);

Review comment:
       You don't need to reset it (using setProgress()) as 
replicationMetric.getProgress() doesn't remove the reference.
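
A small illustration of the point, assuming the getter returns the stored reference rather than a copy (as the comment states for `getProgress()`). `ProgressSketch` and `MetricHolderSketch` are hypothetical stand-ins for `Progress` and `ReplicationMetric`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Progress.
class ProgressSketch {
  private final List<String> stages = new ArrayList<>();

  void addStage(String stageName) {
    stages.add(stageName);
  }

  int stageCount() {
    return stages.size();
  }
}

// Hypothetical stand-in for ReplicationMetric.
class MetricHolderSketch {
  private ProgressSketch progress = new ProgressSketch();

  // Returns the same object that the holder keeps, not a copy.
  ProgressSketch getProgress() {
    return progress;
  }

  void setProgress(ProgressSketch progress) {
    this.progress = progress;
  }
}
```

Because the caller and the holder share one object, `holder.getProgress().addStage("REPL_DUMP")` is already visible through the holder, and a following `setProgress(progress)` call would only store the same reference again.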

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.metric;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * MetricCollector.
+ * In memory collection of metrics
+ */
+public final class MetricCollector {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricCollector.class);
+  private Map<Long, ReplicationMetric> metricMap = new ConcurrentHashMap<>();
+  private long maxSize = (long) 
MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getDefaultVal();
+  private boolean isInited = false;
+  private static volatile MetricCollector instance;
+
+  private MetricCollector(){
+  }
+
+  public static MetricCollector getInstance() {
+    if (instance == null) {
+      synchronized (MetricCollector.class) {
+        if (instance == null) {
+          instance = new MetricCollector();
+        }
+      }
+    }
+    return instance;
+  }
+
+  public synchronized MetricCollector init(HiveConf conf) {
+    //Can initialize the cache only once with a value.
+    if (!isInited) {
+      maxSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE);

Review comment:
       This will be used in the HS2 process, not in HMS. Is there any reason 
why the conf is in MetastoreConf and not in HiveConf?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
##########
@@ -180,6 +187,35 @@ private void initReplLogger() {
     }
   }
 
+  private long getNumFunctions(Path dbDumpPath, FileSystem fs) throws 
IOException {
+    Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+    if (fs.exists(funcPath)) {
+      return getSubDirs(fs, funcPath).length;
+    }
+    return 0;
+  }
+
+  private long getNumTables(Path dbDumpPath, FileSystem fs) throws IOException 
{
+    return getSubDirs(fs, dbDumpPath).length;
+  }
+
+  private void initMetricCollector() {
+    try {
+      Path dbDumpPath = currentDatabaseIterator.dbLevelPath();
+      FileSystem fs = dbDumpPath.getFileSystem(hiveConf);
+      long numTables = getNumTables(dbDumpPath, fs);
+      long numFunctions = getNumFunctions(dbDumpPath, fs);
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) numTables);
+      metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) numFunctions);
+      metricCollector.reportStageStart("REPL_LOAD", metricMap);
+    } catch (IOException e) {
+      // Ignore the exception

Review comment:
       What is the reason for ignoring the IOException here?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
##########
@@ -180,6 +187,35 @@ private void initReplLogger() {
     }
   }
 
+  private long getNumFunctions(Path dbDumpPath, FileSystem fs) throws 
IOException {
+    Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+    if (fs.exists(funcPath)) {
+      return getSubDirs(fs, funcPath).length;
+    }
+    return 0;
+  }
+
+  private long getNumTables(Path dbDumpPath, FileSystem fs) throws IOException 
{
+    return getSubDirs(fs, dbDumpPath).length;
+  }
+
+  private void initMetricCollector() {
+    try {
+      Path dbDumpPath = currentDatabaseIterator.dbLevelPath();
+      FileSystem fs = dbDumpPath.getFileSystem(hiveConf);
+      long numTables = getNumTables(dbDumpPath, fs);
+      long numFunctions = getNumFunctions(dbDumpPath, fs);
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) numTables);
+      metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) numFunctions);

Review comment:
       No need to typecast, they already are long
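
For reference, a short sketch of when the cast matters with a `Map<String, Long>`: a `long` value autoboxes to `Long` directly, while an `int` value has to be widened first because it would otherwise box to `Integer`. The class name and keys are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical example class; the keys mirror the metric names in the diff.
final class BoxingSketch {
  static Map<String, Long> build(long numTables, int numEvents) {
    Map<String, Long> metricMap = new HashMap<>();
    // long autoboxes to Long, so no cast is needed.
    metricMap.put("TABLES", numTables);
    // int would box to Integer, so it must be widened to long first.
    metricMap.put("EVENTS", (long) numEvents);
    return metricMap;
  }
}
```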

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -118,7 +123,7 @@ public TaskTracker tasks() throws Exception {
         if (!forNewTable().hasReplicationState()) {
           // Add ReplStateLogTask only if no pending table load tasks left for 
next cycle
           Task<?> replLogTask
-                  = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+                  = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector);

Review comment:
       nit: Can accommodate in single line.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
##########
@@ -52,27 +55,47 @@
   private static final long serialVersionUID = 1L;
   private static final transient Logger LOG = 
LoggerFactory.getLogger(AtlasLoadTask.class);
 
+  public AtlasLoadTask() {
+    super();
+  }
+
+  @VisibleForTesting
+  AtlasLoadTask(final HiveConf conf, final AtlasLoadWork work) {
+    this.conf = conf;
+    this.work = work;
+  }
+
   @Override
   public int execute() {
     try {
       AtlasReplInfo atlasReplInfo  = createAtlasReplInfo();
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.TAGS.name(), 0L);

Review comment:
       TAG: Shouldn't it be entities?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -120,10 +121,20 @@ public int execute() {
       }
     } catch (RuntimeException e) {
       LOG.error("replication failed with run time exception", e);
+      try {
+        work.getMetricCollector().reportEnd(Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics ", ex);
+      }
       throw e;
     } catch (Exception e) {
       LOG.error("replication failed", e);
       setException(e);
+      try {
+        work.getMetricCollector().reportEnd(Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics ", ex);
+      }

Review comment:
       nit: put it in a method 
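
One possible shape for such a method, as a sketch only. `EndReporter` and `MetricReportException` are hypothetical stand-ins for the PR's metric collector and `SemanticException`, and the helper name is invented; it logs to stderr instead of the task's `LOG` to stay self-contained.

```java
// Hypothetical stand-in for SemanticException.
class MetricReportException extends Exception {
  MetricReportException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for the metric collector's reportEnd(Status).
interface EndReporter {
  void reportEnd(String status) throws MetricReportException;
}

final class FailureReportingSketch {
  // Reports a FAILED status and swallows reporting errors, so the original
  // replication exception is the one that propagates to the caller.
  static boolean reportFailedQuietly(EndReporter reporter) {
    try {
      reporter.reportEnd("FAILED");
      return true;
    } catch (MetricReportException ex) {
      System.err.println("Failed to collect Metrics: " + ex.getMessage());
      return false;
    }
  }
}
```

Both catch blocks in `execute()` could then call the helper once instead of repeating the try/catch.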

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -499,10 +505,14 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData 
dmd, Path cmRoot, Hive
     String dbName = (null != work.dbNameOrPattern && 
!work.dbNameOrPattern.isEmpty())
         ? work.dbNameOrPattern
         : "?";
-    replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
-            evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, 
work.eventTo, maxEventLimit),
+    long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,

Review comment:
       Shouldn't resumeFrom be used here? When the same dump directory is 
reused because of checkpointing, estimatedNumEvents will have the wrong value.

##########
File path: 
itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
##########
@@ -1377,6 +1378,11 @@ public void 
scheduledQueryProgress(ScheduledQueryProgressInfo info)
     throw new RuntimeException("unimplemented");
   }
 
+  @Override
+  public void addReplicationMetric(ReplicationMetricList replicationMetricList) {
+    throw new RuntimeException("unimplemented");

Review comment:
       Wouldn't an UnsupportedOperationException be better here?
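
A sketch of the suggested change on a hypothetical stub class (`DummyStoreSketch` is not the real `DummyRawStoreFailEvent`). `UnsupportedOperationException` extends `RuntimeException`, so callers that already catch `RuntimeException` keep working while the type states the intent more precisely.

```java
// Hypothetical test stub, used only to illustrate the exception choice.
class DummyStoreSketch {
  void addReplicationMetric(Object replicationMetricList) {
    throw new UnsupportedOperationException("addReplicationMetric is unimplemented in this stub");
  }
}
```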

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
##########
@@ -151,7 +154,7 @@ public TaskTracker tasks(boolean isBootstrapDuringInc) 
throws Exception {
     );
     if (!isPartitioned(tableDesc)) {
       Task<?> replLogTask
-              = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+              = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector);

Review comment:
       nit : Can accommodate in single line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 442455)
    Time Spent: 6h 10m  (was: 6h)

> Store hive replication policy execution metrics in the relational DB
> --------------------------------------------------------------------
>
>                 Key: HIVE-23516
>                 URL: https://issues.apache.org/jira/browse/HIVE-23516
>             Project: Hive
>          Issue Type: Task
>            Reporter: Aasha Medhi
>            Assignee: Aasha Medhi
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23516.01.patch, HIVE-23516.02.patch, 
> HIVE-23516.03.patch, HIVE-23516.04.patch, HIVE-23516.05.patch, 
> HIVE-23516.06.patch, HIVE-23516.07.patch, HIVE-23516.08.patch, 
> HIVE-23516.09.patch, HIVE-23516.10.patch, HIVE-23516.11.patch, 
> HIVE-23516.12.patch, HIVE-23516.13.patch, HIVE-23516.14.patch, 
> HIVE-23516.15.patch, HIVE-23516.16.patch, HIVE-23516.17.patch, 
> HIVE-23516.18.patch, Replication Metrics.pdf
>
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Details documented in the attached doc



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
