This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository

The following commit(s) were added to refs/heads/master by this push:
     new b76ef7482f [INLONG-9136][Agent] Add instance manager (#9142)
b76ef7482f is described below

commit b76ef7482ff3a59cb73cb686b971f99a9b0d6f27
Author: justinwwhuang <>
AuthorDate: Fri Oct 27 21:32:11 2023 +0800

    [INLONG-9136][Agent] Add instance manager (#9142)
 .../org/apache/inlong/agent/plugin/   |  56 ++++
 .../agent/core/instance/       | 350 +++++++++++++++++++++
 .../inlong/agent/core/instance/   |  78 +++++
 .../agent/core/instance/   | 102 ++++++
 4 files changed, 586 insertions(+)

diff --git 
new file mode 100755
index 0000000000..90bac4c94f
--- /dev/null
@@ -0,0 +1,56 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.agent.plugin;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.state.AbstractStateWrapper;
+ * Instance abstraction; an instance is generated by a task under certain conditions.
+ */
+public abstract class Instance extends AbstractStateWrapper {
+    /**
+     * Initializes the instance from its profile.
+     *
+     * @param instanceManager the manager creating this instance; declared as {@code Object},
+     *        so implementations cast it to the concrete manager type they expect
+     * @param profile the instance profile to initialize from
+     */
+    public abstract void init(Object instanceManager, InstanceProfile profile);
+    /**
+     * destroy instance.
+     */
+    public abstract void destroy();
+    /**
+     * get instance profile
+     */
+    public abstract InstanceProfile getProfile();
+    /**
+     * get task id
+     */
+    public abstract String getTaskId();
+    /**
+     * get instance id
+     */
+    public abstract String getInstanceId();
diff --git 
new file mode 100644
index 0000000000..a6c8381de3
--- /dev/null
@@ -0,0 +1,350 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.agent.core.instance;
+import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.InstanceDb;
+import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+ * Handles the instances created by a task, including add, delete, update, etc.
+ * The instance info is stored in both db and memory.
+ */
+public class InstanceManager extends AbstractDaemon {
+    private static final Logger LOGGER = 
+    private static final int ACTION_QUEUE_CAPACITY = 100000;
+    public static final int CORE_THREAD_SLEEP_TIME = 100;
+    // instances in db
+    private final InstanceDb instanceDb;
+    // instances in memory
+    private final ConcurrentHashMap<String, Instance> instanceMap;
+    // instance profile queue.
+    private final BlockingQueue<InstanceAction> actionQueue;
+    // thread pool running the instances
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("instance-manager"));
+    private final int taskMaxLimit;
+    private final AgentConfiguration agentConf;
+    private final String taskId;
+    private volatile boolean runAtLeastOneTime = false;
+    private volatile boolean running = false;
+    /**
+     * Creates the instance manager of one task.
+     *
+     * @param taskId id of the task whose instances are managed
+     * @param basicDb db used to build the instance store
+     */
+    public InstanceManager(String taskId, Db basicDb) {
+        this.taskId = taskId;
+        instanceDb = new InstanceDb(basicDb);
+        this.agentConf = AgentConfiguration.getAgentConf();
+        instanceMap = new ConcurrentHashMap<>();
+        taskMaxLimit = agentConf.getInt(AgentConstants.JOB_NUMBER_LIMIT, 
+        actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+    }
+    /**
+     * @return id of the task this manager was created for
+     */
+    public String getTaskId() {
+        return taskId;
+    }
+    /**
+     * Looks up an instance in memory only; the db is not consulted.
+     *
+     * @param instanceId id of the instance
+     * @return the in-memory instance, or {@code null} if none is registered under that id
+     */
+    public Instance getInstance(String instanceId) {
+        return instanceMap.get(instanceId);
+    }
+    /**
+     * Reads the profile of an instance of this task from the db (not from memory).
+     *
+     * @param instanceId id of the instance
+     * @return whatever the instance db returns for this task id and instance id
+     */
+    public InstanceProfile getInstanceProfile(String instanceId) {
+        return instanceDb.getInstance(taskId, instanceId);
+    }
+    /**
+     * Offers an action to the action queue without blocking.
+     *
+     * @param action the action to enqueue, may be {@code null}
+     * @return {@code true} if the action was queued; {@code false} if it is {@code null}
+     *         or the queue did not accept it
+     */
+    public boolean submitAction(InstanceAction action) {
+        return action != null && actionQueue.offer(action);
+    }
+    /**
+     * Builds the core loop of the manager.
+     * <p>
+     * While the daemon is runnable, each pass sleeps for {@link #CORE_THREAD_SLEEP_TIME} ms,
+     * processes the action queue and then reconciles memory with the db. The {@code running}
+     * flag is set for the lifetime of the loop, and {@code runAtLeastOneTime} is set after
+     * every pass, including passes that ended with an error.
+     *
+     * @return the runnable executing the core loop
+     */
+    private Runnable coreThread() {
+        return () -> {
+            Thread.currentThread().setName("instance-manager-core-" + taskId);
+            running = true;
+            while (isRunnable()) {
+                try {
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                    dealWithActionQueue(actionQueue);
+                    keepPaceWithDb();
+                } catch (Throwable ex) {
+                    LOGGER.error("coreThread {}", ex.getMessage());
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
+                }
+                runAtLeastOneTime = true;
+            }
+            running = false;
+        };
+    }
+    /**
+     * Reconciles the in-memory instances with the db: first applies the db records to memory,
+     * then checks every in-memory instance against the db.
+     */
+    private void keepPaceWithDb() {
+        traverseDbTasksToMemory();
+        traverseMemoryTasksToDb();
+    }
+    private void traverseDbTasksToMemory() {
+        instanceDb.getInstances(taskId).forEach((profileFromDb) -> {
+            InstanceStateEnum dbState = profileFromDb.getState();
+            Instance task = instanceMap.get(profileFromDb.getInstanceId());
+            switch (dbState) {
+                case DEFAULT: {
+                    if (task == null) {
+              "traverseDbTasksToMemory add instance to 
mem taskId {} instanceId {}",
+                                profileFromDb.getTaskId(), 
+                        addToMemory(profileFromDb);
+                    }
+                    break;
+                }
+                case FINISHED:
+                    DELETE: {
+                        if (task != null) {
+                  "traverseDbTasksToMemory delete 
instance from mem taskId {} instanceId {}",
+                                    profileFromDb.getTaskId(), 
+                            deleteFromMemory(profileFromDb.getInstanceId());
+                        }
+                        break;
+                    }
+                default: {
+                    LOGGER.error("instance invalid state {} taskId {} 
instanceId {}", dbState,
+                            profileFromDb.getTaskId(),
+                            profileFromDb.getInstanceId());
+                }
+            }
+        });
+    }
+    /**
+     * Drops every in-memory instance whose db record is missing or is not in DEFAULT state.
+     */
+    private void traverseMemoryTasksToDb() {
+        for (Instance current : instanceMap.values()) {
+            InstanceProfile stored = instanceDb.getInstance(current.getTaskId(), current.getInstanceId());
+            boolean activeInDb = stored != null && stored.getState() == InstanceStateEnum.DEFAULT;
+            if (!activeInDb) {
+                deleteFromMemory(current.getInstanceId());
+            }
+        }
+    }
+    /**
+     * Processes queued actions until the queue is empty or the daemon is no longer runnable.
+     * <p>
+     * ADD, FINISH and DELETE are dispatched to the matching handler; any other type is logged
+     * as an error. A failure while handling one action is logged and passed to
+     * {@code ThreadUtils.threadThrowableHandler}, after which the loop moves on.
+     *
+     * @param queue the queue to poll actions from
+     */
+    private void dealWithActionQueue(BlockingQueue<InstanceAction> queue) {
+        while (isRunnable()) {
+            try {
+                InstanceAction action = queue.poll();
+                if (action == null) {
+                    break;
+                }
+                switch (action.getActionType()) {
+                    case ADD:
+                        addInstance(action.getProfile());
+                        break;
+                    case FINISH:
+                        finishInstance(action.getProfile());
+                        break;
+                    case DELETE:
+                        deleteInstance(action.getProfile().getInstanceId());
+                        break;
+                    default:
+                        LOGGER.error("invalid action type for instance 
manager: taskId {} type {}", taskId,
+                                action.getActionType());
+                }
+            } catch (Throwable ex) {
+                LOGGER.error("dealWithActionQueue", ex);
+                ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+            }
+        }
+    }
+    /**
+     * Restores the instances recorded in the db, then submits the core loop as a worker.
+     */
+    @Override
+    public void start() {
+        restoreFromDb();
+        submitWorker(coreThread());
+    }
+    /**
+     * Waits for the core loop to end, then stops all instances held in memory.
+     */
+    @Override
+    public void stop() {
+        waitForTerminate();
+        stopAllInstances();
+    }
+    /**
+     * Delegates to the parent's termination wait, then blocks until the core thread has
+     * left its loop.
+     */
+    @Override
+    public void waitForTerminate() {
+        super.waitForTerminate();
+        // the core thread clears the flag as its last step; poll until it has done so
+        while (running) {
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+        }
+    }
+    private void restoreFromDb() {
+        List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
+        profileList.forEach((profile) -> {
+            InstanceStateEnum state = profile.getState();
+            if (state == InstanceStateEnum.DEFAULT) {
+      "instance restoreFromDb addToMem state {} taskId 
{} instanceId {}", state, taskId,
+                        profile.getInstanceId());
+                addToMemory(profile);
+            } else {
+      "instance restoreFromDb ignore state {} taskId {} 
instanceId {}", state, taskId,
+                        profile.getInstanceId());
+            }
+        });
+    }
+    private void addInstance(InstanceProfile profile) {
+"addInstance taskId {} instanceId {}", taskId, 
+        addToDb(profile);
+        addToMemory(profile);
+    }
+    private void finishInstance(InstanceProfile profile) {
+        profile.setState(InstanceStateEnum.FINISHED);
+        profile.setModifyTime(AgentUtils.getCurrentTime());
+        addToDb(profile);
+        deleteFromMemory(profile.getInstanceId());
+"finished instance state {} taskId {} instanceId {}", 
+                profile.getTaskId(), profile.getInstanceId());
+    }
+    /**
+     * Removes an instance from both the db and memory.
+     *
+     * @param instanceId id of the instance to remove
+     */
+    private void deleteInstance(String instanceId) {
+        deleteFromDb(instanceId);
+        deleteFromMemory(instanceId);
+    }
+    private void deleteFromDb(String instanceId) {
+        instanceDb.deleteInstance(taskId, instanceId);
+"delete instance from db: taskId {} instanceId {} result 
{}", taskId,
+                instanceId, instanceDb.getInstance(taskId, instanceId));
+    }
+    private void deleteFromMemory(String instanceId) {
+        Instance instance = instanceMap.get(instanceId);
+        if (instance == null) {
+            LOGGER.error("try to delete instance from memory but not found: 
taskId {} instanceId {}", taskId,
+                    instanceId);
+            return;
+        }
+        instance.destroy();
+        instanceMap.remove(instanceId);
+"delete instance from memory: taskId {} instanceId {}", 
taskId, instance.getInstanceId());
+    }
+    private void addToDb(InstanceProfile profile) {
+"add instance to db instanceId {} ", 
+        instanceDb.storeInstance(profile);
+    }
+    /**
+     * Adds an instance to memory. If an instance is already registered under the same
+     * instance id, it is destroyed and removed first. Any failure while creating or
+     * starting the new instance is logged and not propagated.
+     */
+    private void addToMemory(InstanceProfile instanceProfile) {
+        Instance oldInstance = 
+        if (oldInstance != null) {
+            oldInstance.destroy();
+            instanceMap.remove(instanceProfile.getInstanceId());
+            LOGGER.error("old instance {} should not exist, try stop it first",
+                    instanceProfile);
+        }
+"instanceProfile {}", instanceProfile.toJsonStr());
+        try {
+            Class<?> taskClass = 
+            Instance instance = (Instance) taskClass.newInstance();
+            instance.init(this, instanceProfile);
+            instanceMap.put(instanceProfile.getInstanceId(), instance);
+            EXECUTOR_SERVICE.submit(instance);
+                    "add instance to memory instanceId {} instanceMap size {}, 
runningPool instance total {}, runningPool instance active {}",
+                    instance.getInstanceId(), instanceMap.size(), 
+                    EXECUTOR_SERVICE.getActiveCount());
+        } catch (Throwable t) {
+            LOGGER.error("add instance error {}", t.getMessage());
+        }
+    }
+    /**
+     * Stops every running instance and clears the in-memory map.
+     * <p>
+     * Only the in-memory side is torn down. The db records are left untouched so that
+     * {@link #restoreFromDb()} can bring DEFAULT instances back on the next start;
+     * deleting them here would make a graceful stop lose all instance state.
+     */
+    private void stopAllInstances() {
+        instanceMap.values().forEach((instance) -> deleteFromMemory(instance.getInstanceId()));
+        instanceMap.clear();
+    }
+    public boolean shouldAddAgain(String fileName, long lastModifyTime) {
+        InstanceProfile profileFromDb = instanceDb.getInstance(taskId, 
+        if (profileFromDb == null) {
+            return true;
+        } else {
+            InstanceStateEnum state = profileFromDb.getState();
+            if (state == InstanceStateEnum.FINISHED && lastModifyTime > 
profileFromDb.getModifyTime()) {
+                return true;
+            }
+            if (state == InstanceStateEnum.DELETE) {
+                return true;
+            }
+            return false;
+        }
+    }
+    public boolean allInstanceFinished() {
+        if (!runAtLeastOneTime) {
+            return false;
+        }
+        if (!instanceMap.isEmpty()) {
+            return false;
+        }
+        if (!actionQueue.isEmpty()) {
+            return false;
+        }
+        List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+        for (int i = 0; i < instances.size(); i++) {
+            InstanceProfile profile = instances.get(i);
+            if (profile.getState() != InstanceStateEnum.FINISHED) {
+                return false;
+            }
+        }
+        return true;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000000..ada8cefcdd
--- /dev/null
@@ -0,0 +1,78 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.agent.core.instance;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.plugin.Instance;
+public class MockInstance extends Instance {
+    public static final int INIT_TIME = 100;
+    public static final int RUN_TIME = 101;
+    public static final int DESTROY_TIME = 102;
+    private InstanceProfile profile;
+    private long index = INIT_TIME;
+    public long initTime = 0;
+    public long destroyTime = 0;
+    public long runtime = 0;
+    private InstanceManager instanceManager;
+    /**
+     * Stores the manager (cast to {@link InstanceManager}) and the profile, and records the
+     * current value of the call counter as the init time.
+     */
+    @Override
+    public void init(Object instanceManager, InstanceProfile profile) {
+        this.instanceManager = (InstanceManager) instanceManager;
+        this.profile = profile;
+        initTime = index++;
+    }
+    @Override
+    public void destroy() {
+        destroyTime = index++;
+    }
+    @Override
+    public InstanceProfile getProfile() {
+        return profile;
+    }
+    @Override
+    public String getTaskId() {
+        return profile.getTaskId();
+    }
+    @Override
+    public String getInstanceId() {
+        return profile.getInstanceId();
+    }
+    @Override
+    public void addCallbacks() {
+    }
+    @Override
+    public void run() {
+        runtime = index++;
+    }
+    /**
+     * Submits a FINISH action for this instance's profile to the owning manager.
+     */
+    public void sendFinishAction() {
+        InstanceAction action = new InstanceAction();
+        action.setActionType(ActionType.FINISH);
+        action.setProfile(profile);
+        instanceManager.submitAction(action);
+    }
\ No newline at end of file
diff --git 
new file mode 100755
index 0000000000..62bfa85d8a
--- /dev/null
@@ -0,0 +1,102 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.inlong.agent.core.instance;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.core.AgentBaseTestsHelper;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+import static org.awaitility.Awaitility.await;
+public class TestInstanceManager {
+    private static final Logger LOGGER = 
+    private static InstanceManager manager;
+    private static AgentBaseTestsHelper helper;
+    private static TaskProfile taskProfile;
+    @BeforeClass
+    public static void setup() {
+        helper = new 
+        String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+        Db basicDb = TaskManager.initDb("/localdb");
+        taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
+        manager = new InstanceManager("1", basicDb);
+        manager.start();
+    }
+    @AfterClass
+    public static void teardown() {
+        manager.stop();
+        helper.teardownAgentHome();
+    }
+    @Test
+    public void testInstanceManager() {
+        long timeBefore = AgentUtils.getCurrentTime();
+        InstanceProfile profile = 
+                helper.getTestRootDir() + "/20230927.log_1", "20230927");
+        String instanceId = profile.getInstanceId();
+        InstanceAction action = new InstanceAction();
+        action.setActionType(ActionType.ADD);
+        action.setProfile(profile);
+        // test add action
+        manager.submitAction(action);
+        await().atMost(1, TimeUnit.SECONDS).until(() -> 
manager.getInstance(instanceId) != null);
+        Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == 
+        // test finish action
+        MockInstance instance = (MockInstance) 
+        instance.sendFinishAction();
+        await().atMost(1, TimeUnit.SECONDS).until(() -> 
manager.getInstance(instanceId) == null);
+        Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == 
+        // test modify before finish
+        Assert.assertFalse(manager.shouldAddAgain(profile.getInstanceId(), 
+        // test modify after finish
+        Assert.assertTrue(manager.shouldAddAgain(profile.getInstanceId(), 
+        // test continue
+        action.setActionType(ActionType.ADD);
+        profile.setState(InstanceStateEnum.DEFAULT);
+        manager.submitAction(action);
+        await().atMost(1, TimeUnit.SECONDS).until(() -> 
manager.getInstance(instanceId) != null);
+        Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == 
+        // test delete action
+        action.setActionType(ActionType.DELETE);
+        manager.submitAction(action);
+        await().atMost(1, TimeUnit.SECONDS).until(() -> 
manager.getInstanceProfile(instanceId) == null);
+        Assert.assertTrue(instance.initTime == MockInstance.INIT_TIME);
+        Assert.assertTrue(instance.runtime == MockInstance.RUN_TIME);
+        Assert.assertTrue(instance.destroyTime == MockInstance.DESTROY_TIME);
+    }

Reply via email to