This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c6aabe607 [INLONG-10729][Sort] Sorstandalone EsSink support transform (#10734)
5c6aabe607 is described below

commit 5c6aabe607c6a67f895f402de5abed42f1f97afd
Author: vernedeng <verned...@apache.org>
AuthorDate: Sun Aug 4 10:08:59 2024 +0800

    [INLONG-10729][Sort] Sorstandalone EsSink support transform (#10734)
---
 .../sdk/transform/encode/MapSinkEncoder.java       | 74 ++++++++++++++++++++++
 .../sdk/transform/encode/SinkEncoderFactory.java   |  5 ++
 .../inlong/sdk/transform/pojo/CsvSinkInfo.java     |  2 +-
 .../inlong/sdk/transform/pojo/FieldInfo.java       | 12 ++++
 .../inlong/sdk/transform/pojo/KvSinkInfo.java      |  2 +-
 .../pojo/{KvSinkInfo.java => MapSinkInfo.java}     | 39 ++----------
 .../apache/inlong/sdk/transform/pojo/SinkInfo.java | 10 ++-
 .../inlong/sdk/transform/pojo/TransformConfig.java |  3 +-
 .../transform/process/converter/TypeConverter.java | 25 ++++----
 .../inlong/sort/standalone/sink/SinkContext.java   | 30 ++++++---
 .../DefaultEvent2IndexRequestHandler.java          | 31 +++++++++
 .../sink/elasticsearch/EsChannelWorker.java        | 31 ++++++---
 .../standalone/sink/elasticsearch/EsIdConfig.java  | 14 ++++
 .../sink/elasticsearch/EsSinkContext.java          | 68 ++++++++++++++++++++
 .../elasticsearch/IEvent2IndexRequestHandler.java  |  9 +++
 .../sink/elasticsearch/TestEsSinkContext.java      |  2 +
 16 files changed, 292 insertions(+), 65 deletions(-)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
new file mode 100644
index 0000000000..139bfa43a3
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class MapSinkEncoder implements SinkEncoder<Map<String, Object>> {
+
+    private final MapSinkInfo sinkInfo;
+    private final Map<String, TypeConverter> converters;
+
+    public MapSinkEncoder(MapSinkInfo sinkInfo) {
+        this.sinkInfo = sinkInfo;
+        this.converters = sinkInfo.getFields()
+                .stream()
+                .collect(Collectors.toMap(FieldInfo::getName,
+                        info -> info.getConverter() == null ? TypeConverter.DefaultTypeConverter()
+                                : info.getConverter()));
+    }
+
+    @Override
+    public Map<String, Object> encode(SinkData sinkData, Context context) {
+        Map<String, Object> esMap = new HashMap<>();
+        for (FieldInfo fieldInfo : sinkInfo.getFields()) {
+            String fieldName = fieldInfo.getName();
+            String strValue = sinkData.getField(fieldName);
+            TypeConverter converter = converters.get(fieldName);
+            if (converter == null) {
+                esMap.put(fieldName, strValue);
+                continue;
+            }
+
+            try {
+                esMap.put(fieldName, converter.convert(strValue));
+            } catch (Throwable t) {
+                log.warn("failed to serialize field ={}, value={}", fieldName, strValue, t);
+                esMap.put(fieldName, null);
+            }
+        }
+
+        return esMap;
+    }
+
+    @Override
+    public List<FieldInfo> getFields() {
+        return sinkInfo.getFields();
+    }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index f95d19bfca..30619078ac 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode;
 
 import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
 
 public class SinkEncoderFactory {
 
@@ -29,4 +30,8 @@ public class SinkEncoderFactory {
     public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
         return new KvSinkEncoder(kvSinkInfo);
     }
+
+    public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
+        return new MapSinkEncoder(mapSinkInfo);
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
index 29c80c3f90..063552184b 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
@@ -40,7 +40,7 @@ public class CsvSinkInfo extends SinkInfo {
             @JsonProperty("delimiter") Character delimiter,
             @JsonProperty("escapeChar") Character escapeChar,
             @JsonProperty("fields") List<FieldInfo> fields) {
-        super(SourceInfo.CSV, charset);
+        super(SinkInfo.CSV, charset);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
         if (fields != null) {
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index 46106e534f..1027dad944 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.pojo;
 
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
 import lombok.Data;
 
 /**
@@ -26,4 +28,14 @@ import lombok.Data;
 public class FieldInfo {
 
     private String name;
+    private TypeConverter converter;
+
+    public FieldInfo() {
+
+    }
+
+    public FieldInfo(String name, TypeConverter converter) {
+        this.name = name;
+        this.converter = converter;
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
index 49ff98e599..02111ab852 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
@@ -42,7 +42,7 @@ public class KvSinkInfo extends SinkInfo {
     public KvSinkInfo(
             @JsonProperty("charset") String charset,
             @JsonProperty("fields") List<FieldInfo> fields) {
-        super(SourceInfo.KV, charset);
+        super(SinkInfo.KV, charset);
         if (fields != null) {
             this.fields = fields;
         } else {
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java
similarity index 64%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java
index 49ff98e599..d85347f6ea 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java
@@ -17,53 +17,28 @@
 
 package org.apache.inlong.sdk.transform.pojo;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Data;
 import lombok.experimental.SuperBuilder;
+import org.apache.commons.collections.CollectionUtils;
 
-import java.util.ArrayList;
 import java.util.List;
 
-/**
- * KvSinkInfo
- */
 @JsonIgnoreProperties(ignoreUnknown = true)
-@Data
 @SuperBuilder
-public class KvSinkInfo extends SinkInfo {
+@Data
+public class MapSinkInfo extends SinkInfo {
 
-    private Character kvDelimiter;
-    private Character entryDelimiter;
     private List<FieldInfo> fields;
 
-    @JsonCreator
-    public KvSinkInfo(
+    public MapSinkInfo(
             @JsonProperty("charset") String charset,
             @JsonProperty("fields") List<FieldInfo> fields) {
-        super(SourceInfo.KV, charset);
-        if (fields != null) {
-            this.fields = fields;
-        } else {
-            this.fields = new ArrayList<>();
+        super(SinkInfo.ES_MAP, charset);
+        if (CollectionUtils.isEmpty(fields)) {
+            throw new IllegalArgumentException("failed to init map sink info, fieldInfos is empty");
         }
-    }
-
-    /**
-     * get fields
-     * @return the fields
-     */
-    @JsonProperty("fields")
-    public List<FieldInfo> getFields() {
-        return fields;
-    }
-
-    /**
-     * set fields
-     * @param fields the fields to set
-     */
-    public void setFields(List<FieldInfo> fields) {
         this.fields = fields;
     }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index d7d029ab50..9c61c6b46c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import lombok.Data;
 import lombok.experimental.SuperBuilder;
 
 import java.util.Optional;
@@ -33,12 +34,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
 @JsonSubTypes({
-        @Type(value = CsvSinkInfo.class, name = SourceInfo.CSV),
-        @Type(value = KvSinkInfo.class, name = SourceInfo.KV),
+        @Type(value = CsvSinkInfo.class, name = SinkInfo.CSV),
+        @Type(value = KvSinkInfo.class, name = SinkInfo.KV),
 })
 @SuperBuilder
+@Data
 public abstract class SinkInfo {
 
+    public static final String CSV = "csv";
+    public static final String KV = "kv";
+    public static final String ES_MAP = "es_map";
+
     @JsonIgnore
     private String type;
 
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
index b73f303233..2ce813ec03 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.pojo;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 import java.util.Map;
@@ -42,7 +43,7 @@ public class TransformConfig {
     @JsonCreator
     public TransformConfig(@JsonProperty("transformSql") String transformSql,
             @JsonProperty("configuration") Map<String, Object> configuration) {
-        this.transformSql = transformSql;
+        this.transformSql = Preconditions.checkNotNull(transformSql, "transform sql should not be null");
         this.configuration = configuration;
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java
similarity index 65%
copy from inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java
index 03677557f6..455156a5cc 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java
@@ -15,22 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.sink.elasticsearch;
-
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+package org.apache.inlong.sdk.transform.process.converter;
 
 /**
- * 
- * IEvent2IndexRequestHandler
+ * Converter to convert the transform intermediate string value to the given data type
  */
-public interface IEvent2IndexRequestHandler {
+public interface TypeConverter {
 
     /**
-     * parse
-     * 
-     * @param  context
-     * @param  event
-     * @return
+     *
+     * @param value String source value
+     * @return Converted type value
+     * @throws Exception Convert exception
      */
-    EsIndexRequest parse(EsSinkContext context, ProfileEvent event);
+    Object convert(String value) throws Exception;
+
+    static TypeConverter DefaultTypeConverter() {
+        return value -> value;
+    }
+
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index af2892abc4..9b5e4bc4dd 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -25,6 +25,8 @@ import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sdk.transform.decode.SourceDecoder;
 import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
@@ -32,7 +34,7 @@ import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
-import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -42,13 +44,14 @@ import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
 
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
@@ -69,7 +72,6 @@ public class SinkContext {
     protected final String sinkName;
     protected final Context sinkContext;
     protected TaskConfig taskConfig;
-    protected Map<String, TransformProcessor<String, String>> transformMap;
     @Deprecated
     protected SortTaskConfig sortTaskConfig;
     protected final Channel channel;
@@ -91,7 +93,6 @@ public class SinkContext {
         this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
         this.metricItemSet = new SortMetricItemSet(sinkName);
         this.unifiedConfiguration = CommonPropertiesHolder.useUnifiedConfiguration();
-        this.transformMap = Maps.newConcurrentMap();
         MetricRegister.register(this.metricItemSet);
     }
 
@@ -197,7 +198,14 @@ public class SinkContext {
     }
 
     public TransformConfig createTransformConfig(DataFlowConfig dataFlowConfig) {
-        return new TransformConfig(dataFlowConfig.getTransformSql());
+        return new TransformConfig(dataFlowConfig.getTransformSql(), globalConfiguration());
+    }
+
+    public Map<String, Object> globalConfiguration() {
+        Map<String, Object> globalConfiguration = new HashMap<>();
+        globalConfiguration.putAll(CommonPropertiesHolder.get());
+        globalConfiguration.putAll(sinkContext.getParameters());
+        return ImmutableMap.copyOf(globalConfiguration);
     }
 
     public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) {
@@ -234,8 +242,14 @@ public class SinkContext {
     }
 
     public FieldInfo convertToTransformFieldInfo(FieldConfig config) {
-        FieldInfo fieldInfo = new FieldInfo();
-        fieldInfo.setName(config.getName());
-        return fieldInfo;
+        return new FieldInfo(config.getName(), deriveTypeConverter(config.getFormatInfo()));
+    }
+
+    public TypeConverter deriveTypeConverter(FormatInfo formatInfo) {
+
+        if (formatInfo instanceof BasicFormatInfo) {
+            return value -> ((BasicFormatInfo<?>) formatInfo).deserialize(value);
+        }
+        return value -> value;
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
index a22ddc7990..d67b60beca 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
@@ -18,9 +18,12 @@
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -28,11 +31,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * 
  * DefaultEvent2IndexRequestHandler
  */
+@Slf4j
 public class DefaultEvent2IndexRequestHandler implements IEvent2IndexRequestHandler {
 
     public static final String KEY_EXTINFO = "extinfo";
@@ -105,6 +110,32 @@ public class DefaultEvent2IndexRequestHandler implements IEvent2IndexRequestHand
         return indexRequest;
     }
 
+    @Override
+    public List<EsIndexRequest> parse(
+            EsSinkContext context,
+            ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> processor) {
+        if (processor == null) {
+            log.error("find no any transform processor for es sink");
+            return null;
+        }
+
+        String uid = event.getUid();
+        EsIdConfig idConfig = context.getIdConfig(uid);
+        String indexName = idConfig.parseIndexName(event.getRawLogTime());
+        byte[] bodyBytes = event.getBody();
+        String strContext = new String(bodyBytes, idConfig.getCharset());
+        // build
+        List<Map<String, Object>> esData = processor.transform(strContext);
+        return esData.stream()
+                .map(data -> {
+                    EsIndexRequest indexRequest = new EsIndexRequest(indexName, event);
+                    indexRequest.source(data);
+                    return indexRequest;
+                })
+                .collect(Collectors.toList());
+    }
+
     /**
      * getExtInfo
      * 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
index c67edd0d44..49908b2326 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
@@ -18,7 +18,9 @@
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
@@ -26,6 +28,8 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * EsChannelWorker
  */
@@ -90,16 +94,27 @@ public class EsChannelWorker extends Thread {
             }
             // to profileEvent
             ProfileEvent profileEvent = (ProfileEvent) event;
-            EsIndexRequest indexRequest = handler.parse(context, profileEvent);
-            // offer queue
-            if (indexRequest != null) {
-                context.offerDispatchQueue(indexRequest);
+            if (!CommonPropertiesHolder.useUnifiedConfiguration()) {
+                EsIndexRequest indexRequest = handler.parse(context, profileEvent);
+                // offer queue
+                if (indexRequest != null) {
+                    context.offerDispatchQueue(indexRequest);
+                } else {
+                    context.addSendFailMetric();
+                    profileEvent.ack();
+                }
+                tx.commit();
             } else {
-                context.addSendFailMetric();
-                profileEvent.ack();
+                List<EsIndexRequest> indexRequestList = handler.parse(
+                        context, profileEvent, context.getTransformProcessor(profileEvent.getUid()));
+                if (CollectionUtils.isNotEmpty(indexRequestList)) {
+                    indexRequestList.forEach(context::offerDispatchQueue);
+                } else {
+                    context.addSendFailMetric();
+                    profileEvent.ack();
+                }
             }
-            tx.commit();
-            return;
+
         } catch (Throwable t) {
             LOG.error("Process event failed!" + this.getName(), t);
             try {
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index cc98ab57d6..3136a49276 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -27,7 +27,9 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
+import lombok.extern.slf4j.Slf4j;
 
+import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
@@ -38,6 +40,7 @@ import java.util.stream.Collectors;
 @NoArgsConstructor
 @AllArgsConstructor
 @SuperBuilder
+@Slf4j
 public class EsIdConfig extends IdConfig {
 
     public static final String PATTERN_DAY = "{yyyyMMdd}";
@@ -71,6 +74,7 @@ public class EsIdConfig extends IdConfig {
     private int fieldOffset = 2; // for ftime,extinfo
     private int contentOffset = 0;// except for boss + tab(1)
     private List<String> fieldList;
+    private Charset charset;
 
     public static EsIdConfig create(DataFlowConfig dataFlowConfig) {
         EsSinkConfig sinkConfig = (EsSinkConfig) dataFlowConfig.getSinkConfig();
@@ -78,6 +82,15 @@ public class EsIdConfig extends IdConfig {
                 .stream()
                 .map(FieldConfig::getName)
                 .collect(Collectors.toList());
+        Charset charset;
+        try {
+            charset = Charset.forName(sinkConfig.getEncodingType());
+        } catch (Throwable t) {
+            log.warn("do not support encoding type={}, dataflow id={}",
+                    sinkConfig.getEncodingType(), dataFlowConfig.getDataflowId());
+            charset = Charset.defaultCharset();
+        }
+
         return EsIdConfig.builder()
                 .inlongGroupId(dataFlowConfig.getInlongGroupId())
                 .inlongStreamId(dataFlowConfig.getInlongStreamId())
@@ -86,6 +99,7 @@ public class EsIdConfig extends IdConfig {
                 .separator(sinkConfig.getSeparator())
                 .indexNamePattern(sinkConfig.getIndexNamePattern())
                 .fieldList(fields)
+                .charset(charset)
                 .build();
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index f1f0cccf88..9c9664d775 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -19,8 +19,16 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
 import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.MapSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -37,6 +45,8 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import lombok.Getter;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
@@ -119,6 +129,9 @@ public class EsSinkContext extends SinkContext {
     private String strHttpHosts;
     private HttpHost[] httpHosts;
 
+    @Getter
+    protected Map<String, TransformProcessor<String, Map<String, Object>>> transformMap;
+
     public EsSinkContext(String sinkName, Context context, Channel channel,
             BufferQueue<EsIndexRequest> dispatchQueue) {
         super(sinkName, context, channel);
@@ -155,9 +168,12 @@ public class EsSinkContext extends SinkContext {
 
             // change current config
             Map<String, EsIdConfig> fromTaskConfig = reloadIdParamsFromTaskConfig(taskConfig);
+            Map<String, TransformProcessor<String, Map<String, Object>>> transformProcessor =
+                    reloadTransform(taskConfig);
             Map<String, EsIdConfig> fromSortTaskConfig = reloadIdParamsFromSortTaskConfig(sortTaskConfig);
             if (unifiedConfiguration) {
                 idConfigMap = fromTaskConfig;
+                transformMap = transformProcessor;
                 reloadClientsFromNodeConfig(esNodeConfig);
             } else {
                 idConfigMap = fromSortTaskConfig;
@@ -207,6 +223,54 @@ public class EsSinkContext extends SinkContext {
         return newIdConfigMap;
     }
 
+    /**
+     * Builds the transform processors for every data flow of the given task config.
+     *
+     * <p>The data flows of each cluster tag config are visited in order. A flow for which
+     * {@code createTransform} yields {@code null} is left out; every other processor is stored
+     * under the uid generated from the flow's inlong group id and inlong stream id.
+     *
+     * <p>NOTE(review): {@code ImmutableMap.Builder} rejects duplicate keys when building, so two
+     * flows sharing a uid would presumably fail here - confirm uids are unique per task.
+     *
+     * @param taskConfig task config whose cluster tag configs supply the data flows
+     * @return immutable map from uid to transform processor
+     */
+    private Map<String, TransformProcessor<String, Map<String, Object>>> reloadTransform(TaskConfig taskConfig) {
+        ImmutableMap.Builder<String, TransformProcessor<String, Map<String, Object>>> processors =
+                ImmutableMap.builder();
+
+        for (ClusterTagConfig tagConfig : taskConfig.getClusterTagConfigs()) {
+            for (DataFlowConfig flow : tagConfig.getDataFlowConfigs()) {
+                TransformProcessor<String, Map<String, Object>> processor = createTransform(flow);
+                if (processor != null) {
+                    // only flows with a usable processor get an entry
+                    processors.put(
+                            InlongId.generateUid(flow.getInlongGroupId(), flow.getInlongStreamId()),
+                            processor);
+                }
+            }
+        }
+
+        return processors.build();
+    }
+
+    /**
+     * Creates the transform processor of a single data flow.
+     *
+     * <p>The processor is assembled from the flow's transform config, a source decoder built from
+     * its source config and a map sink encoder built from its sink config. Creation is
+     * best-effort: any exception is logged and reported to the caller as {@code null}.
+     *
+     * @param dataFlowConfig data flow to build the processor for
+     * @return the processor, or {@code null} if it could not be created
+     */
+    private TransformProcessor<String, Map<String, Object>> createTransform(DataFlowConfig dataFlowConfig) {
+        try {
+            return TransformProcessor.create(
+                    createTransformConfig(dataFlowConfig),
+                    createSourceDecoder(dataFlowConfig.getSourceConfig()),
+                    createEsSinkEncoder(dataFlowConfig.getSinkConfig()));
+        } catch (Exception e) {
+            // The throwable goes last, beyond the two placeholders, so the logger records the
+            // stack trace and cause chain instead of only the message text.
+            LOG.error("failed to reload transform of dataflow={}, ex={}", dataFlowConfig.getDataflowId(),
+                    e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Creates the map sink encoder for an Elasticsearch sink config.
+     *
+     * <p>Each field config is turned into a {@code FieldInfo} holding the field name and the type
+     * converter derived from the field's format info. The encoder is then created from a
+     * {@code MapSinkInfo} carrying the sink's encoding type and those field infos.
+     *
+     * @param sinkConfig sink config of the data flow; must be an {@code EsSinkConfig}
+     * @return the encoder returned by {@code SinkEncoderFactory.createMapEncoder}
+     * @throws IllegalArgumentException if {@code sinkConfig} is not an {@code EsSinkConfig}
+     */
+    private MapSinkEncoder createEsSinkEncoder(SinkConfig sinkConfig) {
+        if (!(sinkConfig instanceof EsSinkConfig)) {
+            // The message names the parameter and the type that is actually checked.
+            throw new IllegalArgumentException("sinkConfig must be an instance of EsSinkConfig");
+        }
+        EsSinkConfig esSinkConfig = (EsSinkConfig) sinkConfig;
+        List<FieldInfo> fieldInfos = esSinkConfig.getFieldConfigs()
+                .stream()
+                .map(config -> new FieldInfo(config.getName(), deriveTypeConverter(config.getFormatInfo())))
+                .collect(Collectors.toList());
+
+        MapSinkInfo sinkInfo = new MapSinkInfo(sinkConfig.getEncodingType(), fieldInfos);
+        return SinkEncoderFactory.createMapEncoder(sinkInfo);
+    }
+
     private void reloadClientsFromNodeConfig(EsNodeConfig esNodeConfig) {
         Map<String, String> properties = esNodeConfig.getProperties();
         this.sinkContext = new Context(properties != null ? properties : new 
HashMap<>());
@@ -365,6 +429,10 @@ public class EsSinkContext extends SinkContext {
         return this.idConfigMap.get(uid);
     }
 
+    /**
+     * Looks up the transform processor registered for a uid.
+     *
+     * <p>The processor map has no initial value and is only assigned by a reload running with
+     * unified configuration, so it may still be unset when this method is called. That case is
+     * reported the same way as an unknown uid.
+     *
+     * @param uid uid of the data flow
+     * @return the processor for {@code uid}, or {@code null} if none is registered or no
+     *         processor map has been loaded yet
+     */
+    public TransformProcessor<String, Map<String, Object>> getTransformProcessor(String uid) {
+        // Read the field once so the null check and the lookup see the same map instance.
+        Map<String, TransformProcessor<String, Map<String, Object>>> processors = this.transformMap;
+        return processors == null ? null : processors.get(uid);
+    }
+
     /**
      * get nodeId
      *
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
index 03677557f6..db2e4bb8ee 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java
@@ -17,8 +17,12 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * 
  * IEvent2IndexRequestHandler
@@ -33,4 +37,9 @@ public interface IEvent2IndexRequestHandler {
      * @return
      */
     EsIndexRequest parse(EsSinkContext context, ProfileEvent event);
+    /**
+     * Variant of {@code parse} that additionally receives a transform processor and returns a
+     * list of index requests instead of a single request.
+     * NOTE(review): presumably each record produced by the processor becomes one request -
+     * confirm against the implementing handlers.
+     *
+     * @param context sink context
+     * @param event event to convert
+     * @param transformProcessor processor typed with {@code String} input and
+     *        {@code Map<String, Object>} output
+     * @return list of index requests
+     */
+    List<EsIndexRequest> parse(
+            EsSinkContext context,
+            ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> 
transformProcessor);
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 4e00d3e690..83d91f92a4 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.apache.inlong.sort.standalone.utils.Constants;
@@ -107,6 +108,7 @@ public class TestEsSinkContext {
      */
     @Test
     public void test() throws Exception {
+        SortConfigMetricReporter.init(CommonPropertiesHolder.get());
         BufferQueue<EsIndexRequest> dispatchQueue = 
SinkContext.createBufferQueue();
         EsSinkContext context = mock(dispatchQueue);
         assertEquals(10, context.getBulkSizeMb());


Reply via email to