This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0abbc5d006 [INLONG-9112][Agent] Add task and instance profile (#9113)
0abbc5d006 is described below

commit 0abbc5d006b3a0f0f368d1b8ac42a763d37b7f99
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Oct 26 11:39:32 2023 +0800

    [INLONG-9112][Agent] Add task and instance profile (#9113)
---
 .../apache/inlong/agent/conf/InstanceProfile.java  | 169 +++++++
 .../org/apache/inlong/agent/conf/TaskProfile.java  | 119 +++++
 .../org/apache/inlong/agent/pojo/FileTask.java     | 153 ++++++
 .../apache/inlong/agent/pojo/TaskProfileDto.java   | 549 +++++++++++++++++++++
 4 files changed, 990 insertions(+)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
new file mode 100644
index 0000000000..024b7674eb
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.conf;
+
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.utils.file.FileUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
+import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS;
+import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
+
+/**
+ * job profile which contains details describing properties of one job.
+ */
+public class InstanceProfile extends AbstractConfiguration implements 
Comparable<InstanceProfile> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceProfile.class);
+    private static final Gson GSON = new Gson();
+
+    /**
+     * parse json string to configuration instance.
+     *
+     * @return job configuration
+     */
+    public static InstanceProfile parseJsonStr(String jsonStr) {
+        InstanceProfile conf = new InstanceProfile();
+        conf.loadJsonStrResource(jsonStr);
+        return conf;
+    }
+
+    public String toJsonStr() {
+        return GSON.toJson(getConfigStorage());
+    }
+
+    public void setInstanceClass(String className) {
+        set(TaskConstants.INSTANCE_CLASS, className);
+    }
+
+    public String getInstanceClass() {
+        return get(TaskConstants.INSTANCE_CLASS);
+    }
+
+    public String getTaskId() {
+        return get(TaskConstants.TASK_ID);
+    }
+
+    public String getInstanceId() {
+        return get(TaskConstants.INSTANCE_ID);
+    }
+
+    public String getSourceClass() {
+        return get(TaskConstants.TASK_SOURCE);
+    }
+
+    public String getSinkClass() {
+        return get(TaskConstants.TASK_SINK);
+    }
+
+    public InstanceStateEnum getState() {
+        int value = getInt(INSTANCE_STATE, 
InstanceStateEnum.DEFAULT.ordinal());
+        return InstanceStateEnum.getTaskState(value);
+    }
+
+    public void setState(InstanceStateEnum state) {
+        setInt(INSTANCE_STATE, state.ordinal());
+    }
+
+    @Override
+    public boolean allRequiredKeyExist() {
+        return true;
+    }
+
+    /**
+     * get MQClusterInfo list from config
+     */
+    public List<MQClusterInfo> getMqClusters() {
+        List<MQClusterInfo> result = null;
+        String mqClusterStr = get(JOB_MQ_ClUSTERS);
+        if (StringUtils.isNotBlank(mqClusterStr)) {
+            result = GSON.fromJson(mqClusterStr, new 
TypeToken<List<MQClusterInfo>>() {
+            }.getType());
+        }
+        return result;
+    }
+
+    /**
+     * get mqTopic from config
+     */
+    public DataProxyTopicInfo getMqTopic() {
+        DataProxyTopicInfo result = null;
+        String topicStr = get(JOB_MQ_TOPIC);
+        if (StringUtils.isNotBlank(topicStr)) {
+            result = GSON.fromJson(topicStr, DataProxyTopicInfo.class);
+        }
+        return result;
+    }
+
+    public void setCreateTime(Long time) {
+        setLong(TaskConstants.INSTANCE_CREATE_TIME, time);
+    }
+
+    public Long getCreateTime() {
+        return getLong(TaskConstants.INSTANCE_CREATE_TIME, 0);
+    }
+
+    public void setModifyTime(Long time) {
+        setLong(TaskConstants.INSTANCE_MODIFY_TIME, time);
+    }
+
+    public Long getModifyTime() {
+        return getLong(TaskConstants.INSTANCE_MODIFY_TIME, 0);
+    }
+
+    public void setInstanceId(String instanceId) {
+        set(TaskConstants.INSTANCE_ID, instanceId);
+    }
+
+    public void setDataTime(String dataTime) {
+        set(TaskConstants.JOB_DATA_TIME, dataTime);
+    }
+
+    public String getDataTime() {
+        return get(TaskConstants.JOB_DATA_TIME);
+    }
+
+    @Override
+    public int compareTo(InstanceProfile object) {
+        int ret = ComparisonChain.start()
+                .compare(getDataTime(), object.getDataTime())
+                .compare(FileUtils.getFileCreationTime(getInstanceId()),
+                        FileUtils.getFileCreationTime(object.getInstanceId()))
+                .compare(FileUtils.getFileLastModifyTime(getInstanceId()),
+                        
FileUtils.getFileLastModifyTime(object.getInstanceId()))
+                .result();
+        return ret;
+    }
+
+    public boolean isRetry() {
+        return getBoolean(TASK_RETRY, false);
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
new file mode 100644
index 0000000000..1040afa88a
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.conf;
+
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.pojo.TaskProfileDto;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+
+import com.google.gson.Gson;
+
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
+
+/**
+ * job profile which contains details describing properties of one job.
+ */
+public class TaskProfile extends AbstractConfiguration {
+
+    private static final Gson GSON = new Gson();
+
+    /**
+     * Get a TaskProfile from a DataConfig
+     */
+    public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
+        if (dataConfig == null) {
+            return null;
+        }
+        return TaskProfileDto.convertToTaskProfile(dataConfig);
+    }
+
+    public String getTaskId() {
+        return get(TaskConstants.TASK_ID);
+    }
+
+    public String getCycleUnit() {
+        return get(TaskConstants.TASK_CYCLE_UNIT);
+    }
+
+    public String getTimeOffset() {
+        return get(TaskConstants.TASK_FILE_TIME_OFFSET);
+    }
+
+    public TaskStateEnum getState() {
+        return TaskStateEnum.getTaskState(getInt(TASK_STATE));
+    }
+
+    public void setState(TaskStateEnum state) {
+        setInt(TASK_STATE, state.ordinal());
+    }
+
+    public boolean isRetry() {
+        return getBoolean(TASK_RETRY, false);
+    }
+
+    public String getTaskClass() {
+        return get(TaskConstants.TASK_CLASS);
+    }
+
+    public void setTaskClass(String className) {
+        set(TaskConstants.TASK_CLASS, className);
+    }
+
+    /**
+     * parse json string to configuration instance.
+     *
+     * @return job configuration
+     */
+    public static TaskProfile parseJsonStr(String jsonStr) {
+        TaskProfile conf = new TaskProfile();
+        conf.loadJsonStrResource(jsonStr);
+        return conf;
+    }
+
+    /**
+     * check whether required keys exists.
+     *
+     * @return return true if all required keys exists else false.
+     */
+    @Override
+    public boolean allRequiredKeyExist() {
+        return hasKey(TaskConstants.TASK_ID) && 
hasKey(TaskConstants.TASK_SOURCE)
+                && hasKey(TaskConstants.TASK_SINK) && 
hasKey(TaskConstants.TASK_CHANNEL)
+                && hasKey(TaskConstants.TASK_GROUP_ID) && 
hasKey(TaskConstants.TASK_STREAM_ID)
+                && hasKey(TaskConstants.TASK_CYCLE_UNIT);
+    }
+
+    public String toJsonStr() {
+        return GSON.toJson(getConfigStorage());
+    }
+
+    public InstanceProfile createInstanceProfile(String instanceClass, String 
fileName, String dataTime) {
+        InstanceProfile instanceProfile = 
InstanceProfile.parseJsonStr(toJsonStr());
+        instanceProfile.setInstanceClass(instanceClass);
+        instanceProfile.setInstanceId(fileName);
+        instanceProfile.setDataTime(dataTime);
+        instanceProfile.setCreateTime(AgentUtils.getCurrentTime());
+        instanceProfile.setModifyTime(AgentUtils.getCurrentTime());
+        instanceProfile.setState(InstanceStateEnum.DEFAULT);
+        return instanceProfile;
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
new file mode 100644
index 0000000000..7942e74bef
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class FileTask {
+
+    private Dir dir;
+    private Thread thread;
+    private Integer id;
+    private String cycleUnit;
+    private Boolean retry;
+    private Long startTime;
+    private Long endTime;
+    private String timeOffset;
+    private String addictiveString;
+    private String collectType;
+    private Line line;
+    private Integer maxFileCount;
+
+    // INCREMENT
+    // FULL
+    private String contentCollectType;
+
+    private String envList;
+
+    // JSON string, the content format is List<Map<String, String>>
+    private String metaFields;
+
+    private String dataSeparator;
+
+    // JSON string, the content format is Map<String,string>
+    private String filterMetaByLabels;
+
+    // JSON string, the content format is Map<String,Object>
+    private String properties;
+
+    // Monitor interval for file
+    private Long monitorInterval;
+
+    // Monitor switch, 1 true and 0 false
+    private Integer monitorStatus;
+
+    // Monitor expire time and the time in milliseconds
+    private Long monitorExpire;
+
+    @Data
+    public static class Dir {
+
+        private String patterns;
+
+        private String blackList;
+    }
+
+    @Data
+    public static class Running {
+
+        private String core;
+    }
+
+    @Data
+    public static class Thread {
+
+        private Running running;
+    }
+
+    @Data
+    public static class Line {
+
+        private String endPattern;
+    }
+
+    @Data
+    public static class FileTaskConfig {
+
+        private String cycleUnit;
+
+        private Boolean retry;
+
+        private Long startTime;
+
+        private Long endTime;
+
+        private String pattern;
+
+        private String blackList;
+        // '1m' means one minute after, '-1m' means one minute before
+        // '1h' means one hour after, '-1h' means one hour before
+        // '1d' means one day after, '-1d' means one day before
+        // Null means from current timestamp
+        private String timeOffset;
+        // For example: a=b&c=b&e=f
+        private String additionalAttr;
+
+        private String collectType;
+
+        private String lineEndPattern;
+
+        // Type of file content, for example: FULL, INCREMENT
+        private String contentCollectType;
+
+        // File needs to collect environment information, for example: 
kubernetes
+        private String envList;
+        // Metadata of data, for example:
+        // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and 
so on
+        private List<Map<String, String>> metaFields;
+        // Type of data result for column separator
+        // CSV format, set this parameter to a custom separator: , | :
+        // Json format, set this parameter to json
+        private String dataContentStyle;
+
+        // Column separator of data source
+        private String dataSeparator;
+
+        // Metadata filters by label, special parameters for K8S
+        private Map<String, String> filterMetaByLabels;
+
+        // Properties for file
+        private Map<String, Object> properties;
+
+        // Monitor interval for file
+        private Long monitorInterval;
+
+        // Monitor switch, 1 true and 0 false
+        private Integer monitorStatus;
+
+        // Monitor expire time and the time in milliseconds
+        private Long monitorExpire;
+
+        private Integer maxFileCount;
+    }
+
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
new file mode 100644
index 0000000000..964ac1cf02
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
+import org.apache.inlong.agent.pojo.FileTask.Line;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.enums.TaskTypeEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+
+import com.google.gson.Gson;
+import lombok.Data;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN;
+import static 
org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_DATAPROXY;
+
+@Data
+public class TaskProfileDto {
+
+    public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask";
+    public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
+    public static final String MANAGER_JOB = "MANAGER_JOB";
+    public static final String DEFAULT_DATAPROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
+    public static final String FILE_DATAPROXY_SINK =
+            "org.apache.inlong.agent.plugin.sinks.filecollect.ProxySink";
+    public static final String PULSAR_SINK = 
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
+    public static final String KAFKA_SINK = 
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
+
+    /**
+     * file source
+     */
+    public static final String DEFAULT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.LogFileSource";
+    /**
+     * binlog source
+     */
+    public static final String BINLOG_SOURCE = 
"org.apache.inlong.agent.plugin.sources.BinlogSource";
+    /**
+     * kafka source
+     */
+    public static final String KAFKA_SOURCE = 
"org.apache.inlong.agent.plugin.sources.KafkaSource";
+    /**
+     * PostgreSQL source
+     */
+    public static final String POSTGRESQL_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
+    /**
+     * mongo source
+     */
+    public static final String MONGO_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MongoDBSource";
+    /**
+     * oracle source
+     */
+    public static final String ORACLE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.OracleSource";
+    /**
+     * redis source
+     */
+    public static final String REDIS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.RedisSource";
+    /**
+     * mqtt source
+     */
+    public static final String MQTT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MqttSource";
+    /**
+     * sqlserver source
+     */
+    public static final String SQLSERVER_SOURCE = 
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
+
+    private static final Gson GSON = new Gson();
+
+    private Task task;
+    private Proxy proxy;
+
+    private static BinlogJob getBinlogJob(DataConfig dataConfigs) {
+        BinlogJob.BinlogJobTaskConfig binlogJobTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
+                BinlogJob.BinlogJobTaskConfig.class);
+
+        BinlogJob binlogJob = new BinlogJob();
+        binlogJob.setHostname(binlogJobTaskConfig.getHostname());
+        binlogJob.setPassword(binlogJobTaskConfig.getPassword());
+        binlogJob.setUser(binlogJobTaskConfig.getUser());
+        binlogJob.setTableWhiteList(binlogJobTaskConfig.getTableWhiteList());
+        
binlogJob.setDatabaseWhiteList(binlogJobTaskConfig.getDatabaseWhiteList());
+        binlogJob.setSchema(binlogJobTaskConfig.getIncludeSchema());
+        binlogJob.setPort(binlogJobTaskConfig.getPort());
+        binlogJob.setOffsets(dataConfigs.getSnapshot());
+        binlogJob.setDdl(binlogJobTaskConfig.getMonitoredDdl());
+        binlogJob.setServerTimezone(binlogJobTaskConfig.getServerTimezone());
+
+        BinlogJob.Offset offset = new BinlogJob.Offset();
+        offset.setIntervalMs(binlogJobTaskConfig.getIntervalMs());
+        offset.setFilename(binlogJobTaskConfig.getOffsetFilename());
+        
offset.setSpecificOffsetFile(binlogJobTaskConfig.getSpecificOffsetFile());
+        
offset.setSpecificOffsetPos(binlogJobTaskConfig.getSpecificOffsetPos());
+
+        binlogJob.setOffset(offset);
+
+        BinlogJob.Snapshot snapshot = new BinlogJob.Snapshot();
+        snapshot.setMode(binlogJobTaskConfig.getSnapshotMode());
+
+        binlogJob.setSnapshot(snapshot);
+
+        BinlogJob.History history = new BinlogJob.History();
+        history.setFilename(binlogJobTaskConfig.getHistoryFilename());
+
+        binlogJob.setHistory(history);
+
+        return binlogJob;
+    }
+
+    private static FileTask getFileJob(DataConfig dataConfig) {
+        FileTask fileTask = new FileTask();
+        fileTask.setId(dataConfig.getTaskId());
+
+        FileTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(),
+                FileTaskConfig.class);
+
+        FileTask.Dir dir = new FileTask.Dir();
+        dir.setPatterns(taskConfig.getPattern());
+        dir.setBlackList(taskConfig.getBlackList());
+        fileTask.setDir(dir);
+        fileTask.setCollectType(taskConfig.getCollectType());
+        fileTask.setContentCollectType(taskConfig.getContentCollectType());
+        fileTask.setDataSeparator(taskConfig.getDataSeparator());
+        fileTask.setMaxFileCount(taskConfig.getMaxFileCount());
+        fileTask.setRetry(taskConfig.getRetry());
+        fileTask.setCycleUnit(taskConfig.getCycleUnit());
+        fileTask.setStartTime(taskConfig.getStartTime());
+        fileTask.setEndTime(taskConfig.getEndTime());
+        fileTask.setProperties(GSON.toJson(taskConfig.getProperties()));
+        if (taskConfig.getTimeOffset() != null) {
+            fileTask.setTimeOffset(taskConfig.getTimeOffset());
+        }
+
+        if (taskConfig.getAdditionalAttr() != null) {
+            fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
+        }
+
+        if (null != taskConfig.getLineEndPattern()) {
+            FileTask.Line line = new Line();
+            line.setEndPattern(taskConfig.getLineEndPattern());
+            fileTask.setLine(line);
+        }
+
+        if (null != taskConfig.getEnvList()) {
+            fileTask.setEnvList(taskConfig.getEnvList());
+        }
+
+        if (null != taskConfig.getMetaFields()) {
+            fileTask.setMetaFields(GSON.toJson(taskConfig.getMetaFields()));
+        }
+
+        if (null != taskConfig.getFilterMetaByLabels()) {
+            
fileTask.setFilterMetaByLabels(GSON.toJson(taskConfig.getFilterMetaByLabels()));
+        }
+
+        if (null != taskConfig.getMonitorInterval()) {
+            fileTask.setMonitorInterval(taskConfig.getMonitorInterval());
+        }
+
+        if (null != taskConfig.getMonitorStatus()) {
+            fileTask.setMonitorStatus(taskConfig.getMonitorStatus());
+        }
+        return fileTask;
+    }
+
+    private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
+
+        KafkaJob.KafkaJobTaskConfig kafkaJobTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
+                KafkaJob.KafkaJobTaskConfig.class);
+        KafkaJob kafkaJob = new KafkaJob();
+
+        KafkaJob.Bootstrap bootstrap = new KafkaJob.Bootstrap();
+        bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers());
+        kafkaJob.setBootstrap(bootstrap);
+        KafkaJob.Partition partition = new KafkaJob.Partition();
+        partition.setOffset(dataConfigs.getSnapshot());
+        kafkaJob.setPartition(partition);
+        KafkaJob.Group group = new KafkaJob.Group();
+        group.setId(kafkaJobTaskConfig.getGroupId());
+        kafkaJob.setGroup(group);
+        KafkaJob.RecordSpeed recordSpeed = new KafkaJob.RecordSpeed();
+        recordSpeed.setLimit(kafkaJobTaskConfig.getRecordSpeedLimit());
+        kafkaJob.setRecordSpeed(recordSpeed);
+        KafkaJob.ByteSpeed byteSpeed = new KafkaJob.ByteSpeed();
+        byteSpeed.setLimit(kafkaJobTaskConfig.getByteSpeedLimit());
+        kafkaJob.setByteSpeed(byteSpeed);
+        kafkaJob.setAutoOffsetReset(kafkaJobTaskConfig.getAutoOffsetReset());
+
+        kafkaJob.setTopic(kafkaJobTaskConfig.getTopic());
+
+        return kafkaJob;
+    }
+
+    private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) {
+        PostgreSQLJob.PostgreSQLJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                PostgreSQLJob.PostgreSQLJobConfig.class);
+        PostgreSQLJob postgreSQLJob = new PostgreSQLJob();
+
+        postgreSQLJob.setUser(config.getUsername());
+        postgreSQLJob.setPassword(config.getPassword());
+        postgreSQLJob.setHostname(config.getHostname());
+        postgreSQLJob.setPort(config.getPort());
+        postgreSQLJob.setDbname(config.getDatabase());
+        postgreSQLJob.setServername(config.getSchema());
+        postgreSQLJob.setPluginname(config.getDecodingPluginName());
+        postgreSQLJob.setTableNameList(config.getTableNameList());
+        postgreSQLJob.setServerTimeZone(config.getServerTimeZone());
+        postgreSQLJob.setScanStartupMode(config.getScanStartupMode());
+        postgreSQLJob.setPrimaryKey(config.getPrimaryKey());
+
+        return postgreSQLJob;
+    }
+
+    private static RedisJob getRedisJob(DataConfig dataConfig) {
+        RedisJob.RedisJobConfig config = 
GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class);
+        RedisJob redisJob = new RedisJob();
+
+        redisJob.setAuthUser(config.getUsername());
+        redisJob.setAuthPassword(config.getPassword());
+        redisJob.setHostname(config.getHostname());
+        redisJob.setPort(config.getPort());
+        redisJob.setSsl(config.getSsl());
+        redisJob.setReadTimeout(config.getTimeout());
+        redisJob.setQueueSize(config.getQueueSize());
+        redisJob.setReplId(config.getReplId());
+
+        return redisJob;
+    }
+
+    private static MongoJob getMongoJob(DataConfig dataConfigs) {
+
+        MongoJob.MongoJobTaskConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                MongoJob.MongoJobTaskConfig.class);
+        MongoJob mongoJob = new MongoJob();
+
+        mongoJob.setHosts(config.getHosts());
+        mongoJob.setUser(config.getUsername());
+        mongoJob.setPassword(config.getPassword());
+        mongoJob.setDatabaseIncludeList(config.getDatabaseIncludeList());
+        mongoJob.setDatabaseExcludeList(config.getDatabaseExcludeList());
+        mongoJob.setCollectionIncludeList(config.getCollectionIncludeList());
+        mongoJob.setCollectionExcludeList(config.getCollectionExcludeList());
+        mongoJob.setFieldExcludeList(config.getFieldExcludeList());
+        mongoJob.setConnectTimeoutInMs(config.getConnectTimeoutInMs());
+        mongoJob.setQueueSize(config.getQueueSize());
+        mongoJob.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs());
+        mongoJob.setSocketTimeoutInMs(config.getSocketTimeoutInMs());
+        mongoJob.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs());
+        mongoJob.setFieldRenames(config.getFieldRenames());
+        mongoJob.setMembersAutoDiscover(config.getMembersAutoDiscover());
+        mongoJob.setConnectMaxAttempts(config.getConnectMaxAttempts());
+        
mongoJob.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs());
+        
mongoJob.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs());
+        mongoJob.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads());
+        
mongoJob.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed());
+        mongoJob.setSslEnabled(config.getSslEnabled());
+        mongoJob.setPollIntervalInMs(config.getPollIntervalInMs());
+
+        MongoJob.Offset offset = new MongoJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        mongoJob.setOffset(offset);
+
+        MongoJob.Snapshot snapshot = new MongoJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        mongoJob.setSnapshot(snapshot);
+
+        MongoJob.History history = new MongoJob.History();
+        history.setFilename(config.getHistoryFilename());
+        mongoJob.setHistory(history);
+
+        return mongoJob;
+    }
+
+    private static OracleJob getOracleJob(DataConfig dataConfigs) {
+        OracleJob.OracleJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                OracleJob.OracleJobConfig.class);
+        OracleJob oracleJob = new OracleJob();
+        oracleJob.setUser(config.getUser());
+        oracleJob.setHostname(config.getHostname());
+        oracleJob.setPassword(config.getPassword());
+        oracleJob.setPort(config.getPort());
+        oracleJob.setServerName(config.getServerName());
+        oracleJob.setDbname(config.getDbname());
+
+        OracleJob.Offset offset = new OracleJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        oracleJob.setOffset(offset);
+
+        OracleJob.Snapshot snapshot = new OracleJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        oracleJob.setSnapshot(snapshot);
+
+        OracleJob.History history = new OracleJob.History();
+        history.setFilename(config.getHistoryFilename());
+        oracleJob.setHistory(history);
+
+        return oracleJob;
+    }
+
+    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
+        SqlServerJob.SqlserverJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                SqlServerJob.SqlserverJobConfig.class);
+        SqlServerJob sqlServerJob = new SqlServerJob();
+        sqlServerJob.setUser(config.getUsername());
+        sqlServerJob.setHostname(config.getHostname());
+        sqlServerJob.setPassword(config.getPassword());
+        sqlServerJob.setPort(config.getPort());
+        sqlServerJob.setServerName(config.getSchemaName());
+        sqlServerJob.setDbname(config.getDatabase());
+
+        SqlServerJob.Offset offset = new SqlServerJob.Offset();
+        offset.setFilename(config.getOffsetFilename());
+        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
+        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
+        sqlServerJob.setOffset(offset);
+
+        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
+        snapshot.setMode(config.getSnapshotMode());
+        sqlServerJob.setSnapshot(snapshot);
+
+        SqlServerJob.History history = new SqlServerJob.History();
+        history.setFilename(config.getHistoryFilename());
+        sqlServerJob.setHistory(history);
+
+        return sqlServerJob;
+    }
+
+    public static MqttJob getMqttJob(DataConfig dataConfigs) {
+        MqttJob.MqttJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
+                MqttJob.MqttJobConfig.class);
+        MqttJob mqttJob = new MqttJob();
+
+        mqttJob.setServerURI(config.getServerURI());
+        mqttJob.setUserName(config.getUsername());
+        mqttJob.setPassword(config.getPassword());
+        mqttJob.setTopic(config.getTopic());
+        mqttJob.setConnectionTimeOut(config.getConnectionTimeOut());
+        mqttJob.setKeepAliveInterval(config.getKeepAliveInterval());
+        mqttJob.setQos(config.getQos());
+        mqttJob.setCleanSession(config.getCleanSession());
+        mqttJob.setClientIdPrefix(config.getClientId());
+        mqttJob.setQueueSize(config.getQueueSize());
+        mqttJob.setAutomaticReconnect(config.getAutomaticReconnect());
+        mqttJob.setMqttVersion(config.getMqttVersion());
+
+        return mqttJob;
+    }
+
+    private static Proxy getProxy(DataConfig dataConfigs) {
+        Proxy proxy = new Proxy();
+        Manager manager = new Manager();
+        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+        manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST));
+        manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT));
+        proxy.setInlongGroupId(dataConfigs.getInlongGroupId());
+        proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
+        proxy.setManager(manager);
+        if (null != dataConfigs.getSyncSend()) {
+            proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN);
+        }
+        if (null != dataConfigs.getSyncPartitionKey()) {
+            proxy.setPartitionKey(dataConfigs.getSyncPartitionKey());
+        }
+        return proxy;
+    }
+
+    /**
+     * convert DataConfig to TaskProfile
+     */
+    public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
+        if (!dataConfig.isValid()) {
+            throw new IllegalArgumentException("input dataConfig" + dataConfig 
+ "is invalid please check");
+        }
+
+        TaskProfileDto profileDto = new TaskProfileDto();
+        Proxy proxy = getProxy(dataConfig);
+        profileDto.setProxy(proxy);
+        Task task = new Task();
+
+        // common attribute
+        task.setId(String.valueOf(dataConfig.getTaskId()));
+        task.setGroupId(dataConfig.getInlongGroupId());
+        task.setStreamId(dataConfig.getInlongStreamId());
+        task.setChannel(DEFAULT_CHANNEL);
+        task.setIp(dataConfig.getIp());
+        task.setOp(dataConfig.getOp());
+        task.setDeliveryTime(dataConfig.getDeliveryTime());
+        task.setUuid(dataConfig.getUuid());
+        task.setVersion(dataConfig.getVersion());
+        task.setState(dataConfig.getState());
+
+        // set sink type
+        if (dataConfig.getDataReportType() == 
NORMAL_SEND_TO_DATAPROXY.ordinal()) {
+            task.setSink(FILE_DATAPROXY_SINK);
+            task.setProxySend(false);
+        } else if (dataConfig.getDataReportType() == 1) {
+            task.setSink(FILE_DATAPROXY_SINK);
+            task.setProxySend(true);
+        } else {
+            String mqType = dataConfig.getMqClusters().get(0).getMqType();
+            task.setMqClusters(GSON.toJson(dataConfig.getMqClusters()));
+            task.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
+            if (mqType.equals(MQType.PULSAR)) {
+                task.setSink(PULSAR_SINK);
+            } else if (mqType.equals(MQType.KAFKA)) {
+                task.setSink(KAFKA_SINK);
+            } else {
+                throw new IllegalArgumentException("input dataConfig" + 
dataConfig + "is invalid please check");
+            }
+        }
+        TaskTypeEnum taskType = 
TaskTypeEnum.getTaskType(dataConfig.getTaskType());
+        switch (requireNonNull(taskType)) {
+            case SQL:
+            case BINLOG:
+                BinlogJob binlogJob = getBinlogJob(dataConfig);
+                task.setBinlogJob(binlogJob);
+                task.setSource(BINLOG_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case FILE:
+                task.setTaskClass(DEFAULT_FILE_TASK);
+                FileTask fileTask = getFileJob(dataConfig);
+                task.setFileTask(fileTask);
+                task.setSource(DEFAULT_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case KAFKA:
+                KafkaJob kafkaJob = getKafkaJob(dataConfig);
+                task.setKafkaJob(kafkaJob);
+                task.setSource(KAFKA_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case POSTGRES:
+                PostgreSQLJob postgreSQLJob = getPostgresJob(dataConfig);
+                task.setPostgreSQLJob(postgreSQLJob);
+                task.setSource(POSTGRESQL_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case ORACLE:
+                OracleJob oracleJob = getOracleJob(dataConfig);
+                task.setOracleJob(oracleJob);
+                task.setSource(ORACLE_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case SQLSERVER:
+                SqlServerJob sqlserverJob = getSqlServerJob(dataConfig);
+                task.setSqlserverJob(sqlserverJob);
+                task.setSource(SQLSERVER_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case MONGODB:
+                MongoJob mongoJob = getMongoJob(dataConfig);
+                task.setMongoJob(mongoJob);
+                task.setSource(MONGO_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case REDIS:
+                RedisJob redisJob = getRedisJob(dataConfig);
+                task.setRedisJob(redisJob);
+                task.setSource(REDIS_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case MQTT:
+                MqttJob mqttJob = getMqttJob(dataConfig);
+                task.setMqttJob(mqttJob);
+                task.setSource(MQTT_SOURCE);
+                profileDto.setTask(task);
+                break;
+            case MOCK:
+                profileDto.setTask(task);
+                break;
+            default:
+        }
+        return TaskProfile.parseJsonStr(GSON.toJson(profileDto));
+    }
+
    /**
     * Task section of the serialized profile. Carries the common attributes copied from
     * DataConfig plus exactly one populated source-specific job depending on the task type.
     * Getters/setters are generated by Lombok.
     */
    @Data
    public static class Task {

        // common attributes (see convertToTaskProfile)
        private String id;
        private String groupId;
        private String streamId;
        private String ip;
        private String source;
        private String sink;
        private String channel;
        private String name;
        private String op;
        private String retryTime;
        private String deliveryTime;
        private String uuid;
        private Integer version;
        // true when data is reported via proxy send (dataReportType == 1)
        private boolean proxySend;
        // JSON-serialized MQ cluster list / topic info, set only for direct-to-MQ sinks
        private String mqClusters;
        private String topicInfo;
        private String taskClass;
        private Integer state;

        // exactly one of these is populated, chosen by the task type
        private FileTask fileTask;
        private BinlogJob binlogJob;
        private KafkaJob kafkaJob;
        private PostgreSQLJob postgreSQLJob;
        private OracleJob oracleJob;
        private MongoJob mongoJob;
        private RedisJob redisJob;
        private MqttJob mqttJob;
        private SqlServerJob sqlserverJob;
    }
+
    /**
     * Manager address (host/port) read from the agent configuration; nested under {@link Proxy}.
     */
    @Data
    public static class Manager {

        private String port;
        private String host;
    }
+
    /**
     * Proxy section of the profile: group/stream routing, manager address, and the
     * optional sync-send flag and partition key (left null when the manager omits them).
     */
    @Data
    public static class Proxy {

        private String inlongGroupId;
        private String inlongStreamId;
        private Manager manager;
        private Boolean sync;
        private String partitionKey;
    }
+
+}
\ No newline at end of file


Reply via email to