This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 77ee98ac9c [INLONG-10791][Sort] Support inlong dirty sink (#10795)
77ee98ac9c is described below

commit 77ee98ac9c47017155230d756fb3511cb67ea2c1
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Aug 16 14:44:54 2024 +0800

    [INLONG-10791][Sort] Support inlong dirty sink (#10795)
---
 inlong-sort/sort-flink/base/pom.xml                |   7 +
 .../base/dirty/sink/sdk/InlongSdkDirtySink.java    | 185 +++++++++++++++++++++
 .../dirty/sink/sdk/InlongSdkDirtySinkFactory.java  | 126 ++++++++++++++
 .../sort/base/dirty/sink/sdk/InlongSdkOptions.java |  51 ++++++
 .../org.apache.flink.table.factories.Factory       |   3 +-
 5 files changed, 371 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-flink/base/pom.xml 
b/inlong-sort/sort-flink/base/pom.xml
index a98220f178..12b10fe19d 100644
--- a/inlong-sort/sort-flink/base/pom.xml
+++ b/inlong-sort/sort-flink/base/pom.xml
@@ -56,6 +56,13 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>dataproxy-sdk</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
new file mode 100644
index 0000000000..8a9d407a4b
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.sdk;
+
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.MessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Base64;
+import java.util.Map;
+import java.util.StringJoiner;
+
+/**
+ * {@link DirtySink} that forwards dirty records to a dedicated InLong group/stream through the
+ * InLong DataProxy SDK.
+ *
+ * <p>Each record becomes one message made of {@code <groupId>.<streamId>}, the current time, the
+ * dirty type, the raw labels and the formatted payload, joined with the configured CSV field
+ * delimiter. The group/stream written into the message body come from the record's labels; the
+ * message itself is sent to the group/stream configured in {@link InlongSdkOptions}.
+ *
+ * @param <T> type of the dirty payload
+ */
+@Slf4j
+public class InlongSdkDirtySink<T> implements DirtySink<T> {
+
+    /** Format of the timestamp column; {@link DateTimeFormatter} is immutable and thread-safe. */
+    private static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    private final InlongSdkOptions options;
+    private final DataType physicalRowDataType;
+    private final String inlongGroupId;
+    private final String inlongStreamId;
+
+    // Runtime state, (re)created in open(). The callback is transient like the sender: LogCallBack
+    // is an inner class that does not implement Serializable, so holding it in a non-transient
+    // field would break Java serialization of this sink.
+    private transient SendMessageCallback callback;
+    private transient RowData.FieldGetter[] fieldGetters;
+    private transient RowDataToJsonConverters.RowDataToJsonConverter converter;
+    private transient MessageSender sender;
+    // First asynchronous send failure; only recorded when side-output errors are not ignored.
+    // Written from the SDK callback and consumed by invoke(), hence volatile.
+    private transient volatile RuntimeException asyncError;
+
+    /**
+     * Creates the sink; no network resources are acquired until {@link #open(Configuration)}.
+     *
+     * @param options target group/stream, manager address/credentials and formatting settings
+     * @param physicalRowDataType default row type used to format {@link RowData} payloads
+     */
+    public InlongSdkDirtySink(InlongSdkOptions options, DataType physicalRowDataType) {
+        this.options = options;
+        this.physicalRowDataType = physicalRowDataType;
+        this.inlongGroupId = options.getInlongGroupId();
+        this.inlongStreamId = options.getInlongStreamId();
+    }
+
+    /**
+     * Formats one dirty record and sends it asynchronously through the DataProxy SDK.
+     *
+     * <p>The record's labels must contain {@code groupId} and {@code streamId}. Failures, including
+     * asynchronous failures reported for earlier sends, are logged and are rethrown only when
+     * {@link InlongSdkOptions#isIgnoreSideOutputErrors()} is {@code false}.
+     */
+    @Override
+    public void invoke(DirtyData<T> dirtyData) throws Exception {
+        throwPendingAsyncError();
+        try {
+            Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
+            String groupId = Preconditions.checkNotNull(labelMap.get("groupId"),
+                    "label 'groupId' is missing in dirty data labels");
+            String streamId = Preconditions.checkNotNull(labelMap.get("streamId"),
+                    "label 'streamId' is missing in dirty data labels");
+
+            String message = join(groupId, streamId,
+                    dirtyData.getDirtyType(), dirtyData.getLabels(), formatData(dirtyData, labelMap));
+            // Explicit charset: the no-arg getBytes() depends on the JVM's platform default.
+            sender.asyncSendMessage(inlongGroupId, inlongStreamId,
+                    message.getBytes(StandardCharsets.UTF_8), callback);
+        } catch (Throwable t) {
+            log.error("failed to send dirty message to inlong sdk", t);
+            if (!options.isIgnoreSideOutputErrors()) {
+                throw new RuntimeException("writing dirty message to inlong sdk failed", t);
+            }
+        }
+    }
+
+    /**
+     * Builds the row formatters and the SDK sender for this task.
+     *
+     * @param configuration runtime configuration (not read by this sink)
+     */
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
+        fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
+        callback = new LogCallBack();
+        asyncError = null;
+
+        // init sender
+        ProxyClientConfig proxyClientConfig =
+                new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(),
+                        options.getInlongManagerAuthId(), options.getInlongManagerAuthKey());
+        sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+    }
+
+    /** Closes the SDK sender if it was created; safe to call more than once. */
+    @Override
+    public void close() throws Exception {
+        if (sender != null) {
+            sender.close();
+            sender = null;
+        }
+    }
+
+    /** Records the first asynchronous failure so that invoke() can surface it on the task thread. */
+    private void recordAsyncError(RuntimeException error) {
+        if (!options.isIgnoreSideOutputErrors() && asyncError == null) {
+            asyncError = error;
+        }
+    }
+
+    /** Throws (and clears) a failure previously recorded by the send callback, if any. */
+    private void throwPendingAsyncError() {
+        RuntimeException pending = asyncError;
+        if (pending != null) {
+            asyncError = null;
+            throw pending;
+        }
+    }
+
+    /**
+     * Joins the message columns with the configured CSV field delimiter.
+     *
+     * @return {@code group.stream}, current local time, dirty type, raw labels and formatted data
+     */
+    private String join(
+            String inlongGroup,
+            String inlongStream,
+            DirtyType type,
+            String label,
+            String formattedData) {
+
+        String now = LocalDateTime.now().format(DATE_TIME_FORMATTER);
+
+        StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
+        return joiner.add(inlongGroup + "." + inlongStream)
+                .add(now)
+                .add(type.name())
+                .add(label)
+                .add(formattedData).toString();
+    }
+
+    /**
+     * Renders the payload: {@link RowData} per the configured format, {@code byte[]} as Base64,
+     * anything else (including {@code null}) via {@link String#valueOf(Object)}.
+     */
+    private String formatData(DirtyData<T> dirtyData, Map<String, String> labels) throws JsonProcessingException {
+        T data = dirtyData.getData();
+        if (data instanceof RowData) {
+            return formatRowData((RowData) data, dirtyData.getRowType(), labels);
+        }
+        if (data instanceof byte[]) {
+            return formatBytes((byte[]) data);
+        }
+        return String.valueOf(data);
+    }
+
+    /** Encodes raw bytes as Base64 so the message stays printable. */
+    private String formatBytes(byte[] data) {
+        return Base64.getEncoder().encodeToString(data);
+    }
+
+    /**
+     * Formats a row as {@code csv} or {@code json}; a per-record {@code rowType}, when present,
+     * overrides the sink's physical row type.
+     *
+     * @throws UnsupportedOperationException for any other configured format
+     */
+    private String formatRowData(RowData data, LogicalType rowType,
+            Map<String, String> labels) throws JsonProcessingException {
+        String value;
+        switch (options.getFormat()) {
+            case "csv":
+                RowData.FieldGetter[] getters = fieldGetters;
+                if (rowType != null) {
+                    getters = FormatUtils.parseFieldGetters(rowType);
+                }
+                value = FormatUtils.csvFormat(data, getters, null, options.getCsvFieldDelimiter());
+                break;
+            case "json":
+                RowDataToJsonConverters.RowDataToJsonConverter jsonConverter = converter;
+                if (rowType != null) {
+                    jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType);
+                }
+                value = FormatUtils.jsonFormat(data, jsonConverter, labels);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported format for: %s", options.getFormat()));
+        }
+        return value;
+    }
+
+    /**
+     * Completion callback handed to the SDK for every asynchronous send.
+     *
+     * <p>It is invoked asynchronously (presumably on an SDK I/O thread), so throwing from here
+     * would not reach the Flink task. Failures are instead logged and, when errors are not
+     * ignored, recorded for the next {@link #invoke(DirtyData)} to rethrow.
+     */
+    class LogCallBack implements SendMessageCallback {
+
+        @Override
+        public void onMessageAck(SendResult result) {
+            if (result == SendResult.OK) {
+                return;
+            }
+            log.error("failed to send inlong dirty message, response={}", result);
+            recordAsyncError(new RuntimeException("writing dirty message to inlong sdk failed, response=" + result));
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            log.error("failed to send inlong dirty message", e);
+            recordAsyncError(new RuntimeException("writing dirty message to inlong sdk failed", e));
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
new file mode 100644
index 0000000000..000836b667
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.sdk;
+
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+
+/**
+ * Creates {@link InlongSdkDirtySink} instances for the dirty-sink identifier {@code inlong-sdk}.
+ *
+ * <p>Required options: InLong manager address, auth id, auth key, target group id and stream id.
+ * Optional options: side-output format, whether side-output errors are ignored, dirty logging.
+ */
+public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
+
+    private static final String IDENTIFIER = "inlong-sdk";
+
+    /** Used when 'dirty.side-output.format' is not set. */
+    private static final String DEFAULT_FORMAT = "csv";
+
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_MANAGER =
+            ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-addr")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The inlong manager addr to init inlong sdk");
+
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID =
+            ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The inlong manager auth id to init inlong sdk");
+
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY =
+            ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The inlong manager auth key to init inlong sdk");
+
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_GROUP =
+            ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-group-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The inlong group id of dirty sink");
+
+    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_STREAM =
+            ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-stream-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The inlong stream id of dirty sink");
+
+    /**
+     * Validates the table options and builds a sink for the table's physical row type.
+     *
+     * @throws ValidationException if required options are missing or 'dirty.identifier' is empty
+     */
+    @Override
+    public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context) {
+        ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+        FactoryUtil.validateFactoryOptions(this, config);
+        validate(config);
+        return new InlongSdkDirtySink<>(getOptions(config),
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+    }
+
+    /** Rejects a missing or blank 'dirty.identifier'. */
+    private void validate(ReadableConfig config) {
+        String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
+        if (identifier == null || identifier.trim().isEmpty()) {
+            throw new ValidationException(
+                    "The option 'dirty.identifier' is not allowed to be empty.");
+        }
+    }
+
+    /**
+     * Maps the table options onto {@link InlongSdkOptions}. Every option advertised in
+     * {@link #optionalOptions()} is honored here rather than silently ignored.
+     */
+    private InlongSdkOptions getOptions(ReadableConfig config) {
+        return InlongSdkOptions.builder()
+                .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER))
+                .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
+                .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+                .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
+                .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
+                .format(config.getOptional(DIRTY_SIDE_OUTPUT_FORMAT).orElse(DEFAULT_FORMAT))
+                .ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
+                .enableDirtyLog(config.getOptional(DIRTY_SIDE_OUTPUT_LOG_ENABLE).orElse(true))
+                .build();
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER);
+        options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID);
+        options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY);
+        options.add(DIRTY_SIDE_OUTPUT_INLONG_GROUP);
+        options.add(DIRTY_SIDE_OUTPUT_INLONG_STREAM);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DIRTY_SIDE_OUTPUT_FORMAT);
+        options.add(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
+        options.add(DIRTY_SIDE_OUTPUT_LOG_ENABLE);
+        return options;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
new file mode 100644
index 0000000000..0692d78580
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.sdk;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+/**
+ * Settings of {@link InlongSdkDirtySink}, created through {@code InlongSdkOptions.builder()}.
+ *
+ * <p>Every field with a default value carries {@link Builder.Default}. Lombok's builder ignores
+ * plain field initializers, which would leave {@code format} and the delimiters {@code null} for
+ * builder-created instances.
+ */
+@Data
+@Builder
+@Getter
+public class InlongSdkOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String DEFAULT_FORMAT = "csv";
+
+    private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
+    private static final String DEFAULT_CSV_LINE_DELIMITER = "\n";
+
+    private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
+    private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
+
+    /** Group id the dirty messages are sent to (also used to build the SDK client config). */
+    private String inlongGroupId;
+    /** Stream id the dirty messages are sent to. */
+    private String inlongStreamId;
+    /** InLong manager address used to initialise the DataProxy SDK. */
+    private String inlongManagerAddr;
+    private String inlongManagerAuthKey;
+    private String inlongManagerAuthId;
+    /** Serialization of RowData payloads: {@code csv} or {@code json}. */
+    @Builder.Default
+    private String format = DEFAULT_FORMAT;
+    /** When {@code false}, send failures are treated as errors instead of only being logged. */
+    private boolean ignoreSideOutputErrors;
+    private boolean enableDirtyLog;
+    @Builder.Default
+    private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
+    @Builder.Default
+    private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
+    @Builder.Default
+    private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
+    @Builder.Default
+    private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a83e4d025c..f4effff253 100644
--- 
a/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
-org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
\ No newline at end of file
+org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
+org.apache.inlong.sort.base.dirty.sink.sdk.InlongSdkDirtySinkFactory
\ No newline at end of file

Reply via email to