This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d003bd85b6 [Feature][Zeta] Added the metrics information of table 
statistics in multi-table mode (#7212)
d003bd85b6 is described below

commit d003bd85b65b307e77867781986cc7d58c3ba80a
Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com>
AuthorDate: Fri Jul 26 10:44:09 2024 +0800

    [Feature][Zeta] Added the metrics information of table statistics in 
multi-table mode (#7212)
---
 .../api/sink/multitablesink/MultiTableSink.java    |   5 +
 .../seatunnel/engine/e2e/MultiTableMetricsIT.java  | 125 +++++++++++++++++++++
 .../batch_fake_multi_table_to_console.conf         |  64 +++++++++++
 .../engine/client/SeaTunnelClientTest.java         | 114 +++++++++++++++++++
 .../batch_fake_multi_table_to_console.conf         |  66 +++++++++++
 .../server/rest/RestHttpGetCommandProcessor.java   |  74 +++++++++++-
 .../server/task/SeaTunnelSourceCollector.java      |  53 +++++++--
 .../engine/server/task/SourceSeaTunnelTask.java    |  13 ++-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  38 +++++++
 9 files changed, 537 insertions(+), 15 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index bb04283ca6..923ecff8b8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
@@ -149,6 +150,10 @@ public class MultiTableSink
         return Optional.of(new 
MultiTableSinkAggregatedCommitter(aggCommitters));
     }
 
+    public List<TablePath> getSinkTables() {
+        return 
sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());
+    }
+
     @Override
     public Optional<Serializer<MultiTableAggregatedCommitInfo>>
             getAggregatedCommitInfoSerializer() {
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
new file mode 100644
index 0000000000..59942eb4cc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.equalTo;
+
+public class MultiTableMetricsIT {
+
+    private static final String HOST = "http://localhost:";;
+
+    private static ClientJobProxy batchJobProxy;
+
+    private static HazelcastInstanceImpl node1;
+
+    private static SeaTunnelClient engineClient;
+
+    @BeforeEach
+    void beforeClass() throws Exception {
+        String testClusterName = TestUtils.getClusterName("RestApiIT");
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+        node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(testClusterName);
+        engineClient = new SeaTunnelClient(clientConfig);
+
+        String batchFilePath = 
TestUtils.getResource("batch_fake_multi_table_to_console.conf");
+        JobConfig batchConf = new JobConfig();
+        batchConf.setName("batch_fake_multi_table_to_console");
+        ClientJobExecutionEnvironment batchJobExecutionEnv =
+                engineClient.createExecutionContext(batchFilePath, batchConf, 
seaTunnelConfig);
+        batchJobProxy = batchJobExecutionEnv.execute();
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FINISHED, 
batchJobProxy.getJobStatus()));
+    }
+
+    @Test
+    public void multiTableMetrics() {
+        Collections.singletonList(node1)
+                .forEach(
+                        instance -> {
+                            given().get(
+                                            HOST
+                                                    + instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress()
+                                                            .getPort()
+                                                    + RestConstant.JOB_INFO_URL
+                                                    + "/"
+                                                    + batchJobProxy.getJobId())
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobName", 
equalTo("batch_fake_multi_table_to_console"))
+                                    .body("jobStatus", equalTo("FINISHED"))
+                                    .body("metrics.SourceReceivedCount", 
equalTo("50"))
+                                    .body("metrics.SinkWriteCount", 
equalTo("50"))
+                                    .body(
+                                            
"metrics.TableSourceReceivedCount.'fake.table1'",
+                                            equalTo("20"))
+                                    .body(
+                                            
"metrics.TableSourceReceivedCount.'fake.public.table2'",
+                                            equalTo("30"))
+                                    .body(
+                                            
"metrics.TableSinkWriteCount.'fake.table1'",
+                                            equalTo("20"))
+                                    .body(
+                                            
"metrics.TableSinkWriteCount.'fake.public.table2'",
+                                            equalTo("30"));
+                        });
+    }
+
+    @AfterEach
+    void afterClass() {
+        if (engineClient != null) {
+            engineClient.close();
+        }
+
+        if (node1 != null) {
+            node1.shutdown();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
new file mode 100644
index 0000000000..c51929a0ed
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake1"
+    row.num = 20
+    schema = {
+      table = "fake.table1"
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+
+  FakeSource {
+    result_table_name = "fake2"
+    row.num = 30
+    schema = {
+      table = "fake.public.table2"
+      fields {
+        name = "string"
+        age = "int"
+        sex = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    source_table_name = "fake1"
+  }
+  console {
+    source_table_name = "fake2"
+  }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index d7e55db4ec..100aa0b320 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.client;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.common.config.Common;
@@ -51,10 +53,14 @@ import lombok.extern.slf4j.Slf4j;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterators;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
@@ -548,6 +554,114 @@ public class SeaTunnelClientTest {
         }
     }
 
+    @Test
+    public void testGetMultiTableJobMetrics() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("/batch_fake_multi_table_to_console.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("testGetMultiTableJobMetrics");
+
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
+
+        try {
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(filePath, 
jobConfig, SEATUNNEL_CONFIG);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    CompletableFuture.supplyAsync(
+                            () -> {
+                                return clientJobProxy.waitForJobComplete();
+                            });
+            long jobId = clientJobProxy.getJobId();
+
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            
jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+                                                    && jobClient
+                                                            
.listJobStatus(true)
+                                                            
.contains("FINISHED")));
+
+            String jobMetrics = jobClient.getJobMetrics(jobId);
+
+            Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + 
"#fake.table1"));
+            Assertions.assertTrue(
+                    jobMetrics.contains(SOURCE_RECEIVED_COUNT + 
"#fake.public.table2"));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + 
"#fake.table1"));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + 
"#fake.public.table2"));
+
+            log.info("jobMetrics : {}", jobMetrics);
+            JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
+            List<String> metricNameList =
+                    StreamSupport.stream(
+                                    Spliterators.spliteratorUnknownSize(
+                                            jobMetricsStr.fieldNames(), 0),
+                                    false)
+                            .filter(
+                                    metricName ->
+                                            
metricName.startsWith(SOURCE_RECEIVED_COUNT)
+                                                    || 
metricName.startsWith(SINK_WRITE_COUNT))
+                            .collect(Collectors.toList());
+
+            Map<String, Long> totalCount =
+                    metricNameList.stream()
+                            .filter(metrics -> !metrics.contains("#"))
+                            .collect(
+                                    Collectors.toMap(
+                                            metrics -> metrics,
+                                            metrics ->
+                                                    StreamSupport.stream(
+                                                                    
jobMetricsStr
+                                                                            
.get(metrics)
+                                                                            
.spliterator(),
+                                                                    false)
+                                                            .mapToLong(
+                                                                    value ->
+                                                                            
value.get("value")
+                                                                               
     .asLong())
+                                                            .sum()));
+
+            Map<String, Long> tableCount =
+                    metricNameList.stream()
+                            .filter(metrics -> metrics.contains("#"))
+                            .collect(
+                                    Collectors.toMap(
+                                            metrics -> metrics,
+                                            metrics ->
+                                                    StreamSupport.stream(
+                                                                    
jobMetricsStr
+                                                                            
.get(metrics)
+                                                                            
.spliterator(),
+                                                                    false)
+                                                            .mapToLong(
+                                                                    value ->
+                                                                            
value.get("value")
+                                                                               
     .asLong())
+                                                            .sum()));
+
+            Assertions.assertEquals(
+                    totalCount.get(SOURCE_RECEIVED_COUNT),
+                    tableCount.entrySet().stream()
+                            .filter(e -> 
e.getKey().startsWith(SOURCE_RECEIVED_COUNT))
+                            .mapToLong(Map.Entry::getValue)
+                            .sum());
+            Assertions.assertEquals(
+                    totalCount.get(SINK_WRITE_COUNT),
+                    tableCount.entrySet().stream()
+                            .filter(e -> 
e.getKey().startsWith(SINK_WRITE_COUNT))
+                            .mapToLong(Map.Entry::getValue)
+                            .sum());
+
+        } catch (ExecutionException | InterruptedException | 
JsonProcessingException e) {
+            throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
+        }
+    }
+
     @AfterAll
     public static void after() {
         INSTANCE.shutdown();
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf
new file mode 100644
index 0000000000..df7ae51fe6
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake1"
+    row.num = 20
+    schema = {
+      table = "fake.table1"
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 1
+  }
+
+  FakeSource {
+    result_table_name = "fake2"
+    row.num = 30
+    schema = {
+      table = "fake.public.table2"
+      fields {
+        name = "string"
+        age = "int"
+        sex = "int"
+      }
+    }
+    parallelism = 1
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    source_table_name = "fake1"
+  }
+  console {
+    source_table_name = "fake2"
+  }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 6081b0f2ea..d5d60b7cbb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.common.Constant;
@@ -64,8 +65,10 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Spliterators;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
@@ -79,7 +82,9 @@ import static 
org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITO
 public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCommand> {
 
     private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
+    private static final String TABLE_SOURCE_RECEIVED_COUNT = 
"TableSourceReceivedCount";
     private static final String SINK_WRITE_COUNT = "SinkWriteCount";
+    private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
     private final Log4j2HttpGetCommandProcessor original;
     private NodeEngine nodeEngine;
 
@@ -362,12 +367,31 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                         .collect(JsonArray::new, JsonArray::add, 
JsonArray::add));
     }
 
