This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new aea39cc19 [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413) aea39cc19 is described below commit aea39cc19283ccdfa696ce8470b8e8ac60bed42f Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Nov 7 16:02:32 2022 +0800 [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413) --- .../inlong/common/enums/DataProxyErrCode.java | 34 +++--- .../inlong/dataproxy/config/ConfigManager.java | 11 ++ .../config/remote/ConfigMessageServlet.java | 7 +- .../inlong/dataproxy/http/MessageFilter.java | 117 ++++++++++++--------- .../dataproxy/http/MessageProcessServlet.java | 3 +- .../apache/inlong/dataproxy/http/StatusCode.java | 41 -------- .../apache/inlong/dataproxy/sink/PulsarSink.java | 11 +- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 54 +++++++--- .../dataproxy/sink/common/TubeProducerHolder.java | 2 +- .../dataproxy/sink/pulsar/PulsarClientService.java | 6 +- .../dataproxy/source/ServerMessageHandler.java | 18 +++- 11 files changed, 175 insertions(+), 129 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java index 8943ebfd3..732f376e0 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java @@ -25,19 +25,27 @@ public enum DataProxyErrCode { SUCCESS(0, "Ok"), - UNSUPPORTED_MSG_TYPE(1, "Unsupported msgType"), - EMPTY_MSG(2, "Empty message"), - UNSUPPORTED_EXTEND_FIELD_VALUE(3, "Unsupported extend field value"), - UNCONFIGURED_GROUPID_OR_STREAMID(4, "Unconfigured groupId or streamId"), - PUT_EVENT_TO_CHANNEL_FAILURE(5, "Put event to Channels failure"), - - TOPIC_IS_BLANK(6, "Topic is null"), - NO_AVAILABLE_PRODUCER(7, "No available producer info"), - PRODUCER_IS_NULL(8, "Producer is null"), - SEND_REQUEST_TO_MQ_FAILURE(9, "Send request to MQ failure"), - MQ_RETURN_ERROR(10, "MQ client return error"), - - DUPLICATED_MESSAGE(11, "Duplicated message"), + SINK_SERVICE_UNREADY(1, "Service not ready"), + + MISS_REQUIRED_GROUPID_ARGUMENT(100, "Parameter groupId is required"), + MISS_REQUIRED_STREAMID_ARGUMENT(101, "Parameter streamId is required"), + MISS_REQUIRED_DT_ARGUMENT(102, "Parameter dt is required"), + MISS_REQUIRED_BODY_ARGUMENT(103, "Parameter body is required"), + BODY_EXCEED_MAX_LEN(104, "Body length exceed the maximum length"), + + UNSUPPORTED_MSG_TYPE(110, "Unsupported msgType"), + EMPTY_MSG(111, "Empty message"), + UNSUPPORTED_EXTEND_FIELD_VALUE(112, "Unsupported extend field value"), + UNCONFIGURED_GROUPID_OR_STREAMID(113, "Unconfigured groupId or streamId"), + PUT_EVENT_TO_CHANNEL_FAILURE(114, "Put event to Channels failure"), + + TOPIC_IS_BLANK(115, "Topic is null"), + NO_AVAILABLE_PRODUCER(116, "No available producer info"), + PRODUCER_IS_NULL(117, "Producer is null"), + SEND_REQUEST_TO_MQ_FAILURE(118, "Send request to MQ failure"), + MQ_RETURN_ERROR(119, "MQ client return error"), + + DUPLICATED_MESSAGE(120, "Duplicated message"), UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error"); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java index 95b95792c..fcff9c3cd 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL; @@ -77,6 +78,8 @@ public class ConfigManager { private final FileConfigHolder blackListConfig = new FileConfigHolder("blacklist.properties"); // source report configure holder private final SourceReportConfigHolder sourceReportConfigHolder = new SourceReportConfigHolder(); + // mq clusters ready + private final AtomicBoolean mqClusterReady = new AtomicBoolean(false); /** * get instance for config manager @@ -148,6 +151,14 @@ public class ConfigManager { return sourceReportConfigHolder.getSourceReportInfo(); } + public boolean isMqClusterReady() { + return mqClusterReady.get(); + } + + public void updMqClusterStatus(boolean isStarted) { + mqClusterReady.set(isStarted); + } + /** * update old maps, reload local files if changed. * diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java index 88cc89023..f6760081e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java @@ -29,8 +29,8 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.commons.io.IOUtils; +import org.apache.inlong.common.enums.DataProxyErrCode; import org.apache.inlong.dataproxy.config.ConfigManager; -import org.apache.inlong.dataproxy.http.StatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +91,8 @@ public class ConfigMessageServlet extends HttpServlet { @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { - ResponseResult result = new ResponseResult(StatusCode.SERVICE_ERR, ""); + ResponseResult result = + new ResponseResult(DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), ""); BufferedReader reader = null; try { reader = req.getReader(); @@ -110,7 +111,7 @@ public class ConfigMessageServlet extends HttpServlet { } if (isSuccess) { - result.setCode(StatusCode.SUCCESS); + result.setCode(DataProxyErrCode.SUCCESS.getErrCode()); } else { result.setMessage("cannot operate config update, please check it"); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java index 4c6b1ce51..a4c23ca24 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java @@ -19,7 +19,9 @@ package org.apache.inlong.dataproxy.http; import org.apache.commons.lang3.StringUtils; import org.apache.flume.ChannelException; +import org.apache.inlong.common.enums.DataProxyErrCode; import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.AttrConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,81 +51,98 @@ public class MessageFilter implements Filter { } @Override - public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException { + public void doFilter(ServletRequest request, + ServletResponse response, + FilterChain chain) throws IOException { HttpServletRequest req = (HttpServletRequest) request; HttpServletResponse resp = (HttpServletResponse) response; - int code = StatusCode.SUCCESS; - String message = "success"; - String pathInfo = req.getPathInfo(); if (pathInfo.startsWith("/")) { pathInfo = pathInfo.substring(1); } if ("heartbeat".equals(pathInfo)) { - resp.setCharacterEncoding(req.getCharacterEncoding()); - resp.setStatus(HttpServletResponse.SC_OK); - resp.flushBuffer(); + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.SUCCESS.getErrCode(), + DataProxyErrCode.SUCCESS.getErrMsg()); return; } - - String invalidKey = null; + // check sink service status + if (!ConfigManager.getInstance().isMqClusterReady()) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCode(), + DataProxyErrCode.SINK_SERVICE_UNREADY.getErrMsg()); + return; + } + // get and check groupId String groupId = req.getParameter(AttributeConstants.GROUP_ID); + if (StringUtils.isEmpty(groupId)) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(), + DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrMsg()); + return; + } + // get and check streamId String streamId = req.getParameter(AttributeConstants.STREAM_ID); + if (StringUtils.isEmpty(streamId)) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(), + DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg()); + return; + } + // get and check dt String dt = req.getParameter(AttributeConstants.DATA_TIME); + if (StringUtils.isEmpty(dt)) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrCode(), + DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrMsg()); + return; + } + // get and check body String body = req.getParameter(AttrConstants.BODY); - - if (StringUtils.isEmpty(groupId)) { - invalidKey = "groupId"; - } else if (StringUtils.isEmpty(streamId)) { - invalidKey = "streamId"; - } else if (StringUtils.isEmpty(dt)) { - invalidKey = "dt"; - } else if (StringUtils.isEmpty(body)) { - invalidKey = "body"; + if (StringUtils.isEmpty(body)) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(), + DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrMsg()); + return; + } + // check body length + if (body.length() > maxMsgLength) { + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(), + "Bad request, body length exceeds the limit:" + maxMsgLength); + return; } - try { - if (invalidKey != null) { - LOG.warn("Received bad request from client. " + invalidKey + " is empty."); - code = StatusCode.ILLEGAL_ARGUMENT; - message = "Bad request from client. " + invalidKey + " must not be empty."; - } else if (body.length() > maxMsgLength) { - LOG.warn("Received bad request from client. Body length is " + body.length()); - code = StatusCode.EXCEED_LEN; - message = "Bad request from client. Body length is exceeding the limit:" + maxMsgLength; - } else { - chain.doFilter(request, response); - } + chain.doFilter(request, response); + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.SUCCESS.getErrCode(), + DataProxyErrCode.SUCCESS.getErrMsg()); } catch (Throwable t) { - code = StatusCode.SERVICE_ERR; + String errMsg; if ((t instanceof ChannelException)) { - message = "Channel error!"; + errMsg = "Channel error! " + t.getMessage(); } else { - message = "Service error!"; - LOG.error("Request error!", t); + errMsg = "Service error! " + t.getMessage(); } + LOG.error("Request error!", t); + returnRspPackage(resp, req.getCharacterEncoding(), + DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), errMsg); } - - resp.setCharacterEncoding(req.getCharacterEncoding()); - resp.setStatus(HttpServletResponse.SC_OK); - resp.getWriter().write(getResultContent(code, message)); - resp.flushBuffer(); } @Override public void destroy() { } - private String getResultContent(int code, String message) { - StringBuilder builder = new StringBuilder(); - builder.append("{\"code\":\""); - builder.append(code); - builder.append("\",\"msg\":\""); - builder.append(message); - builder.append("\"}"); - - return builder.toString(); + private void returnRspPackage(HttpServletResponse resp, String charEncoding, + int errCode, String errMsg) throws IOException { + StringBuilder builder = + new StringBuilder().append("{\"code\":\"").append(errCode) + .append("\",\"msg\":\"").append(errMsg).append("\"}"); + resp.setCharacterEncoding(charEncoding); + resp.setStatus(HttpServletResponse.SC_OK); + resp.getWriter().write(builder.toString()); + resp.flushBuffer(); } - } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java index 10ebcd966..61a5b2ab6 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java @@ -17,6 +17,7 @@ package org.apache.inlong.dataproxy.http; +import org.apache.inlong.common.enums.DataProxyErrCode; import org.apache.inlong.common.monitor.LogCounter; import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.dataproxy.consts.AttrConstants; @@ -64,7 +65,7 @@ public class MessageProcessServlet extends HttpServlet { if (logCounter.shouldPrint()) { LOG.error("Received bad request from client. ", e); } - req.setAttribute("code", StatusCode.SERVICE_ERR); + req.setAttribute("code", DataProxyErrCode.UNKNOWN_ERROR.getErrCode()); } } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java deleted file mode 100644 index 88e411573..000000000 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.dataproxy.http; - -public interface StatusCode { - - /* - * success - */ - int SUCCESS = 1; - - /* - * illegal argument - */ - int ILLEGAL_ARGUMENT = -100; - - /* - * exceed length - */ - int EXCEED_LEN = -101; - - /* - * service error - */ - int SERVICE_ERR = -105; -} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index 52d690d76..6fd938030 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -305,13 +305,18 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag new HashSet<>(topicProperties.values())); pulsarCluster = configManager.getMqClusterUrl2Token(); + if (!ConfigManager.getInstance().isMqClusterReady()) { + this.canTake = true; + ConfigManager.getInstance().updMqClusterStatus(true); + logger.info("[{}] MQ Cluster service status ready!", getName()); + } } @Override public void start() { logger.info("[{}] pulsar sink starting...", getName()); sinkCounter.start(); - pulsarClientService.initCreateConnection(this); + pulsarClientService.initCreateConnection(this, getName()); int statIntervalSec = pulsarConfig.getStatIntervalSec(); Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0"); @@ -345,7 +350,9 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME); this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName()); MetricRegister.register(metricItemSet); - this.canTake = true; + if (ConfigManager.getInstance().isMqClusterReady()) { + this.canTake = true; + } logger.info("[{}] Pulsar sink started", getName()); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index 461c0ab49..dee798214 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -128,14 +128,16 @@ public class TubeSink extends AbstractSink implements Configurable { tubeConfig = configManager.getMqClusterConfig(); topicProperties = configManager.getTopicProperties(); masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet(); - // start message deduplication handler - MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(), - tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize()); // only use first cluster address now usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists); // create producer holder - producerHolder = new TubeProducerHolder(getName(), - usedMasterAddr, configManager.getMqClusterConfig()); + if (usedMasterAddr != null) { + producerHolder = new TubeProducerHolder(getName(), + usedMasterAddr, configManager.getMqClusterConfig()); + } + // start message deduplication handler + MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(), + tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize()); // get statistic configure items maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000); statIntervalSec = tubeConfig.getStatIntervalSec(); @@ -196,17 +198,23 @@ public class TubeSink extends AbstractSink implements Configurable { this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName()); MetricRegister.register(metricItemSet); // create tube connection - try { - producerHolder.start(new HashSet<>(topicProperties.values())); - } catch (FlumeException e) { - logger.error("Unable to start TubeMQ client. Exception follows.", e); - super.stop(); - return; + if (producerHolder != null) { + try { + producerHolder.start(new HashSet<>(topicProperties.values())); + ConfigManager.getInstance().updMqClusterStatus(true); + logger.info("[{}] MQ Cluster service status ready!", getName()); + } catch (FlumeException e) { + logger.error("Unable to start TubeMQ client. Exception follows.", e); + super.stop(); + return; + } } // start the cleaner thread super.start(); this.canSend = true; - this.canTake = true; + if (ConfigManager.getInstance().isMqClusterReady()) { + this.canTake = true; + } for (int i = 0; i < sinkThreadPool.length; i++) { sinkThreadPool[i] = new Thread(new TubeSinkTask(), getName() + "_tube_sink_sender-" + i); @@ -610,10 +618,12 @@ public class TubeSink extends AbstractSink implements Configurable { } // publish them if (!addedTopics.isEmpty()) { - try { - producerHolder.createProducersByTopicSet(addedTopics); - } catch (Exception e) { - logger.info(getName() + "'s publish new topic set fail.", e); + if (producerHolder != null) { + try { + producerHolder.createProducersByTopicSet(addedTopics); + } catch (Exception e) { + logger.info(getName() + "'s publish new topic set fail.", e); + } } logger.info(getName() + "'s topics set has changed, trigger diff publish for {}", addedTopics); @@ -658,7 +668,17 @@ public class TubeSink extends AbstractSink implements Configurable { producerHolder = newProducerHolder; usedMasterAddr = newMasterAddr; // close old producer holder - tmpProducerHolder.stop(); + if (tmpProducerHolder == null) { + diffSetPublish(new HashSet<>(), + new HashSet<>(configManager.getTopicProperties().values())); + } else { + tmpProducerHolder.stop(); + } + if (!ConfigManager.getInstance().isMqClusterReady()) { + this.canTake = true; + ConfigManager.getInstance().updMqClusterStatus(true); + logger.info("[{}] MQ Cluster service status ready!", getName()); + } logger.info(getName() + " switch cluster from " + tmpMasterAddr + " to " + usedMasterAddr); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java index c1852fed3..04b43c87f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java @@ -211,7 +211,7 @@ public class TubeProducerHolder { * * @param cfgTopicSet the configured topic set */ - public void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception { + public synchronized void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception { if (cfgTopicSet == null || cfgTopicSet.isEmpty()) { return; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java index cca008c19..13d58c4fc 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java @@ -116,7 +116,7 @@ public class PulsarClientService { topicSendIndexMap = new ConcurrentHashMap<>(); } - public void initCreateConnection(CreatePulsarClientCallBack callBack) { + public void initCreateConnection(CreatePulsarClientCallBack callBack, String sinkName) { pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token(); if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) { logger.warn("failed to get Pulsar Cluster, make sure register pulsar to manager successfully."); @@ -124,6 +124,10 @@ public class PulsarClientService { } try { createConnection(callBack); + if (!ConfigManager.getInstance().isMqClusterReady()) { + ConfigManager.getInstance().updMqClusterStatus(true); + logger.info("[{}] MQ Cluster service status ready!", sinkName); + } } catch (FlumeException e) { logger.error("unable to create pulsar client: ", e); close(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index 01767ffb9..7c3636fe1 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -284,6 +284,14 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { if (commonAttrMap == null) { commonAttrMap = new HashMap<>(); } + // check whether extract data failure + String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE); + if (!StringUtils.isEmpty(errCode) + && !DataProxyErrCode.SUCCESS.getErrCodeStr().equals(errCode)) { + MessageUtils.sourceReturnRspPackage( + commonAttrMap, resultMap, remoteChannel, msgType); + return; + } // process heartbeat message if (MsgType.MSG_HEARTBEAT.equals(msgType) || MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) { @@ -310,7 +318,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { commonAttrMap, resultMap, remoteChannel, msgType); return; } - // transfer message data + // check sink service status + if (!ConfigManager.getInstance().isMqClusterReady()) { + commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE, + DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCodeStr()); + MessageUtils.sourceReturnRspPackage( + commonAttrMap, resultMap, remoteChannel, msgType); + return; + } + // convert message data Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size()); if (!convertMsgList(msgList, commonAttrMap, messageMap, strRemoteIP)) {