This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ab0d7e9ef [INLONG-5805][Sort] Fix init interceptor incorrectly (#5806) ab0d7e9ef is described below commit ab0d7e9ef6308197a89e49ec2b3f49a941c71104 Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Wed Sep 7 13:59:53 2022 +0800 [INLONG-5805][Sort] Fix init interceptor incorrectly (#5806) --- .../standalone/utils/FlumeConfigGenerator.java | 25 ++++++++++++++++------ .../rollback/TimeBasedFilterInterceptor.java | 17 +++++++-------- .../standalone/source/sortsdk/SortSdkSource.java | 2 +- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java index 2981d1982..ca3e1b102 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java @@ -33,8 +33,9 @@ public class FlumeConfigGenerator { public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type"; public static final String KEY_SORT_SINK_TYPE = "sortSink.type"; public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type"; - public static final String KEY_SDK_START_TIME = "sortSdk.startTime"; - public static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime"; + public static final String KEY_SORT_INTERCEPTOR_TYPE = "interceptor.type"; + public static final String KEY_ROLLBACK_START_TIME = "rollback.startTime"; + public static final String KEY_ROLLBACK_STOP_TIME = "rollback.stopTime"; public static Map<String, String> generateFlumeConfiguration(SortTaskConfig taskConfig) { Map<String, String> flumeConf = new HashMap<>(); @@ -161,12 +162,24 @@ public class FlumeConfigGenerator { flumeConf.put(selectorTypeKey, "org.apache.flume.channel.ReplicatingChannelSelector"); // valid msg time interval builder.setLength(0); - String startTimeKey = builder.append(prefix).append(KEY_SDK_START_TIME).toString(); - Optional.ofNullable(sinkParams.get(KEY_SDK_START_TIME)) + String interceptorKey = builder.append(prefix).append("interceptors").toString(); + String interceptorName = name + "Interceptor"; + flumeConf.put(interceptorKey, interceptorName); + + builder.setLength(0); + String interceptorType = builder.append(prefix).append("interceptors.").append(interceptorName) + .append(".type").toString(); + Optional.ofNullable(CommonPropertiesHolder.getString(KEY_SORT_INTERCEPTOR_TYPE)) + .map(type -> flumeConf.put(interceptorType, type)); + builder.setLength(0); + String startTimeKey = builder.append(prefix).append("interceptors.").append(interceptorName).append(".") + .append(KEY_ROLLBACK_START_TIME).toString(); + Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_START_TIME)) .map(startTime -> flumeConf.put(startTimeKey, startTime)); builder.setLength(0); - String stopTimeKey = builder.append(prefix).append(KEY_SDK_STOP_TIME).toString(); - Optional.ofNullable(sinkParams.get(KEY_SDK_STOP_TIME)) + String stopTimeKey = builder.append(prefix).append("interceptors.").append(interceptorName).append(".") + .append(KEY_ROLLBACK_STOP_TIME).toString(); + Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_STOP_TIME)) .map(stopTime -> flumeConf.put(stopTimeKey, stopTime)); appendCommon(flumeConf, prefix, null, name); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java index 5894a8b87..79739794a 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/rollback/TimeBasedFilterInterceptor.java @@ -17,12 +17,10 @@ package org.apache.inlong.sort.standalone.rollback; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import org.apache.inlong.sort.standalone.channel.ProfileEvent; -import org.apache.inlong.sort.standalone.utils.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,15 +57,16 @@ public class TimeBasedFilterInterceptor implements Interceptor { @Override public Event intercept(Event event) { long logTime; + ProfileEvent profile; if (event instanceof ProfileEvent) { - ProfileEvent profile = (ProfileEvent) event; + profile = (ProfileEvent) event; logTime = profile.getRawLogTime(); } else { - logTime = NumberUtils.toLong(event.getHeaders().get(Constants.HEADER_KEY_MSG_TIME), - System.currentTimeMillis()); + return event; } if (logTime > stopTime || logTime < startTime) { + profile.ack(); return null; } return event; @@ -93,9 +92,9 @@ public class TimeBasedFilterInterceptor implements Interceptor { public static class Builder implements Interceptor.Builder { private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - private static final String START_TIME = "start-time"; + private static final String START_TIME = "rollback.startTime"; private static final long DEFAULT_START_TIME = 0L; - private static final String STOP_TIME = "stop-time"; + private static final String STOP_TIME = "rollback.stopTime"; private static final long DEFAULT_STOP_TIME = Long.MAX_VALUE; private long startTime; @@ -108,7 +107,7 @@ public class TimeBasedFilterInterceptor implements Interceptor { @Override public void configure(Context context) { - startTime = Optional.ofNullable(context.getString(START_TIME)) + startTime = Optional.ofNullable(context.getString(START_TIME)) .map(s -> { logger.info("config TimeBasedFilterInterceptor, start time is {}", s); try { @@ -120,7 +119,7 @@ public class TimeBasedFilterInterceptor implements Interceptor { }) .orElse(DEFAULT_START_TIME); - stopTime = Optional.ofNullable(context.getString(STOP_TIME)) + stopTime = Optional.ofNullable(context.getString(STOP_TIME)) .map(s -> { logger.info("config TimeBasedFilterInterceptor, stop time is {}", s); try { diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java index 18cef6065..aee6a11f7 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java @@ -108,8 +108,8 @@ public final class SortSdkSource extends AbstractSource */ @Override public synchronized void start() { - LOG.info("start to SortSdkSource:{}", taskName); int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM); + LOG.info("start to SortSdkSource:{}, client num is {}", taskName, sortSdkClientNum); for (int i = 0; i < sortSdkClientNum; i++) { this.sortClients.add(this.newClient(taskName)); }