This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 48f6f152a [INLONG-6009][Agent] Report data size to InLong Audit (#6011) 48f6f152a is described below commit 48f6f152a9b59cb931c39e6c2d649409d93491eb Author: xueyingzhang <86780714+poc...@users.noreply.github.com> AuthorDate: Mon Sep 26 15:05:03 2022 +0800 [INLONG-6009][Agent] Report data size to InLong Audit (#6011) --- .../inlong/agent/metrics/audit/AuditUtils.java | 15 +--- .../inlong/agent/plugin/sinks/SenderManager.java | 8 +- .../agent/plugin/sources/reader/BinlogReader.java | 3 +- .../agent/plugin/sources/reader/KafkaReader.java | 2 +- .../agent/plugin/sources/reader/MongoDBReader.java | 89 +++++++++++----------- .../plugin/sources/reader/PostgreSQLReader.java | 3 +- .../agent/plugin/sources/reader/RedisReader.java | 5 +- .../plugin/sources/reader/SQLServerReader.java | 12 +-- .../agent/plugin/sources/reader/SqlReader.java | 3 +- .../sources/reader/file/FileReaderOperator.java | 4 +- 10 files changed, 69 insertions(+), 75 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index 465ced374..d3ad42538 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -72,21 +72,12 @@ public class AuditUtils { /** * add audit metric */ - public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count) { + public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count, + long size) { if (!IS_AUDIT) { return; } - AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, 0); - } - - /** - * add - */ - public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime) { - if (!IS_AUDIT) { - return; - } - AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, 0); + AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size); } /** diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index 3b8d1ee95..dd164847d 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -264,7 +264,9 @@ public class SenderManager { if (result == SendResult.OK) { semaphore.release(bodyList.size()); getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size()); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size()); + long totalSize = bodyList.stream().mapToLong(body -> body.length).sum(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(), + totalSize); } else { getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size()); LOGGER.warn("send data to dataproxy error {}", result.toString()); @@ -319,7 +321,9 @@ public class SenderManager { Map<String, String> dims = new HashMap<>(); dims.put(KEY_INLONG_GROUP_ID, groupId); dims.put(KEY_INLONG_STREAM_ID, streamId); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size()); + long totalSize = bodyList.stream().mapToLong(body -> body.length).sum(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(), + totalSize); getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size()); if (sourcePath != null) { taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java index 21188de90..3fab3899c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java @@ -167,8 +167,9 @@ public class BinlogReader extends AbstractReader { committer.markProcessed(record); } committer.markBatchFinished(); + long dataSize = records.stream().mapToLong(r -> r.value().length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis(), records.size()); + System.currentTimeMillis(), records.size(), dataSize); readerMetric.pluginReadCount.addAndGet(records.size()); } catch (Exception e) { readerMetric.pluginReadFailCount.addAndGet(records.size()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java index 5c5f0e240..79e5479ee 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java @@ -112,7 +112,7 @@ public class KafkaReader<K, V> extends AbstractReader { byte[] recordValue = (byte[]) record.value(); if (validateMessage(recordValue)) { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, - inlongGroupId, inlongStreamId, System.currentTimeMillis()); + inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, recordValue.length); // header Map<String, String> headerMap = new HashMap<>(); headerMap.put("record.offset", String.valueOf(record.offset())); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java index b085e0b0f..c7f0341f5 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java @@ -45,68 +45,68 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_TIMEOUT_MS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_EXCLUDE_LIST; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_RENAMES; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_COPY_THREADS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS; import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SNAPSHOT_MODE; -import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY; import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CAPTURE_MODE; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS; import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_TIMEOUT_MS; import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CURSOR_MAX_AWAIT; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST; import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_RENAMES; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS; import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_INITIAL_SYNC_MAX_THREADS; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED; -import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD; import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_POLL_INTERVAL; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER; /** * MongoDBReader : mongo source, split mongo source job into multi readers @@ -254,10 +254,10 @@ public class MongoDBReader extends AbstractReader { * Handle the completion of the embedded connector engine. * * @param success {@code true} if the connector completed normally, - * or {@code false} if the connector produced an error - * that prevented startup or premature termination. + * or {@code false} if the connector produced an error + * that prevented startup or premature termination. * @param message the completion message; never null - * @param error the error, or null if there was no exception + * @param error the error, or null if there was no exception */ private void handle(boolean success, String message, Throwable error) { //jobConf.getInstanceId() @@ -318,7 +318,7 @@ public class MongoDBReader extends AbstractReader { } private void setEngineConfigIfNecessary(JobProfile jobConf, - Configuration.Builder builder, String key, Field field) { + Configuration.Builder builder, String key, Field field) { String value = jobConf.get(key, field.defaultValueAsString()); if (StringUtils.isBlank(value)) { return; @@ -352,11 +352,11 @@ public class MongoDBReader extends AbstractReader { * Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)} * for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished. * - * @param records the records to be processed + * @param records the records to be processed * @param committer the committer that indicates to the system that we are finished */ private void handleChangeEvent(List<ChangeEvent<String, String>> records, - DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) { + DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) { try { for (ChangeEvent<String, String> record : records) { DebeziumFormat debeziumFormat = JSONPath.read(record.value(), "$.payload", DebeziumFormat.class); @@ -364,8 +364,9 @@ public class MongoDBReader extends AbstractReader { committer.markProcessed(record); } committer.markBatchFinished(); + long dataSize = records.stream().mapToLong(c -> c.value().length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, super.inlongGroupId, super.inlongStreamId, - System.currentTimeMillis(), records.size()); + System.currentTimeMillis(), records.size(), dataSize); readerMetric.pluginReadCount.addAndGet(records.size()); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java index 5d78c2787..dc7587bc7 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java @@ -162,8 +162,9 @@ public class PostgreSQLReader extends AbstractReader { committer.markProcessed(record); } committer.markBatchFinished(); + long dataSize = records.stream().mapToLong(c -> c.value().length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis(), records.size()); + System.currentTimeMillis(), records.size(), dataSize); readerMetric.pluginReadCount.addAndGet(records.size()); } catch (Exception e) { readerMetric.pluginReadFailCount.addAndGet(records.size()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java index a59f7eaf5..b90dc36b9 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java @@ -108,9 +108,10 @@ public class RedisReader extends AbstractReader { public void onEvent(Replicator replicator, Event event) { try { if (event instanceof DefaultCommand || event instanceof KeyValuePair<?, ?>) { - redisMessageQueue.put(gson.toJson(event)); + String eventJson = gson.toJson(event); + redisMessageQueue.put(eventJson); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis(), 1); + System.currentTimeMillis(), 1, eventJson.length()); readerMetric.pluginReadCount.incrementAndGet(); } if (event instanceof PostRdbSyncEvent) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java index 66d023e0f..9edb14ba3 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java @@ -46,29 +46,22 @@ import static java.sql.Types.VARBINARY; */ public class SQLServerReader extends AbstractReader { - private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class); - public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric"; - public static final String JOB_DATABASE_USER = "job.sqlserverJob.user"; public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password"; public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname"; public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port"; public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname"; - public static final String JOB_DATABASE_BATCH_SIZE = "job.sqlserverJob.batchSize"; public static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000; - public static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass"; public static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; - public static final String STD_FIELD_SEPARATOR_SHORT = "\001"; public static final String JOB_DATABASE_SEPARATOR = "job.sql.separator"; - // pre-set sql lines, commands like "set xxx=xx;" public static final String JOB_DATABASE_TYPE = "job.database.type"; public static final String SQLSERVER = "sqlserver"; - + private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class); private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR), String.valueOf(CharUtils.LF)}; private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY}; @@ -116,8 +109,9 @@ public class SQLServerReader extends AbstractReader { } lineColumns.add(dataValue); } + long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis()); + System.currentTimeMillis(), 1, dataSize); readerMetric.pluginReadCount.incrementAndGet(); return generateMessage(lineColumns); } catch (Exception ex) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java index 5bf9ac134..b1b0c2280 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java @@ -115,8 +115,9 @@ public class SqlReader extends AbstractReader { } lineColumns.add(dataValue); } + long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, - inlongGroupId, inlongStreamId, System.currentTimeMillis()); + inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, dataSize); readerMetric.pluginReadCount.incrementAndGet(); return generateMessage(lineColumns); } else { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java index 3419ee160..83d6e2a97 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java @@ -91,8 +91,8 @@ public class FileReaderOperator extends AbstractReader { if (iterator != null && iterator.hasNext()) { String message = iterator.next(); if (validateMessage(message)) { - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, - inlongGroupId, inlongStreamId, System.currentTimeMillis()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, + System.currentTimeMillis(), 1, message.length()); readerMetric.pluginReadCount.incrementAndGet(); String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); Map<String, String> header = new HashMap<>();