-    private Map<String, Long> getJobMetrics(String jobMetrics) {
-        Map<String, Long> metricsMap = new HashMap<>();
+    private Map<String, Object> getJobMetrics(String jobMetrics) {
+        Map<String, Object> metricsMap = new HashMap<>();
         long sourceReadCount = 0L;
         long sinkWriteCount = 0L;
+        Map<String, JsonNode> tableSourceReceivedCountMap = new HashMap<>();
+        Map<String, JsonNode> tableSinkWriteCountMap = new HashMap<>();
         try {
             JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
+            StreamSupport.stream(
+                            
Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0),
+                            false)
+                    .filter(metricName -> metricName.contains("#"))
+                    .forEach(
+                            metricName -> {
+                                String tableName =
+                                        
TablePath.of(metricName.split("#")[1]).getFullName();
+                                if 
(metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
+                                    tableSourceReceivedCountMap.put(
+                                            tableName, 
jobMetricsStr.get(metricName));
+                                }
+                                if 
(metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
+                                    tableSinkWriteCountMap.put(
+                                            tableName, 
jobMetricsStr.get(metricName));
+                                }
+                            });
             JsonNode sourceReceivedCountJson = 
jobMetricsStr.get(SOURCE_RECEIVED_COUNT);
             JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT);
             for (int i = 0; i < 
jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) {
@@ -379,9 +403,36 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
         } catch (JsonProcessingException | NullPointerException e) {
             return metricsMap;
         }
+
+        Map<String, Long> tableSourceReceivedCount =
+                tableSourceReceivedCountMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                StreamSupport.stream(
+                                                                
entry.getValue().spliterator(),
+                                                                false)
+                                                        .mapToLong(
+                                                                node -> 
node.get("value").asLong())
+                                                        .sum()));
+        Map<String, Long> tableSinkWriteCount =
+                tableSinkWriteCountMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                StreamSupport.stream(
+                                                                
entry.getValue().spliterator(),
+                                                                false)
+                                                        .mapToLong(
+                                                                node -> 
node.get("value").asLong())
+                                                        .sum()));
+
         metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
         metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);
