This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ab59b91be2 [INLONG-11481][Sort] Tube Connector source supports dirty data achieving (#11482) ab59b91be2 is described below commit ab59b91be2a2d84b0c0155d84a1da6bad204cc4d Author: vernedeng <verned...@apache.org> AuthorDate: Mon Nov 11 18:40:56 2024 +0800 [INLONG-11481][Sort] Tube Connector source supports dirty data achieving (#11482) --- .../inlong/sdk/dirtydata/DirtyMessageWrapper.java | 27 +++-- ...SdkDirtySink.java => InlongSdkDirtySender.java} | 19 +++- inlong-sort/sort-flink/base/pom.xml | 5 + .../org/apache/inlong/sort/base/Constants.java | 2 +- .../apache/inlong/sort/base/dirty/DirtyData.java | 40 ++++++- ...gSdkOptions.java => InlongSdkDirtyOptions.java} | 7 +- .../base/dirty/sink/sdk/InlongSdkDirtySink.java | 115 ++++++++------------- .../dirty/sink/sdk/InlongSdkDirtySinkFactory.java | 39 +++---- .../sort-connectors/tubemq/pom.xml | 9 ++ .../table/DynamicTubeMQDeserializationSchema.java | 2 +- .../DynamicTubeMQTableDeserializationSchema.java | 61 ++++++++++- .../tubemq/table/TubeMQDynamicTableFactory.java | 37 ++++--- .../sort/tubemq/table/TubeMQTableSource.java | 15 ++- 13 files changed, 243 insertions(+), 135 deletions(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java index 984c456480..977e002478 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java @@ -18,12 +18,17 @@ package org.apache.inlong.sdk.dirtydata; import lombok.Builder; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.text.StringEscapeUtils; +import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Base64; import java.util.StringJoiner; +@Slf4j @Builder public class DirtyMessageWrapper { @@ -32,16 +37,17 @@ public class DirtyMessageWrapper { private String inlongGroupId; private String inlongStreamId; - private String dataTime; + private long dataTime; private String dataflowId; private String serverType; private String dirtyType; + private String dirtyMessage; private String ext; private String data; private byte[] dataBytes; public String format() { - String now = LocalDateTime.now().format(dateTimeFormatter); + String reportTime = LocalDateTime.now().format(dateTimeFormatter); StringJoiner joiner = new StringJoiner(delimiter); String formatData = null; if (data != null) { @@ -50,14 +56,19 @@ public class DirtyMessageWrapper { formatData = Base64.getEncoder().encodeToString(dataBytes); } - return joiner.add(inlongGroupId) - .add(inlongStreamId) - .add(now) - .add(dataTime) + String dataTimeStr = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), + ZoneId.systemDefault()).format(dateTimeFormatter); + return joiner .add(dataflowId) + .add(inlongGroupId) + .add(inlongStreamId) + .add(reportTime) + .add(dataTimeStr) .add(serverType) .add(dirtyType) - .add(ext) - .add(formatData).toString(); + .add(StringEscapeUtils.escapeXSI(dirtyMessage)) + .add(StringEscapeUtils.escapeXSI(ext)) + .add(StringEscapeUtils.escapeXSI(formatData)) + .toString(); } } diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java similarity index 86% rename from inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java rename to inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 2240ebdb6c..88e2e88a74 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -18,7 +18,6 @@ package org.apache.inlong.sdk.dirtydata; import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.MessageSender; import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; @@ -28,19 +27,22 @@ import com.google.common.base.Preconditions; import lombok.Builder; import lombok.extern.slf4j.Slf4j; +import java.net.InetAddress; + @Slf4j @Builder -public class InlongSdkDirtySink { +public class InlongSdkDirtySender { private String inlongGroupId; private String inlongStreamId; private String inlongManagerAddr; + private int inlongManagerPort; private String authId; private String authKey; private boolean ignoreErrors; private SendMessageCallback callback; - private MessageSender sender; + private DefaultMessageSender sender; public void init() throws Exception { Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be null"); @@ -51,8 +53,11 @@ public class InlongSdkDirtySink { this.callback = new LogCallBack(); ProxyClientConfig proxyClientConfig = - new ProxyClientConfig(inlongManagerAddr, inlongGroupId, authId, authKey); + new ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true, + inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey); + proxyClientConfig.setReadProxyIPFromLocal(false); this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); + this.sender.setMsgtype(7); log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId); } @@ -61,6 +66,12 @@ public class InlongSdkDirtySink { sender.asyncSendMessage(inlongGroupId, inlongStreamId, messageWrapper.format().getBytes(), callback); } + public void close() { + if (sender != null) { + sender.close(); + } + } + class LogCallBack implements SendMessageCallback { @Override diff --git a/inlong-sort/sort-flink/base/pom.xml b/inlong-sort/sort-flink/base/pom.xml index 19c36af5fc..fd75d483d9 100644 --- a/inlong-sort/sort-flink/base/pom.xml +++ b/inlong-sort/sort-flink/base/pom.xml @@ -38,6 +38,11 @@ <artifactId>audit-sdk</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>dirty-data-sdk</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.inlong</groupId> diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index c13c47f66e..7e02b0ca96 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -385,7 +385,7 @@ public final class Constants { public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER = ConfigOptions.key("dirty.side-output.field-delimiter") .stringType() - .defaultValue(",") + .defaultValue("|") .withDescription("The field-delimiter of dirty side-output"); public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER = ConfigOptions.key("dirty.side-output.line-delimiter") diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java index 542e9e1977..1ac2b60a1d 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java @@ -71,6 +71,14 @@ public class DirtyData<T> { * The row type of data, it is only used for 'RowData' */ private @Nullable final LogicalType rowType; + /** + * Dirty message data time + */ + private final long dataTime; + /** + * Dirty message ext params + */ + private @Nullable final String extParams; /** * The real dirty data */ @@ -78,7 +86,7 @@ public class DirtyData<T> { public DirtyData(T data, String identifier, String labels, String logTag, DirtyType dirtyType, String dirtyMessage, - @Nullable LogicalType rowType) { + @Nullable LogicalType rowType, long dataTime, String extParams) { this.data = data; this.dirtyType = dirtyType; this.dirtyMessage = dirtyMessage; @@ -87,7 +95,8 @@ public class DirtyData<T> { this.labels = PatternReplaceUtils.replace(labels, paramMap); this.logTag = PatternReplaceUtils.replace(logTag, paramMap); this.identifier = PatternReplaceUtils.replace(identifier, paramMap); - + this.dataTime = dataTime == 0 ? System.currentTimeMillis() : dataTime; + this.extParams = extParams; } public static <T> Builder<T> builder() { @@ -122,6 +131,18 @@ public class DirtyData<T> { return identifier; } + public long getDataTime() { + return dataTime; + } + + public String getExtParams() { + return extParams; + } + + public String getDirtyMessage() { + return dirtyMessage; + } + @Nullable public LogicalType getRowType() { return rowType; @@ -135,8 +156,20 @@ public class DirtyData<T> { private DirtyType dirtyType = DirtyType.UNDEFINED; private String dirtyMessage; private LogicalType rowType; + private long dataTime; + private String extParams; private T data; + public Builder<T> setDirtyDataTime(long dataTime) { + this.dataTime = dataTime; + return this; + } + + public Builder<T> setExtParams(String extParams) { + this.extParams = extParams; + return this; + } + public Builder<T> setDirtyType(DirtyType dirtyType) { this.dirtyType = dirtyType; return this; @@ -173,7 +206,8 @@ public class DirtyData<T> { } public DirtyData<T> build() { - return new DirtyData<>(data, identifier, labels, logTag, dirtyType, dirtyMessage, rowType); + return new DirtyData<>(data, identifier, labels, logTag, dirtyType, + dirtyMessage, rowType, dataTime, extParams); } } } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java similarity index 91% rename from inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java rename to inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java index 0692d78580..5cb03f0f80 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java @@ -26,7 +26,7 @@ import java.io.Serializable; @Data @Builder @Getter -public class InlongSdkOptions implements Serializable { +public class InlongSdkDirtyOptions implements Serializable { private static final String DEFAULT_FORMAT = "csv"; @@ -36,9 +36,10 @@ public class InlongSdkOptions implements Serializable { private static final String DEFAULT_KV_FIELD_DELIMITER = "&"; private static final String DEFAULT_KV_ENTRY_DELIMITER = "="; - private String inlongGroupId; - private String inlongStreamId; + private String sendToGroupId; + private String sendToStreamId; private String inlongManagerAddr; + private int inlongManagerPort; private String inlongManagerAuthKey; private String inlongManagerAuthId; private String format = DEFAULT_FORMAT; diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java index 8a9d407a4b..4441cca830 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -17,13 +17,9 @@ package org.apache.inlong.sort.base.dirty.sink.sdk; -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.MessageSender; -import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sdk.dirtydata.DirtyMessageWrapper; +import org.apache.inlong.sdk.dirtydata.InlongSdkDirtySender; import org.apache.inlong.sort.base.dirty.DirtyData; -import org.apache.inlong.sort.base.dirty.DirtyType; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.dirty.utils.FormatUtils; import org.apache.inlong.sort.base.util.LabelUtils; @@ -37,46 +33,54 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.Base64; import java.util.Map; -import java.util.StringJoiner; @Slf4j public class InlongSdkDirtySink<T> implements DirtySink<T> { - private final InlongSdkOptions options; + private final InlongSdkDirtyOptions options; private final DataType physicalRowDataType; - private final String inlongGroupId; - private final String inlongStreamId; - private final SendMessageCallback callback; - private transient DateTimeFormatter dateTimeFormatter; private transient RowData.FieldGetter[] fieldGetters; private transient RowDataToJsonConverters.RowDataToJsonConverter converter; - private transient MessageSender sender; + private transient InlongSdkDirtySender dirtySender; - public InlongSdkDirtySink(InlongSdkOptions options, DataType physicalRowDataType) { + public InlongSdkDirtySink(InlongSdkDirtyOptions options, DataType physicalRowDataType) { this.options = options; this.physicalRowDataType = physicalRowDataType; - this.inlongGroupId = options.getInlongGroupId(); - this.inlongStreamId = options.getInlongStreamId(); - this.callback = new LogCallBack(); } @Override public void invoke(DirtyData<T> dirtyData) throws Exception { try { Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); - String groupId = Preconditions.checkNotNull(labelMap.get("groupId")); - String streamId = Preconditions.checkNotNull(labelMap.get("streamId")); - - String message = join(groupId, streamId, - dirtyData.getDirtyType(), dirtyData.getLabels(), formatData(dirtyData, labelMap)); - sender.asyncSendMessage(inlongGroupId, inlongStreamId, message.getBytes(), callback); + String dataGroupId = Preconditions.checkNotNull(labelMap.get("groupId")); + String dataStreamId = Preconditions.checkNotNull(labelMap.get("streamId")); + String serverType = Preconditions.checkNotNull(labelMap.get("serverType")); + String dataflowId = Preconditions.checkNotNull(labelMap.get("dataflowId")); + + String dirtyMessage = formatData(dirtyData, labelMap); + + DirtyMessageWrapper wrapper = DirtyMessageWrapper.builder() + .delimiter(options.getCsvFieldDelimiter()) + .inlongGroupId(dataGroupId) + .inlongStreamId(dataStreamId) + .dataflowId(dataflowId) + .dataTime(dirtyData.getDataTime()) + .serverType(serverType) + .dirtyType(dirtyData.getDirtyType().format()) + .dirtyMessage(dirtyData.getDirtyMessage()) + .ext(dirtyData.getExtParams()) + .data(dirtyMessage) + .build(); + + dirtySender.sendDirtyMessage(wrapper); } catch (Throwable t) { log.error("failed to send dirty message to inlong sdk", t); + if (!options.isIgnoreSideOutputErrors()) { + throw new RuntimeException("failed to send dirty message to inlong sdk", t); + } } } @@ -84,39 +88,28 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> { public void open(Configuration configuration) throws Exception { converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType()); fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType()); - dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + log.info("inlong sdk dirty options={}", options); // init sender - ProxyClientConfig proxyClientConfig = - new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(), - options.getInlongManagerAuthId(), options.getInlongManagerAuthKey()); - sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); + dirtySender = InlongSdkDirtySender.builder() + .inlongManagerAddr(options.getInlongManagerAddr()) + .inlongManagerPort(options.getInlongManagerPort()) + .authId(options.getInlongManagerAuthId()) + .authKey(options.getInlongManagerAuthKey()) + .ignoreErrors(options.isIgnoreSideOutputErrors()) + .inlongGroupId(options.getSendToGroupId()) + .inlongStreamId(options.getSendToStreamId()) + .build(); + dirtySender.init(); } @Override public void close() throws Exception { - if (sender != null) { - sender.close(); + if (dirtySender != null) { + dirtySender.close(); } } - private String join( - String inlongGroup, - String inlongStream, - DirtyType type, - String label, - String formattedData) { - - String now = LocalDateTime.now().format(dateTimeFormatter); - - StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter()); - return joiner.add(inlongGroup + "." + inlongStream) - .add(now) - .add(type.name()) - .add(label) - .add(formattedData).toString(); - } - private String formatData(DirtyData<T> dirtyData, Map<String, String> labels) throws JsonProcessingException { String value; T data = dirtyData.getData(); @@ -158,28 +151,4 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> { } return value; } - - class LogCallBack implements SendMessageCallback { - - @Override - public void onMessageAck(SendResult result) { - if (result == SendResult.OK) { - return; - } - log.error("failed to send inlong dirty message, response={}", result); - - if (!options.isIgnoreSideOutputErrors()) { - throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result); - } - } - - @Override - public void onException(Throwable e) { - log.error("failed to send inlong dirty message", e); - - if (!options.isIgnoreSideOutputErrors()) { - throw new RuntimeException("writing dirty message to inlong sdk failed", e); - } - } - } } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java index 000836b667..8d7e399c5c 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java @@ -20,32 +20,39 @@ package org.apache.inlong.sort.base.dirty.sink.sdk; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.FactoryUtil; import java.util.HashSet; import java.util.Set; -import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER; +import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE; +@Slf4j public class InlongSdkDirtySinkFactory implements DirtySinkFactory { private static final String IDENTIFIER = "inlong-sdk"; - private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_MANAGER = + private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR = ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-addr") .stringType() .noDefaultValue() .withDescription("The inlong manager addr to init inlong sdk"); + private static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_INLONG_MANAGER_PORT = + ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-port") + .intType() + .defaultValue(8083) + .withDescription("The inlong manager port to init inlong sdk"); + private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-id") .stringType() @@ -74,24 +81,18 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory { public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context) { ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions()); FactoryUtil.validateFactoryOptions(this, config); - validate(config); - return new InlongSdkDirtySink<>(getOptions(config), + InlongSdkDirtyOptions options = getOptions(config); + return new InlongSdkDirtySink<>(options, context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType()); } - private void validate(ReadableConfig config) { - String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null); - if (identifier == null || identifier.trim().isEmpty()) { - throw new ValidationException( - "The option 'dirty.identifier' is not allowed to be empty."); - } - } - - private InlongSdkOptions getOptions(ReadableConfig config) { - return InlongSdkOptions.builder() - .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER)) - .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) - .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) + private InlongSdkDirtyOptions getOptions(ReadableConfig config) { + return InlongSdkDirtyOptions.builder() + .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR)) + .inlongManagerPort(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_PORT)) + .sendToGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) + .sendToStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) + .csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER)) .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID)) .ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true)) @@ -107,7 +108,7 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory { @Override public Set<ConfigOption<?>> requiredOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); - options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER); + options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR); options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID); options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY); options.add(DIRTY_SIDE_OUTPUT_INLONG_GROUP); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml index f863d1cc30..b066347baa 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml @@ -62,6 +62,7 @@ <groupId>org.apache.inlong</groupId> <artifactId>sort-common</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> </dependencies> @@ -86,6 +87,10 @@ <includes> <include>org.apache.inlong:*</include> <include>com.fasterxml.*:*</include> + <include>com.google.protobuf:*</include> + <include>org.apache.commons:*</include> + <include>commons-collections:*</include> + <include>commons-codec:*</include> </includes> </artifactSet> @@ -111,6 +116,10 @@ <pattern>org.apache.inlong.sort.base</pattern> <shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.inlong.sort.base</shadedPattern> </relocation> + <relocation> + <pattern>org.apache.commons</pattern> + <shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.commons</shadedPattern> + </relocation> </relocations> </configuration> </execution> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java index 80532a2dc1..822396c9cb 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java @@ -29,7 +29,7 @@ import java.io.Serializable; public interface DynamicTubeMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { @PublicEvolving - default void open() { + default void open() throws Exception { } /** diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java index 5d4a3bd2c6..a178aa57a8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java @@ -17,26 +17,36 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.base.dirty.DirtyData; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; import org.apache.inlong.sort.base.metric.SourceExactlyMetric; import org.apache.inlong.tubemq.corebase.Message; import com.google.common.base.Objects; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; import java.io.IOException; import java.io.Serializable; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +@Slf4j public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDeserializationSchema<RowData> { /** @@ -65,26 +75,37 @@ public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDes private final MetricOption metricOption; + private final DirtySink<byte[]> dirtySink; + + private final DirtyOptions dirtyOptions; + public DynamicTubeMQTableDeserializationSchema( DeserializationSchema<RowData> schema, MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, boolean ignoreErrors, boolean innerFormat, - MetricOption metricOption) { + MetricOption metricOption, + DirtySink<byte[]> dirtySink, + DirtyOptions dirtyOptions) { this.deserializationSchema = schema; this.metadataConverters = metadataConverters; this.producedTypeInfo = producedTypeInfo; this.ignoreErrors = ignoreErrors; this.innerFormat = innerFormat; this.metricOption = metricOption; + this.dirtySink = dirtySink; + this.dirtyOptions = dirtyOptions; } @Override - public void open() { + public void open() throws Exception { if (metricOption != null) { sourceExactlyMetric = new SourceExactlyMetric(metricOption); } + if (dirtySink != null) { + dirtySink.open(new Configuration()); + } } @Override @@ -103,7 +124,41 @@ public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDes if (!innerFormat) { metricsCollector.resetTimestamp(System.currentTimeMillis()); } - deserializationSchema.deserialize(message.getData(), metricsCollector); + + if (!dirtyOptions.ignoreDirty()) { + deserializationSchema.deserialize(message.getData(), metricsCollector); + } else { + try { + deserializationSchema.deserialize(message.getData(), metricsCollector); + } catch (Throwable t) { + if (dirtySink != null) { + DirtyData.Builder<byte[]> builder = DirtyData.builder(); + try { + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm"); + long dataTime = LocalDateTime.parse(message.getMsgTime(), formatter) + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + + builder.setData(message.getData()) + .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR) + .setDirtyDataTime(dataTime) + .setExtParams(message.getAttribute()) + .setLabels(dirtyOptions.getLabels()) + .setLogTag(dirtyOptions.getLogTag()) + .setDirtyMessage(t.getMessage()) + .setIdentifier(dirtyOptions.getIdentifier()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new IOException(ex); + } + log.warn("Dirty sink failed", ex); + } + } + } + } rows.forEach(row -> emitRow(message, (GenericRowData) row, out)); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 4962364a9c..22c3306064 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -48,22 +51,13 @@ import java.util.Set; import java.util.TreeSet; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; -import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; -import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; -import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN; -import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties; +import static org.apache.inlong.sort.base.Constants.*; +import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.*; /** * A dynamic table factory implementation for TubeMQ. */ + public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { public static final String IDENTIFIER = "tubemq-inlong"; @@ -120,10 +114,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper); // validate all options - helper.validateExcept(ExtractNode.INLONG_MSG); + helper.validateExcept(ExtractNode.INLONG_MSG, PROPERTIES_PREFIX, DIRTY_PREFIX); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); - innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT)); + innerFormat = tableOptions.get(FORMAT).contains(ExtractNode.INLONG_MSG); final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions()); @@ -133,6 +127,9 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); String auditKeys = tableOptions.get(AUDIT_KEYS); + final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions); + final DirtySink<byte[]> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions); + return createTubeMQTableSource( physicalDataType, valueDecodingFormat, @@ -144,7 +141,9 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn properties, inlongMetric, auditHostAndPorts, - auditKeys); + auditKeys, + dirtySink, + dirtyOptions); } @Override @@ -184,7 +183,9 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn Configuration properties, String inlongMetric, String auditHostAndPorts, - String auditKeys) { + String auditKeys, + DirtySink<byte[]> dirtySink, + DirtyOptions dirtyOptions) { return new TubeMQTableSource( physicalDataType, valueDecodingFormat, @@ -200,7 +201,9 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn innerFormat, inlongMetric, auditHostAndPorts, - auditKeys); + auditKeys, + dirtySink, + dirtyOptions); } protected TubeMQTableSink createTubeMQTableSink( diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java index 2d5ddbb3d5..d69fcf985e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer; @@ -131,6 +133,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada private String auditHostAndPorts; private String auditKeys; + private DirtySink<byte[]> dirtySink; + private DirtyOptions dirtyOptions; + /** * Watermark strategy that is used to generate per-partition watermark. */ @@ -143,7 +148,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada TreeSet<String> streamIdSet, String consumerGroup, String sessionKey, Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy, Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat, - String inlongMetric, String auditHostAndPorts, String auditKeys) { + String inlongMetric, String auditHostAndPorts, String auditKeys, + DirtySink<byte[]> dirtySink, DirtyOptions dirtyOptions) { Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null."); Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null."); @@ -170,6 +176,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; this.auditKeys = auditKeys; + this.dirtySink = dirtySink; + this.dirtyOptions = dirtyOptions; } @Override @@ -200,7 +208,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada physicalDataType, valueDecodingFormat, masterAddress, topic, streamIdSet, consumerGroup, sessionKey, configuration, watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat, - inlongMetric, auditHostAndPorts, auditKeys); + inlongMetric, auditHostAndPorts, auditKeys, dirtySink, dirtyOptions); } @Override @@ -325,7 +333,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada final DynamicTubeMQDeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQTableDeserializationSchema( - deserialization, metadataConverters, producedTypeInfo, ignoreErrors, innerFormat, metricOption); + deserialization, metadataConverters, producedTypeInfo, ignoreErrors, + innerFormat, metricOption, dirtySink, dirtyOptions); final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet, consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat);