This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 08fca3a456 [INLONG-9125][Agent] Add offset manager (#9131) 08fca3a456 is described below commit 08fca3a4568f9a09a0d0cb8462be2876ce7f2770 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Oct 26 23:26:40 2023 +0800 [INLONG-9125][Agent] Add offset manager (#9131) --- .../inlong/agent/core/task/OffsetManager.java | 75 ++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java new file mode 100644 index 0000000000..0c4ca513ce --- /dev/null +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.task; + +import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.db.OffsetDb; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * used to store instance offset to db + * where key is task id + read file name and value is instance offset + */ +public class OffsetManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class); + private static volatile OffsetManager offsetManager = null; + private final OffsetDb offsetDb; + + private OffsetManager() { + this.offsetDb = new OffsetDb(); + } + + /** + * task position manager singleton, can only generated by agent manager + */ + public static OffsetManager init() { + if (offsetManager == null) { + synchronized (OffsetManager.class) { + if (offsetManager == null) { + offsetManager = new OffsetManager(); + } + } + } + return offsetManager; + } + + /** + * get taskPositionManager singleton + */ + public static OffsetManager getInstance() { + if (offsetManager == null) { + throw new RuntimeException("task position manager has not been initialized by agentManager"); + } + return offsetManager; + } + + public void setOffset(OffsetProfile profile) { + offsetDb.setOffset(profile); + } + + public void deleteOffset(String taskId, String instanceId) { + offsetDb.deleteOffset(taskId, instanceId); + } + + public OffsetProfile getOffset(String taskId, String instanceId) { + return offsetDb.getOffset(taskId, instanceId); + } +}