-
+        metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount);
+        metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount);
         return metricsMap;
     }
 
@@ -475,11 +526,24 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                 .add(
                         RestConstant.IS_START_WITH_SAVE_POINT,
                         jobImmutableInformation.isStartWithSavePoint())
-                .add(RestConstant.METRICS, 
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+                .add(RestConstant.METRICS, 
toJsonObject(getJobMetrics(jobMetrics)));
 
         return jobInfoJson;
     }
 
+    private JsonObject toJsonObject(Map<String, Object> jobMetrics) {
+        JsonObject members = new JsonObject();
+        jobMetrics.forEach(
+                (key, value) -> {
+                    if (value instanceof Map) {
+                        members.add(key, toJsonObject((Map<String, Object>) 
value));
+                    } else {
+                        members.add(key, value.toString());
+                    }
+                });
+        return members;
+    }
+
     private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, 
JobDAGInfo jobDAGInfo) {
         return new JsonObject()
                 .add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId()))
@@ -498,6 +562,6 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
                 .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
                 .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
-                .add(RestConstant.METRICS, 
JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+                .add(RestConstant.METRICS, 
toJsonObject(getJobMetrics(jobMetrics)));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index f5d4aed1ab..62612d0617 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.metrics.Counter;
 import org.apache.seatunnel.api.common.metrics.Meter;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import 
org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
 import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
@@ -34,12 +35,17 @@ import 
org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
@@ -54,12 +60,16 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
 
     private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
 
+    private final MetricsContext metricsContext;
+
     private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new 
AtomicBoolean(false);
 
     private final AtomicBoolean schemaChangeAfterCheckpointSignal = new 
AtomicBoolean(false);
 
     private final Counter sourceReceivedCount;
 
+    private final Map<String, Counter> sourceReceivedCountPerTable = new 
ConcurrentHashMap<>();
+
     private final Meter sourceReceivedQPS;
     private final Counter sourceReceivedBytes;
 
@@ -77,17 +87,24 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
             List<OneInputFlowLifeCycle<Record<?>>> outputs,
             MetricsContext metricsContext,
             FlowControlStrategy flowControlStrategy,
-            SeaTunnelDataType rowType) {
+            SeaTunnelDataType rowType,
+            List<TablePath> tablePaths) {
         this.checkpointLock = checkpointLock;
         this.outputs = outputs;
         this.rowType = rowType;
+        this.metricsContext = metricsContext;
         if (rowType instanceof MultipleRowType) {
             ((MultipleRowType) rowType)
                     .iterator()
-                    .forEachRemaining(
-                            type -> {
-                                this.rowTypeMap.put(type.getKey(), 
type.getValue());
-                            });
+                    .forEachRemaining(type -> 
this.rowTypeMap.put(type.getKey(), type.getValue()));
+        }
+        if (CollectionUtils.isNotEmpty(tablePaths)) {
+            tablePaths.forEach(
+                    tablePath ->
+                            sourceReceivedCountPerTable.put(
+                                    getFullName(tablePath),
+                                    metricsContext.counter(
+                                            SOURCE_RECEIVED_COUNT + "#" + 
getFullName(tablePath))));
         }
         sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
         sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
@@ -100,14 +117,12 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
     public void collect(T row) {
         try {
             if (row instanceof SeaTunnelRow) {
+                String tableId = ((SeaTunnelRow) row).getTableId();
                 int size;
                 if (rowType instanceof SeaTunnelRowType) {
                     size = ((SeaTunnelRow) 
row).getBytesSize((SeaTunnelRowType) rowType);
                 } else if (rowType instanceof MultipleRowType) {
-                    size =
-                            ((SeaTunnelRow) row)
-                                    .getBytesSize(
-                                            rowTypeMap.get(((SeaTunnelRow) 
row).getTableId()));
+                    size = ((SeaTunnelRow) 
row).getBytesSize(rowTypeMap.get(tableId));
                 } else {
                     throw new SeaTunnelEngineException(
                             "Unsupported row type: " + 
rowType.getClass().getName());
@@ -115,6 +130,18 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
                 sourceReceivedBytes.inc(size);
                 sourceReceivedBytesPerSeconds.markEvent(size);
                 flowControlGate.audit((SeaTunnelRow) row);
+                if (StringUtils.isNotEmpty(tableId)) {
+                    String tableName = getFullName(TablePath.of(tableId));
+                    Counter sourceTableCounter = 
sourceReceivedCountPerTable.get(tableName);
+                    if (Objects.nonNull(sourceTableCounter)) {
+                        sourceTableCounter.inc();
+                    } else {
+                        Counter counter =
+                                metricsContext.counter(SOURCE_RECEIVED_COUNT + 
"#" + tableName);
+                        counter.inc();
+                        sourceReceivedCountPerTable.put(tableName, counter);
+                    }
+                }
             }
             sendRecordToNext(new Record<>(row));
             emptyThisPollNext = false;
@@ -205,4 +232,12 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
             }
         }
     }
+
+    private String getFullName(TablePath tablePath) {
+        if (StringUtils.isBlank(tablePath.getTableName())) {
+            tablePath =
+                    TablePath.of(tablePath.getDatabaseName(), 
tablePath.getSchemaName(), "default");
+        }
+        return tablePath.getFullName();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 53171d4031..dbcde3e9d6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -37,9 +39,11 @@ import com.hazelcast.logging.Logger;
 import lombok.Getter;
 import lombok.NonNull;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends 
SeaTunnelTask {
 
@@ -76,10 +80,16 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
                             + startFlowLifeCycle.getClass().getName());
         } else {
             SeaTunnelDataType sourceProducedType;
+            List<TablePath> tablePaths = new ArrayList<>();
             try {
                 List<CatalogTable> producedCatalogTables =
                         
sourceFlow.getAction().getSource().getProducedCatalogTables();
                 sourceProducedType = 
CatalogTableUtil.convertToDataType(producedCatalogTables);
+                tablePaths =
+                        producedCatalogTables.stream()
+                                .map(CatalogTable::getTableId)
+                                .map(TableIdentifier::toTablePath)
+                                .collect(Collectors.toList());
             } catch (UnsupportedOperationException e) {
                 // TODO remove it when all connector use 
`getProducedCatalogTables`
                 sourceProducedType = 
sourceFlow.getAction().getSource().getProducedType();
@@ -90,7 +100,8 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
                             outputs,
                             this.getMetricsContext(),
                             FlowControlStrategy.fromMap(envOption),
-                            sourceProducedType);
+                            sourceProducedType,
+                            tablePaths);
             ((SourceFlowLifeCycle<T, SplitT>) 
startFlowLifeCycle).setCollector(collector);
         }
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 48c530a0c3..516e1c97c4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -26,6 +26,8 @@ import 
org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -43,6 +45,8 @@ import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitO
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.hazelcast.cluster.Address;
 import lombok.extern.slf4j.Slf4j;
 
