This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a9b5a8acb [INLONG-6898][Sort] Unify metrics report model of SortStandalone (#6947) a9b5a8acb is described below commit a9b5a8acb455a028e0a186d1a786fa47b4dd73d4 Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Mon Dec 19 16:35:06 2022 +0800 [INLONG-6898][Sort] Unify metrics report model of SortStandalone (#6947) --- .../sort/standalone/metrics/MetricItemValue.java | 73 ---------- .../sort/standalone/metrics/MetricListener.java | 39 ----- .../standalone/metrics/MetricListenerRunnable.java | 142 ------------------ .../sort/standalone/metrics/MetricObserver.java | 114 --------------- .../metrics/TestMetricListenerRunnable.java | 161 --------------------- .../sort/standalone/SortStandaloneApplication.java | 2 +- .../prometheus/PrometheusMetricListener.java | 5 +- 7 files changed, 4 insertions(+), 532 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java deleted file mode 100644 index 25be6d8e3..000000000 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.standalone.metrics; - -import java.util.Map; - -import org.apache.inlong.common.metric.MetricValue; - -/** - * - * MetricItemValue - */ -public class MetricItemValue { - - private final String key; - private final Map<String, String> dimensions; - private final Map<String, MetricValue> metrics; - - /** - * Constructor - * - * @param key - * @param dimensions - * @param metrics - */ - public MetricItemValue(String key, Map<String, String> dimensions, Map<String, MetricValue> metrics) { - this.key = key; - this.dimensions = dimensions; - this.metrics = metrics; - } - - /** - * get key - * - * @return the key - */ - public String getKey() { - return key; - } - - /** - * get dimensions - * - * @return the dimensions - */ - public Map<String, String> getDimensions() { - return dimensions; - } - - /** - * get metrics - * - * @return the metrics - */ - public Map<String, MetricValue> getMetrics() { - return metrics; - } -} diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java deleted file mode 100644 index 7475d833b..000000000 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.standalone.metrics; - -import java.util.List; - -/** - * - * MetricListener - */ -public interface MetricListener { - - String KEY_METRIC_DOMAINS = "metricDomains"; - String KEY_DOMAIN_LISTENERS = "domainListeners"; - String KEY_SNAPSHOT_INTERVAL = "snapshotInterval"; - - /** - * snapshot - * - * @param domain - * @param itemValues - */ - public void snapshot(String domain, List<MetricItemValue> itemValues); -} diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java deleted file mode 100644 index 5708a9092..000000000 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.standalone.metrics; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.commons.lang3.ClassUtils; -import org.apache.inlong.common.metric.MetricItem; -import org.apache.inlong.common.metric.MetricItemMBean; -import org.apache.inlong.common.metric.MetricItemSetMBean; -import org.apache.inlong.common.metric.MetricRegister; -import org.apache.inlong.common.metric.MetricUtils; -import org.apache.inlong.common.metric.MetricValue; -import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; -import org.slf4j.Logger; - -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.management.AttributeNotFoundException; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.ReflectionException; - -/** - * - * MetricListenerRunnable - */ -public class MetricListenerRunnable implements Runnable { - - public static final Logger LOG = InlongLoggerFactory.getLogger(MetricListenerRunnable.class); - - private String domain; - private List<MetricListener> listenerList; - - /** - * Constructor - * - * @param domain - * @param listenerList - */ - public MetricListenerRunnable(String domain, List<MetricListener> listenerList) { - this.domain = domain; - this.listenerList = listenerList; - } - - /** - * run - */ - @Override - public void run() { - LOG.info("begin to snapshot metric:{}", domain); - try { - List<MetricItemValue> itemValues = this.getItemValues(); - LOG.info("snapshot metric:{},size:{},content:{}", domain, itemValues.size(), - new ObjectMapper().writeValueAsString(itemValues)); - this.listenerList.forEach((item) -> { - item.snapshot(domain, itemValues); - }); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - } - LOG.info("end to snapshot metric:{}", domain); - } - - /** - * getItemValues - * - * @return MetricItemValue List - * @throws InstanceNotFoundException - * @throws AttributeNotFoundException - * @throws ReflectionException - * @throws MBeanException - * @throws MalformedObjectNameException - * @throws ClassNotFoundException - */ - @SuppressWarnings("unchecked") - public List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException, - ReflectionException, MBeanException, MalformedObjectNameException, ClassNotFoundException { - StringBuilder beanName = new StringBuilder(); - beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR) - .append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class)) - .append(MetricItemMBean.PROPERTY_SEPARATOR) - .append("*"); - ObjectName objName = new ObjectName(beanName.toString()); - final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null); - LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans); - List<MetricItemValue> itemValues = new ArrayList<>(); - for (ObjectInstance mbean : mbeans) { - String className = mbean.getClassName(); - Class<?> clazz = ClassUtils.getClass(className); - if (ClassUtils.isAssignable(clazz, MetricItemMBean.class)) { - ObjectName metricObjectName = mbean.getObjectName(); - String dimensionsKey = (String) mbs.getAttribute(metricObjectName, - MetricItemMBean.ATTRIBUTE_KEY); - Map<String, String> dimensions = (Map<String, String>) mbs - .getAttribute(metricObjectName, MetricItemMBean.ATTRIBUTE_DIMENSIONS); - Map<String, MetricValue> metrics = (Map<String, MetricValue>) mbs - .invoke(metricObjectName, MetricItemMBean.METHOD_SNAPSHOT, null, null); - MetricItemValue itemValue = new MetricItemValue(dimensionsKey, dimensions, metrics); - LOG.info("MetricItemMBean get itemValue:{}", itemValue); - itemValues.add(itemValue); - } else if (ClassUtils.isAssignable(clazz, MetricItemSetMBean.class)) { - ObjectName metricObjectName = mbean.getObjectName(); - List<MetricItem> items = (List<MetricItem>) mbs.invoke(metricObjectName, - MetricItemMBean.METHOD_SNAPSHOT, null, null); - for (MetricItem item : items) { - String dimensionsKey = item.getDimensionsKey(); - Map<String, String> dimensions = item.getDimensions(); - Map<String, MetricValue> metrics = item.snapshot(); - MetricItemValue itemValue = new MetricItemValue(dimensionsKey, dimensions, metrics); - LOG.info("MetricItemSetMBean get itemValue:{}", itemValue); - itemValues.add(itemValue); - } - } - } - return itemValues; - } -} diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java deleted file mode 100644 index edc438be1..000000000 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.standalone.metrics; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.lang3.ClassUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.flume.Context; -import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; -import org.slf4j.Logger; - -/** - * - * MetricObserver - */ -public class MetricObserver { - - public static final Logger LOG = InlongLoggerFactory.getLogger(MetricObserver.class); - private static final AtomicBoolean isInited = new AtomicBoolean(false); - private static ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(5); - - /** - * init - * - * @param commonProperties - */ - public static void init(Map<String, String> commonProperties) { - if (!isInited.compareAndSet(false, true)) { - return; - } - // init - Context context = new Context(commonProperties); - // get domain name list - String metricDomains = context.getString(MetricListener.KEY_METRIC_DOMAINS); - if (StringUtils.isBlank(metricDomains)) { - return; - } - // split domain name - String[] domains = metricDomains.split("\\s+"); - for (String domain : domains) { - // get domain parameters - Context domainContext = new Context( - context.getSubProperties(MetricListener.KEY_METRIC_DOMAINS + "." + domain + ".")); - List<MetricListener> listenerList = parseDomain(domain, domainContext); - // no listener - if (listenerList == null || listenerList.size() <= 0) { - continue; - } - // get snapshot interval - long snapshotInterval = domainContext.getLong(MetricListener.KEY_SNAPSHOT_INTERVAL, 60000L); - LOG.info("begin to register domain:{},MetricListeners:{},snapshotInterval:{}", domain, listenerList, - snapshotInterval); - statExecutor.scheduleWithFixedDelay(new MetricListenerRunnable(domain, listenerList), snapshotInterval, - snapshotInterval, TimeUnit.MILLISECONDS); - } - - } - - /** - * parseDomain - * - * @param domain - * @param context - * @return - */ - private static List<MetricListener> parseDomain(String domain, Context domainContext) { - String listeners = domainContext.getString(MetricListener.KEY_DOMAIN_LISTENERS); - if (StringUtils.isBlank(listeners)) { - return null; - } - String[] listenerTypes = listeners.split("\\s+"); - List<MetricListener> listenerList = new ArrayList<>(); - for (String listenerType : listenerTypes) { - // new listener object - try { - Class<?> listenerClass = ClassUtils.getClass(listenerType); - Object listenerObject = listenerClass.getDeclaredConstructor().newInstance(); - if (listenerObject == null || !(listenerObject instanceof MetricListener)) { - LOG.error("{} is not instance of MetricListener.", listenerType); - continue; - } - final MetricListener listener = (MetricListener) listenerObject; - listenerList.add(listener); - } catch (Throwable t) { - LOG.error("Fail to init MetricListener:{},error:{}", - listenerType, t.getMessage()); - continue; - } - } - return listenerList; - } -} diff --git a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java deleted file mode 100644 index 8ecc39011..000000000 --- a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.standalone.metrics; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.inlong.common.metric.MetricRegister; -import org.apache.inlong.common.metric.MetricUtils; -import org.apache.inlong.common.metric.MetricValue; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * - * TestMetricItemSetMBean - */ -public class TestMetricListenerRunnable { - - public static final String CLUSTER_ID = "inlong5th_sz"; - public static final String CONTAINER_NAME = "2222.inlong.Sort.sz100001"; - public static final String CONTAINER_IP = "127.0.0.1"; - private static final String SOURCE_ID = "agent-source"; - private static final String SOURCE_DATA_ID = "12069"; - private static final String INLONG_GROUP_ID1 = "03a00000026"; - private static final String INLONG_GROUP_ID2 = "03a00000126"; - private static final String INLONG_STREAM_ID = ""; - private static final String SINK_ID = "inlong5th-pulsar-sz"; - private static final String SINK_DATA_ID = "PULSAR_TOPIC_1"; - private static SortMetricItemSet itemSet; - private static Map<String, String> dimSource; - private static Map<String, String> dimSink; - private static String keySource1; - private static String keySource2; - private static String keySink1; - private static String keySink2; - - /** - * setup - */ - @BeforeClass - public static void setup() { - itemSet = new SortMetricItemSet(CLUSTER_ID); - MetricRegister.register(itemSet); - // prepare - SortMetricItem itemSource = new SortMetricItem(); - itemSource.clusterId = CLUSTER_ID; - itemSource.sourceId = SOURCE_ID; - itemSource.sourceDataId = SOURCE_DATA_ID; - itemSource.inlongGroupId = INLONG_GROUP_ID1; - itemSource.inlongStreamId = INLONG_STREAM_ID; - dimSource = itemSource.getDimensions(); - // - SortMetricItem itemSink = new SortMetricItem(); - itemSink.clusterId = CLUSTER_ID; - itemSink.sinkId = SINK_ID; - itemSink.sinkDataId = SINK_DATA_ID; - itemSink.inlongGroupId = INLONG_GROUP_ID1; - itemSink.inlongStreamId = INLONG_STREAM_ID; - dimSink = itemSink.getDimensions(); - } - - /** - * setdown - */ - @AfterClass - public static void setdown() { - MetricRegister.unregister(itemSet); - } - - /** - * testResult - * - * @throws Exception - */ - @Test - public void testResult() throws Exception { - // increase source - SortMetricItem item = null; - item = itemSet.findMetricItem(dimSource); - item.readSuccessCount.incrementAndGet(); - item.readSuccessSize.addAndGet(100); - keySource1 = MetricUtils.getDimensionsKey(dimSource); - // - dimSource.put("inlongGroupId", INLONG_GROUP_ID2); - item = itemSet.findMetricItem(dimSource); - item.readFailCount.addAndGet(20); - item.readFailSize.addAndGet(2000); - keySource2 = MetricUtils.getDimensionsKey(dimSource); - // increase sink - item = itemSet.findMetricItem(dimSink); - item.sendCount.incrementAndGet(); - item.sendSize.addAndGet(100); - item.sendSuccessCount.incrementAndGet(); - item.sendSuccessSize.addAndGet(100); - keySink1 = MetricUtils.getDimensionsKey(dimSink); - // - dimSink.put("inlongGroupId", INLONG_GROUP_ID2); - item = itemSet.findMetricItem(dimSink); - item.sendCount.addAndGet(20); - item.sendSize.addAndGet(2000); - item.sendFailCount.addAndGet(20); - item.sendFailSize.addAndGet(2000); - keySink2 = MetricUtils.getDimensionsKey(dimSink); - // report - MetricListener listener = new MetricListener() { - - @Override - public void snapshot(String domain, List<MetricItemValue> itemValues) { - assertEquals("Sort", domain); - for (MetricItemValue itemValue : itemValues) { - String key = itemValue.getKey(); - Map<String, MetricValue> metricMap = itemValue.getMetrics(); - if (keySource1.equals(itemValue.getKey())) { - assertEquals(1, metricMap.get("readSuccessCount").value); - assertEquals(100, metricMap.get("readSuccessSize").value); - } else if (keySource2.equals(key)) { - assertEquals(20, metricMap.get("readFailCount").value); - assertEquals(2000, metricMap.get("readFailSize").value); - } else if (keySink1.equals(key)) { - assertEquals(1, metricMap.get("sendCount").value); - assertEquals(100, metricMap.get("sendSize").value); - assertEquals(1, metricMap.get("sendSuccessCount").value); - assertEquals(100, metricMap.get("sendSuccessSize").value); - } else if (keySink2.equals(key)) { - assertEquals(20, metricMap.get("sendCount").value); - assertEquals(2000, metricMap.get("sendSize").value); - assertEquals(20, metricMap.get("sendFailCount").value); - assertEquals(2000, metricMap.get("sendFailSize").value); - } else { - System.out.println("bad MetricItem:" + key); - } - } - } - }; - List<MetricListener> listeners = new ArrayList<>(); - listeners.add(listener); - MetricListenerRunnable runnable = new MetricListenerRunnable("Sort", listeners); - List<MetricItemValue> itemValues = runnable.getItemValues(); - runnable.run(); - } -} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java index 4faac77a9..362a192a3 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java @@ -18,8 +18,8 @@ package org.apache.inlong.sort.standalone; import org.apache.flume.node.Application; +import org.apache.inlong.common.metric.MetricObserver; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; -import org.apache.inlong.sort.standalone.metrics.MetricObserver; import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; import org.slf4j.Logger; diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java index 887a6ea74..b760df206 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java @@ -47,10 +47,11 @@ import java.util.concurrent.atomic.AtomicLong; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.inlong.common.metric.MetricItemValue; +import org.apache.inlong.common.metric.MetricListener; import org.apache.inlong.common.metric.MetricValue; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; -import org.apache.inlong.sort.standalone.metrics.MetricItemValue; -import org.apache.inlong.sort.standalone.metrics.MetricListener; + import org.apache.inlong.sort.standalone.metrics.SortMetricItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory;