aloyszhang commented on code in PR #10837:
URL: https://github.com/apache/inlong/pull/10837#discussion_r1724814355


##########
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java:
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.http;
+
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class HttpSinkContext extends SinkContext {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(HttpSinkContext.class);
+    public static final String KEY_NODE_ID = "nodeId";
+    public static final String KEY_BASE_URL = "baseUrl";
+    public static final String KEY_ENABLE_CREDENTIAL = "enableCredential";
+    public static final String KEY_USERNAME = "username";
+    public static final String KEY_PASSWORD = "password";
+    public static final String KEY_MAX_CONNECT_TOTAL = "maxConnect";
+    public static final String KEY_MAX_CONNECT_PER_ROUTE = 
"maxConnectPerRoute";
+    public static final String KEY_CONNECTION_REQUEST_TIMEOUT = 
"connectionRequestTimeout";
+    public static final String KEY_SOCKET_TIMEOUT = "socketTimeout";
+    public static final String KEY_MAX_REDIRECTS = "maxRedirects";
+    public static final String KEY_LOG_MAX_LENGTH = "logMaxLength";
+    public static final String KEY_EVENT_HTTPREQUEST_HANDLER = 
"httpRequestHandler";
+
+    public static final boolean DEFAULT_ENABLE_CREDENTIAL = false;
+    public static final int DEFAULT_MAX_CONNECT_TOTAL = 1000;
+    public static final int DEFAULT_MAX_CONNECT_PER_ROUTE = 1000;
+    public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 0;
+    public static final int DEFAULT_SOCKET_TIMEOUT = 0;
+    public static final int DEFAULT_MAX_REDIRECTS = 0;
+    public static final int DEFAULT_LOG_MAX_LENGTH = 32 * 1024;
+
+    private Context sinkContext;
+    private HttpNodeConfig httpNodeConfig;
+    private String nodeId;
+    private Map<String, HttpIdConfig> idConfigMap = new ConcurrentHashMap<>();
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private final BufferQueue<HttpRequest> dispatchQueue;
+    private AtomicLong offerCounter = new AtomicLong(0);
+    private AtomicLong takeCounter = new AtomicLong(0);
+    private AtomicLong backCounter = new AtomicLong(0);
+    // rest client
+    private String baseUrl;
+    private boolean enableCredential;
+    private String username;
+    private String password;
+    private int maxConnect = DEFAULT_MAX_CONNECT_TOTAL;
+    private int maxConnectPerRoute = DEFAULT_MAX_CONNECT_PER_ROUTE;
+    private int connectionRequestTimeout = DEFAULT_CONNECTION_REQUEST_TIMEOUT;
+    private int socketTimeout = DEFAULT_SOCKET_TIMEOUT;
+    private int maxRedirects = DEFAULT_MAX_REDIRECTS;
+    private int logMaxLength = DEFAULT_LOG_MAX_LENGTH;
+
+    public HttpSinkContext(String sinkName, Context context, Channel channel,
+            BufferQueue<HttpRequest> dispatchQueue) {
+        super(sinkName, context, channel);
+        this.sinkContext = context;
+        this.dispatchQueue = dispatchQueue;
+        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID);
+    }
+
+    public void reload() {
+        try {
+            LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}",
+                    taskName, dispatchQueue.size(), offerCounter.getAndSet(0),
+                    takeCounter.getAndSet(0), backCounter.getAndSet(0));
+            TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
+                    && (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig))) {
+                return;
+            }
+            LOG.info("get new SortTaskConfig:taskName:{}", taskName);
+
+            if (newTaskConfig != null) {
+                HttpNodeConfig requestNodeConfig = (HttpNodeConfig) 
newTaskConfig.getNodeConfig();
+                if (httpNodeConfig == null || requestNodeConfig.getVersion() > 
httpNodeConfig.getVersion()) {
+                    this.httpNodeConfig = requestNodeConfig;
+                }
+            }
+
+            this.taskConfig = newTaskConfig;
+            this.sortTaskConfig = newSortTaskConfig;
+
+            // change current config
+            Map<String, HttpIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig);
+            Map<String, HttpIdConfig> fromSortTaskConfig = 
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+            if (unifiedConfiguration) {
+                idConfigMap = fromTaskConfig;
+                reloadClientsFromNodeConfig(httpNodeConfig);
+            } else {
+                idConfigMap = fromSortTaskConfig;
+                reloadClientsFromSortTaskConfig(sortTaskConfig);
+            }
+            SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, 
fromTaskConfig, fromSortTaskConfig);
+            // log
+            LOG.info("end to get 
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,

Review Comment:
   ```suggestion
               LOG.info("End to get 
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to