This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a70707f KYLIN-4508 Add unit test for core-metrics module & reporters
a70707f is described below
commit a70707f3b77d005aa54a5779c45eeb1ae6191070
Author: Zhong, Yanghong <[email protected]>
AuthorDate: Fri Jun 19 13:41:09 2020 +0800
KYLIN-4508 Add unit test for core-metrics module & reporters
---
.../kylin/metrics/lib/impl/TimedRecordEvent.java | 32 ++++
metrics-reporter-hive/pom.xml | 25 ++++
.../kylin/metrics/lib/impl/hive/HiveProducer.java | 7 +-
.../metrics/lib/impl/hive/HiveProducerRecord.java | 141 +++++++-----------
.../lib/impl/hive/HiveReservoirReporter.java | 39 +++--
.../tool/metrics/systemcube/HiveTableCreator.java | 3 +-
.../lib/impl/hive/HiveProducerRecordTest.java | 81 +++++++++++
.../metrics/lib/impl/hive/HiveProducerTest.java | 161 +++++++++++++++++++++
.../lib/impl/hive/HiveReservoirReporterTest.java | 88 +++++++++++
metrics-reporter-kafka/pom.xml | 19 +++
.../impl/kafka/KafkaActiveReserviorListener.java | 8 +
.../lib/impl/kafka/KafkaReservoirReporter.java | 6 +-
.../lib/impl/kafka/KafkaReservoirReporterTest.java | 79 ++++++++++
13 files changed, 585 insertions(+), 104 deletions(-)
diff --git
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
index a866163..984d5f5 100644
---
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
+++
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
@@ -44,4 +44,36 @@ public class TimedRecordEvent extends RecordEvent {
super.resetTime();
addTimeDetails();
}
+
+ public String getYear() { // Looks up this event's TimePropertyEnum.YEAR entry and casts it to String.
+ return (String) get(TimePropertyEnum.YEAR.toString());
+ }
+
+ public String getMonth() { // Looks up this event's TimePropertyEnum.MONTH entry and casts it to String.
+ return (String) get(TimePropertyEnum.MONTH.toString());
+ }
+
+ public String getWeekBeginDate() { // Looks up this event's TimePropertyEnum.WEEK_BEGIN_DATE entry and casts it to String.
+ return (String) get(TimePropertyEnum.WEEK_BEGIN_DATE.toString());
+ }
+
+ public String getDayDate() { // Looks up this event's TimePropertyEnum.DAY_DATE entry and casts it to String.
+ return (String) get(TimePropertyEnum.DAY_DATE.toString());
+ }
+
+ public String getDayTime() { // Looks up this event's TimePropertyEnum.DAY_TIME entry and casts it to String.
+ return (String) get(TimePropertyEnum.DAY_TIME.toString());
+ }
+
+ public int getTimeHour() { // Looks up this event's TimePropertyEnum.TIME_HOUR entry and casts it to int.
+ return (int) get(TimePropertyEnum.TIME_HOUR.toString()); // NOTE(review): the (int) cast presumably unboxes an Integer and would fail if the entry is absent - confirm
+ }
+
+ public int getTimeMinute() { // Looks up this event's TimePropertyEnum.TIME_MINUTE entry and casts it to int.
+ return (int) get(TimePropertyEnum.TIME_MINUTE.toString()); // NOTE(review): the (int) cast presumably unboxes an Integer and would fail if the entry is absent - confirm
+ }
+
+ public int getTimeSecond() { // Looks up this event's TimePropertyEnum.TIME_SECOND entry and casts it to int.
+ return (int) get(TimePropertyEnum.TIME_SECOND.toString()); // NOTE(review): the (int) cast presumably unboxes an Integer and would fail if the entry is absent - confirm
+ }
}
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
index 0159804..87b7808 100644
--- a/metrics-reporter-hive/pom.xml
+++ b/metrics-reporter-hive/pom.xml
@@ -56,5 +56,30 @@
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
+
+ <!-- Env & Test -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4-rule-agent</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 8bc7a43..e79010c 100644
---
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -309,7 +309,7 @@ public class HiveProducer {
fout = null;
}
- private HiveProducerRecord convertTo(Record record) throws Exception {
+ HiveProducerRecord convertTo(Record record) throws Exception {
Map<String, Object> rawValue = record.getValueRaw();
//Set partition values for hive table
@@ -330,7 +330,8 @@ public class HiveProducer {
columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT)));
}
- return new HiveProducerRecord(tableNameSplits.getFirst(),
tableNameSplits.getSecond(), partitionKVs,
- columnValues);
+ HiveProducerRecord.RecordKey key = new
HiveProducerRecord.KeyBuilder(tableNameSplits.getSecond())
+
.setDbName(tableNameSplits.getFirst()).setPartitionKVs(partitionKVs).build();
+ return new HiveProducerRecord(key, columnValues);
}
}
diff --git
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
index 650d18a..fa5222f 100644
---
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
+++
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
@@ -30,23 +30,8 @@ public class HiveProducerRecord {
private final RecordKey key;
private final List<Object> value;
- public HiveProducerRecord(String dbName, String tableName, Map<String,
String> partitionKVs, List<Object> value) {
- this.key = new RecordKey(dbName, tableName, partitionKVs);
- this.value = value;
- }
-
- public HiveProducerRecord(String tableName, Map<String, String>
partitionKVs, List<Object> value) {
- this.key = new RecordKey(tableName, partitionKVs);
- this.value = value;
- }
-
- public HiveProducerRecord(String dbName, String tableName, List<Object>
value) {
- this.key = new RecordKey(dbName, tableName);
- this.value = value;
- }
-
- public HiveProducerRecord(String tableName, List<Object> value) {
- this.key = new RecordKey(tableName);
+ public HiveProducerRecord(RecordKey key, List<Object> value) { // Stores the given key and value list as-is; neither is copied or null-checked here.
+ this.key = key;
this.value = value;
}
@@ -75,41 +60,55 @@ public class HiveProducerRecord {
return sb.toString();
}
+ @Override
public boolean equals(Object o) {
- if (this == o) {
+ if (this == o)
return true;
- } else if (!(o instanceof HiveProducerRecord)) {
+ if (o == null || getClass() != o.getClass())
return false;
- } else {
- HiveProducerRecord that = (HiveProducerRecord) o;
- if (this.key != null) {
- if (!this.key.equals(that.key)) {
- return false;
- }
- } else if (that.key != null) {
- return false;
- }
- if (this.value != null) {
- if (!this.value.equals(that.value)) {
- return false;
- }
- } else if (that.value != null) {
- return false;
- }
- }
- return true;
+
+ HiveProducerRecord record = (HiveProducerRecord) o;
+
+ if (key != null ? !key.equals(record.key) : record.key != null)
+ return false;
+ return value != null ? value.equals(record.value) : record.value ==
null;
}
+ @Override
public int hashCode() {
- int result = this.key != null ? this.key.hashCode() : 0;
- result = 31 * result + (this.value != null ? this.value.hashCode() :
0);
+ int result = key != null ? key.hashCode() : 0;
+ result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
+ public static class KeyBuilder { // Chainable builder for RecordKey: the table name is fixed at construction, db name and partition KVs are optional.
+ private final String tableName;
+ private String dbName; // stays null unless setDbName is called
+ private Map<String, String> partitionKVs; // stays null unless setPartitionKVs is called
+
+ public KeyBuilder(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public KeyBuilder setDbName(String dbName) { // returns this builder so calls can be chained
+ this.dbName = dbName;
+ return this;
+ }
+
+ public KeyBuilder setPartitionKVs(Map<String, String> partitionKVs) { // keeps a reference to the caller's map (no copy); returns this builder
+ this.partitionKVs = partitionKVs;
+ return this;
+ }
+
+ public RecordKey build() { // hands the collected fields to the three-argument RecordKey constructor
+ return new RecordKey(dbName, tableName, partitionKVs);
+ }
+ }
+
/**
* Use to organize metrics message
*/
- public class RecordKey {
+ public static class RecordKey {
public static final String DEFAULT_DB_NAME = "DEFAULT";
private final String dbName;
@@ -126,18 +125,6 @@ public class HiveProducerRecord {
this.partitionKVs = partitionKVs;
}
- public RecordKey(String tableName, Map<String, String> partitionKVs) {
- this(null, tableName, partitionKVs);
- }
-
- public RecordKey(String dbName, String tableName) {
- this(dbName, tableName, null);
- }
-
- public RecordKey(String tableName) {
- this(null, tableName, null);
- }
-
public String database() {
return this.dbName;
}
@@ -152,47 +139,31 @@ public class HiveProducerRecord {
public String toString() {
String partitionKVs = this.partitionKVs == null ? "null" :
this.partitionKVs.toString();
- return "RecordKey(database=" + this.dbName + ", table=" +
this.tableName + ", partition=" + partitionKVs + ")";
+ return "RecordKey(database=" + this.dbName + ", table=" +
this.tableName + ", partition=" + partitionKVs
+ + ")";
}
+ @Override
public boolean equals(Object o) {
- if (this == o) {
+ if (this == o)
return true;
- } else if (!(o instanceof RecordKey)) {
+ if (o == null || getClass() != o.getClass())
return false;
- } else {
- RecordKey that = (RecordKey) o;
- if (this.dbName != null) {
- if (!this.dbName.equals(that.dbName)) {
- return false;
- }
- } else if (that.dbName != null) {
- return false;
- }
-
- if (this.tableName != null) {
- if (!this.tableName.equals(that.tableName)) {
- return false;
- }
- } else if (that.tableName != null) {
- return false;
- }
-
- if (this.partitionKVs != null) {
- if (!this.partitionKVs.equals(that.partitionKVs)) {
- return false;
- }
- } else if (that.partitionKVs != null) {
- return false;
- }
- }
- return true;
+
+ RecordKey recordKey = (RecordKey) o;
+
+ if (dbName != null ? !dbName.equals(recordKey.dbName) :
recordKey.dbName != null)
+ return false;
+ if (tableName != null ? !tableName.equals(recordKey.tableName) :
recordKey.tableName != null)
+ return false;
+ return partitionKVs != null ?
partitionKVs.equals(recordKey.partitionKVs) : recordKey.partitionKVs == null;
}
+ @Override
public int hashCode() {
- int result = this.dbName != null ? this.dbName.hashCode() : 0;
- result = 31 * result + (this.tableName != null ?
this.tableName.hashCode() : 0);
- result = 31 * result + (this.partitionKVs != null ?
this.partitionKVs.hashCode() : 0);
+ int result = dbName != null ? dbName.hashCode() : 0;
+ result = 31 * result + (tableName != null ? tableName.hashCode() :
0);
+ result = 31 * result + (partitionKVs != null ?
partitionKVs.hashCode() : 0);
return result;
}
}
diff --git
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
index 9d93e99..d1e252f 100644
---
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
+++
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -84,6 +84,10 @@ public class HiveReservoirReporter extends
ActiveReservoirReporter {
stop();
}
+ HiveReservoirListener getListener() { // Accessor for this reporter's listener; declared without an access modifier, so package-visible only.
+ return listener;
+ }
+
/**
* A builder for {@link HiveReservoirReporter} instances.
*/
@@ -107,15 +111,19 @@ public class HiveReservoirReporter extends
ActiveReservoirReporter {
}
}
- private class HiveReservoirListener implements ActiveReservoirListener {
+ class HiveReservoirListener implements ActiveReservoirListener {
private Properties props;
private Map<String, HiveProducer> producerMap = new HashMap<>();
+ private long nRecord = 0;
+ private long nRecordSkip = 0;
+ private long nUpdate = 0;
+
private HiveReservoirListener(Properties props) throws Exception {
this.props = props;
}
- private synchronized HiveProducer getProducer(String metricType)
throws Exception {
+ synchronized HiveProducer getProducer(String metricType) throws
Exception {
HiveProducer producer = producerMap.get(metricType);
if (producer == null) {
producer = new HiveProducer(metricType, props);
@@ -129,6 +137,7 @@ public class HiveReservoirReporter extends
ActiveReservoirReporter {
return true;
}
logger.info("Try to write {} records", records.size());
+ long prevNRecord = nRecord;
try {
Map<String, List<Record>> queues = new HashMap<>();
for (Record record : records) {
@@ -142,21 +151,17 @@ public class HiveReservoirReporter extends
ActiveReservoirReporter {
for (Map.Entry<String, List<Record>> entry :
queues.entrySet()) {
HiveProducer producer = getProducer(entry.getKey());
producer.send(entry.getValue());
+ nRecord += entry.getValue().size();
}
queues.clear();
+ if (nUpdate++ % 100 == 0) {
+ logger.info("Has done the update {} times with {} records
reported, {} records skipped", nUpdate,
+ nRecord, nRecordSkip);
+ }
} catch (Exception e) {
+ nRecordSkip += records.size() - (nRecord - prevNRecord);
logger.error(e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- public boolean onRecordUpdate(final Record record) {
- try {
- HiveProducer producer = getProducer(record.getSubject());
- producer.send(record);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.warn("Has skipped reporting {} records", nRecordSkip);
return false;
}
return true;
@@ -168,5 +173,13 @@ public class HiveReservoirReporter extends
ActiveReservoirReporter {
}
producerMap.clear();
}
+
+ public long getNRecord() { // Returns nRecord, the running total that grows by each batch's size after its producer.send call returns.
+ return nRecord;
+ }
+
+ public long getNRecordSkip() { // Returns nRecordSkip, the running total of records left unsent when an update caught an exception.
+ return nRecordSkip;
+ }
}
}
diff --git
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
b/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
similarity index 99%
rename from
tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
rename to
metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 35d9efb..2f9eb1d 100644
---
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++
b/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -19,8 +19,8 @@
package org.apache.kylin.tool.metrics.systemcube;
import java.util.List;
-
import java.util.Locale;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
@@ -32,7 +32,6 @@ import org.apache.kylin.metrics.property.JobPropertyEnum;
import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
import org.apache.kylin.metrics.property.QueryPropertyEnum;
import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-
import org.apache.kylin.shaded.com.google.common.base.Strings;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
diff --git
a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
new file mode 100644
index 0000000..ead74ad
--- /dev/null
+++
b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static
org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.DELIMITER;
+import static
org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey.DEFAULT_DB_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class HiveProducerRecordTest { // Exercises equals/hashCode of HiveProducerRecord and its RecordKey by inserting instances into hash sets.
+
+ @Test
+ public void testRecord() {
+ String dbName = "KYLIN";
+ String tableName = "test";
+ Map<String, String> partitionKVs = Maps.newHashMap();
+ partitionKVs.put("key1", "value1");
+
+ Set<RecordKey> keySet = Sets.newHashSet();
+ RecordKey key1 = new HiveProducerRecord.KeyBuilder(tableName).build();
+ RecordKey key11 = new
HiveProducerRecord.KeyBuilder(tableName).setDbName(DEFAULT_DB_NAME).build();
+ keySet.add(key1);
+ keySet.add(key11);
+ assertEquals(1, keySet.size()); // expected: a key built without a db name equals one built with DEFAULT_DB_NAME
+
+ RecordKey key2 = new
HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).build();
+ RecordKey key3 = new
HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).setPartitionKVs(partitionKVs)
+ .build();
+ keySet.add(key2);
+ keySet.add(key3);
+ assertEquals(3, keySet.size()); // expected: a different db name, and added partition KVs, each produce a distinct key
+ assertEquals(dbName, key2.database());
+ assertEquals(tableName, key2.table());
+
+ List<Object> value1 = Lists.<Object> newArrayList(1);
+ List<Object> value2 = Lists.<Object> newArrayList(1, "1");
+
+ assertNull(new HiveProducerRecord(key1, null).valueToString()); // expected: a null value list yields a null string
+
+ Set<HiveProducerRecord> recordSet = Sets.newHashSet();
+ HiveProducerRecord record1 = new HiveProducerRecord(key1, value1);
+ HiveProducerRecord record11 = new HiveProducerRecord(key11, value1);
+ recordSet.add(record1);
+ recordSet.add(record11);
+ assertEquals(1, recordSet.size()); // expected: equal keys with the same value collapse to one set entry
+ assertEquals(key1, record1.key());
+ assertEquals(value1, record1.value());
+
+ recordSet.add(new HiveProducerRecord(key1, value2));
+ recordSet.add(new HiveProducerRecord(key2, value1));
+ assertEquals(3, recordSet.size()); // expected: a different value or a different key each add a new entry
+ assertEquals(1, record1.valueToString().split(DELIMITER).length); // expected: the one-element value list renders as a single DELIMITER-separated field
+ }
+}
diff --git
a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
new file mode 100644
index 0000000..2adc34f
--- /dev/null
+++
b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.source.hive.HiveMetaStoreClientFactory;
+import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+
+@PrepareForTest(fullyQualifiedNames = { "org.apache.hadoop.fs.FileSystem",
+ "org.apache.kylin.source.hive.HiveMetaStoreClientFactory",
+ "org.apache.kylin.metrics.lib.impl.hive.HiveProducer$1" })
+public class HiveProducerTest { // Compares HiveProducer.convertTo against a hand-built HiveProducerRecord, using mocked FileSystem and Hive metastore client.
+
+ @Rule
+ public PowerMockRule rule = new PowerMockRule();
+
+ private HiveProducer hiveProducer;
+ private HiveMetaStoreClient metaStoreClient;
+
+ @Before
+ public void setUp() throws Exception { // stubs FileSystem.get, builds the HiveProducer under test, then stubs the metastore client construction/factory
+ System.setProperty(KylinConfig.KYLIN_CONF,
"../examples/test_case_data/localmeta");
+
+ FileSystem hdfs = PowerMockito.mock(FileSystem.class);
+ URI uri = PowerMockito.mock(URI.class);
+ PowerMockito.stub(PowerMockito.method(FileSystem.class, "get",
Configuration.class)).toReturn(hdfs);
+ PowerMockito.when(hdfs.getUri()).thenReturn(uri);
+ PowerMockito.when(uri.toString()).thenReturn("hdfs");
+
+ HiveConf hiveConf = PowerMockito.mock(HiveConf.class);
+ String metricsType = new HiveSink()
+
.getTableFromSubject(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
+
+ hiveProducer = new HiveProducer(metricsType, new Properties(),
hiveConf);
+
+ metaStoreClient = PowerMockito.mock(HiveMetaStoreClient.class);
+
PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConf).thenReturn(metaStoreClient);
+ PowerMockito
+ .stub(PowerMockito.method(HiveMetaStoreClientFactory.class,
"getHiveMetaStoreClient", HiveConf.class))
+ .toReturn(metaStoreClient);
+ }
+
+ @After
+ public void after() throws Exception { // removes the KYLIN_CONF system property set in setUp
+ System.clearProperty(KylinConfig.KYLIN_CONF);
+ }
+
+ @Test
+ public void testProduce() throws Exception { // builds the expected record by hand from the event's values, then compares it with convertTo's result
+ TimedRecordEvent rpcEvent = generateTestRPCRecord();
+
+ Map<String, String> partitionKVs = Maps.newHashMap();
+ partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(),
rpcEvent.getDayDate());
+
+ List<Object> value = Lists.newArrayList(rpcEvent.getHost(), "default",
"test_cube", "sandbox", "NULL", 80L, 3L,
+ 3L, 0L, 0L, 0L, rpcEvent.getTime(), rpcEvent.getYear(),
rpcEvent.getMonth(),
+ rpcEvent.getWeekBeginDate(), rpcEvent.getDayTime(),
rpcEvent.getTimeHour(), rpcEvent.getTimeMinute(),
+ rpcEvent.getTimeSecond(), rpcEvent.getDayDate());
+
+ HiveProducerRecord.RecordKey key = new
HiveProducerRecord.KeyBuilder("HIVE_metrics_query_rpc_test")
+ .setDbName("KYLIN").setPartitionKVs(partitionKVs).build();
+ HiveProducerRecord target = new HiveProducerRecord(key, value);
+
+ prepareMockForEvent(rpcEvent); // stub getTable/getFields on the mocked metastore client for this event's table
+ assertEquals(target, hiveProducer.convertTo(rpcEvent));
+ }
+
+ private void prepareMockForEvent(RecordEvent event) throws Exception { // Stubs the mocked metastore client so the event's table reports the query-RPC columns followed by the partition columns.
+ String tableFullName = new
HiveSink().getTableFromSubject(event.getEventType());
+ Pair<String, String> tableNameSplits =
ActiveReservoirReporter.getTableNameSplits(tableFullName);
+ String dbName = tableNameSplits.getFirst();
+ String tableName = tableNameSplits.getSecond();
+
+ Table table = PowerMockito.mock(Table.class);
+ PowerMockito.when(metaStoreClient, "getTable", dbName,
tableName).thenReturn(table);
+
+ StorageDescriptor sd = PowerMockito.mock(StorageDescriptor.class);
+ PowerMockito.when(table, "getSd").thenReturn(sd);
+ PowerMockito.when(sd, "getLocation").thenReturn(null);
+
+ List<Pair<String, String>> columns =
HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
+ List<Pair<String, String>> partitions =
HiveTableCreator.getPartitionKVsForHiveTable();
+ columns.addAll(partitions);
+ List<FieldSchema> fields =
Lists.newArrayListWithExpectedSize(columns.size());
+ for (Pair<String, String> column : columns) {
+ fields.add(new FieldSchema(column.getFirst(), column.getSecond(),
null));
+ }
+ PowerMockito.when(metaStoreClient, "getFields", dbName,
tableName).thenReturn(fields);
+ }
+
+ private TimedRecordEvent generateTestRPCRecord() { // Builds a TimedRecordEvent of type "metrics_query_rpc_test" filled with fixed wrapper and stats values.
+ TimedRecordEvent rpcMetricsEvent = new
TimedRecordEvent("metrics_query_rpc_test");
+ setRPCWrapper(rpcMetricsEvent, "default", "test_cube", "sandbox",
null);
+ setRPCStats(rpcMetricsEvent, 80L, 0L, 3L, 3L, 0L);
+ return rpcMetricsEvent;
+ }
+
+ private static void setRPCWrapper(RecordEvent metricsEvent, String
projectName, String realizationName,
+ String rpcServer, Throwable throwable) {
+ metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
+ metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(),
realizationName);
+ metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(),
rpcServer);
+ metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(),
+ throwable == null ? "NULL" : throwable.getClass().getName()); // a null throwable is recorded as the literal string "NULL"
+ }
+
+ private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs,
long skipCount, long scanCount,
+ long returnCount, long aggrCount) {
+ metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(),
callTimeMs);
+ metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(),
skipCount); //Number of skips on region servers based on region meta or fuzzy
filter
+ metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(),
scanCount); //Count scanned by region server
+ metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(),
returnCount);//Count returned by region server
+ metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(),
scanCount - returnCount); //Count filtered & aggregated by coprocessor
+ metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(),
aggrCount); //Count aggregated by coprocessor
+ }
+}
diff --git
a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
new file mode 100644
index 0000000..fbb656c
--- /dev/null
+++
b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.InstantReservoir;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+
+import com.google.common.collect.Lists;
+
+@PrepareForTest({ HiveReservoirReporter.HiveReservoirListener.class })
+public class HiveReservoirReporterTest { // Drives HiveReservoirReporter through start/stop cycles against a mocked HiveProducer and checks the listener's counters.
+
+ @Rule
+ public PowerMockRule rule = new PowerMockRule();
+
+ private HiveReservoirReporter hiveReporter;
+ private ActiveReservoir reservoir;
+
+ @Before
+ public void setUp() throws Exception { // replaces HiveProducer construction with a mock, then builds a reporter over a started InstantReservoir
+ System.setProperty(KylinConfig.KYLIN_CONF,
"../examples/test_case_data/localmeta");
+
+ HiveProducer hiveProducer = PowerMockito.mock(HiveProducer.class);
+
PowerMockito.whenNew(HiveProducer.class).withAnyArguments().thenReturn(hiveProducer);
+
+ reservoir = new InstantReservoir();
+ reservoir.start();
+ hiveReporter = HiveReservoirReporter.forRegistry(reservoir).build();
+ }
+
+ @After
+ public void after() throws Exception { // removes the KYLIN_CONF system property set in setUp
+ System.clearProperty(KylinConfig.KYLIN_CONF);
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ String metricsType = "TEST";
+ Record record = new RecordEvent(metricsType);
+ reservoir.update(record);
+ assertEquals(0, hiveReporter.getListener().getNRecord()); // expected: nothing is counted before the reporter is started
+
+ hiveReporter.start();
+ reservoir.update(record);
+ reservoir.update(record);
+ assertEquals(2, hiveReporter.getListener().getNRecord()); // expected: both updates are counted after start()
+
+ hiveReporter.stop();
+ reservoir.update(record);
+ assertEquals(2, hiveReporter.getListener().getNRecord()); // expected: an update after stop() leaves the count unchanged
+
+ hiveReporter.start();
+ reservoir.update(record); // send is not yet stubbed to throw, so this one is expected to be counted
+ PowerMockito.doThrow(new
Exception()).when(hiveReporter.getListener().getProducer(metricsType))
+ .send(Lists.newArrayList(record));
+ reservoir.update(record); // this update reaches the send() stubbed to throw
+ assertEquals(3, hiveReporter.getListener().getNRecord());
+ assertEquals(1, hiveReporter.getListener().getNRecordSkip()); // expected: the failed record is tallied as skipped
+ }
+}
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
index 173febdc..30759d4 100644
--- a/metrics-reporter-kafka/pom.xml
+++ b/metrics-reporter-kafka/pom.xml
@@ -42,5 +42,24 @@
<artifactId>kafka_2.11</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4-rule-agent</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
index df79c57..b1a1bd1 100644
---
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
+++
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -115,4 +115,12 @@ public abstract class KafkaActiveReserviorListener
implements ActiveReservoirLis
logger.debug("Cannot find topic {}", topic);
topicsIfAvailable.put(topic, System.currentTimeMillis());
}
+
+ public long getNRecord() { // Returns the current value of this listener's nRecord counter.
+ return nRecord;
+ }
+
+ public long getNRecordSkip() { // Returns the current value of this listener's nRecordSkip counter.
+ return nRecordSkip;
+ }
}
diff --git
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
index a7b58a6..97b839c 100644
---
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
+++
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -88,6 +88,10 @@ public class KafkaReservoirReporter extends
ActiveReservoirReporter {
stop();
}
+ KafkaReservoirListener getListener() { // Accessor for this reporter's listener; declared without an access modifier, so package-visible only.
+ return listener;
+ }
+
/**
* A builder for {@link KafkaReservoirReporter} instances.
*/
@@ -113,7 +117,7 @@ public class KafkaReservoirReporter extends
ActiveReservoirReporter {
}
}
- private class KafkaReservoirListener extends KafkaActiveReserviorListener {
+ class KafkaReservoirListener extends KafkaActiveReserviorListener {
protected final Producer<byte[], byte[]> producer;
private KafkaReservoirListener(Properties props) {
diff --git
a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
new file mode 100644
index 0000000..4a14e66
--- /dev/null
+++
b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.InstantReservoir;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+
+@PrepareForTest({ KafkaReservoirReporter.KafkaReservoirListener.class })
+public class KafkaReservoirReporterTest { // Drives KafkaReservoirReporter through a start/stop cycle against a mocked KafkaProducer and checks the listener's counters.
+
+ @Rule
+ public PowerMockRule rule = new PowerMockRule();
+
+ private KafkaReservoirReporter kafkaReporter;
+ private ActiveReservoir reservoir;
+
+ @Before
+ public void setUp() throws Exception { // replaces KafkaProducer construction with a mock, then builds a reporter over a started InstantReservoir
+ System.setProperty(KylinConfig.KYLIN_CONF,
"../examples/test_case_data/localmeta");
+
+ KafkaProducer kafkaProducer = PowerMockito.mock(KafkaProducer.class);
+
PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducer);
+
+ reservoir = new InstantReservoir();
+ reservoir.start();
+ kafkaReporter = KafkaReservoirReporter.forRegistry(reservoir).build();
+ }
+
+ @After
+ public void after() throws Exception { // removes the KYLIN_CONF system property set in setUp
+ System.clearProperty(KylinConfig.KYLIN_CONF);
+ }
+
+ @Test
+ public void testUpdate() {
+ Record record = new RecordEvent("TEST");
+ reservoir.update(record);
+ assertEquals(0, kafkaReporter.getListener().getNRecord()); // expected: nothing is counted before the reporter is started
+
+ kafkaReporter.start();
+ reservoir.update(record);
+ reservoir.update(record);
+ assertEquals(2, kafkaReporter.getListener().getNRecord()); // expected: both updates are counted after start()
+
+ kafkaReporter.stop();
+ reservoir.update(record);
+ assertEquals(2, kafkaReporter.getListener().getNRecord()); // expected: an update after stop() leaves the count unchanged
+ assertEquals(0, kafkaReporter.getListener().getNRecordSkip()); // expected: no record was skipped in this scenario
+ }
+}