@@ -52,9 +56,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
@@ -92,6 +98,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private Counter sinkWriteCount;
 
+    private Map<String, Counter> sinkWriteCountPerTable = new 
ConcurrentHashMap<>();
+
     private Meter sinkWriteQPS;
 
     private Counter sinkWriteBytes;
@@ -125,6 +133,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS);
         sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES);
         sinkWriteBytesPerSeconds = 
metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS);
+        if (sinkAction.getSink() instanceof MultiTableSink) {
+            List<TablePath> sinkTables = ((MultiTableSink) 
sinkAction.getSink()).getSinkTables();
+            sinkTables.forEach(
+                    tablePath ->
+                            sinkWriteCountPerTable.put(
+                                    getFullName(tablePath),
+                                    metricsContext.counter(
+                                            SINK_WRITE_COUNT + "#" + 
getFullName(tablePath))));
+        }
     }
 
     @Override
@@ -256,6 +273,19 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                     long size = ((SeaTunnelRow) 
record.getData()).getBytesSize();
                     sinkWriteBytes.inc(size);
                     sinkWriteBytesPerSeconds.markEvent(size);
+                    String tableId = ((SeaTunnelRow) 
record.getData()).getTableId();
+                    if (StringUtils.isNotBlank(tableId)) {
+                        String tableName = getFullName(TablePath.of(tableId));
+                        Counter sinkTableCounter = 
sinkWriteCountPerTable.get(tableName);
+                        if (Objects.nonNull(sinkTableCounter)) {
+                            sinkTableCounter.inc();
+                        } else {
+                            Counter counter =
+                                    metricsContext.counter(SINK_WRITE_COUNT + 
"#" + tableName);
+                            counter.inc();
+                            sinkWriteCountPerTable.put(tableName, counter);
+                        }
+                    }
                 }
             }
         } catch (Exception e) {
@@ -315,4 +345,12 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
             ((SupportResourceShare) 
this.writer).setMultiTableResourceManager(resourceManager, 0);
         }
     }
+
+    private String getFullName(TablePath tablePath) {
+        if (StringUtils.isBlank(tablePath.getTableName())) {
+            tablePath =
+                    TablePath.of(tablePath.getDatabaseName(), 
tablePath.getSchemaName(), "default");
+        }
+        return tablePath.getFullName();
+    }
 }


Reply via email to