This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5c6aabe607 [INLONG-10729][Sort] Sorstandalone EsSink support transform (#10734) 5c6aabe607 is described below commit 5c6aabe607c6a67f895f402de5abed42f1f97afd Author: vernedeng <verned...@apache.org> AuthorDate: Sun Aug 4 10:08:59 2024 +0800 [INLONG-10729][Sort] Sorstandalone EsSink support transform (#10734) --- .../sdk/transform/encode/MapSinkEncoder.java | 74 ++++++++++++++++++++++ .../sdk/transform/encode/SinkEncoderFactory.java | 5 ++ .../inlong/sdk/transform/pojo/CsvSinkInfo.java | 2 +- .../inlong/sdk/transform/pojo/FieldInfo.java | 12 ++++ .../inlong/sdk/transform/pojo/KvSinkInfo.java | 2 +- .../pojo/{KvSinkInfo.java => MapSinkInfo.java} | 39 ++---------- .../apache/inlong/sdk/transform/pojo/SinkInfo.java | 10 ++- .../inlong/sdk/transform/pojo/TransformConfig.java | 3 +- .../transform/process/converter/TypeConverter.java | 25 ++++---- .../inlong/sort/standalone/sink/SinkContext.java | 30 ++++++--- .../DefaultEvent2IndexRequestHandler.java | 31 +++++++++ .../sink/elasticsearch/EsChannelWorker.java | 31 ++++++--- .../standalone/sink/elasticsearch/EsIdConfig.java | 14 ++++ .../sink/elasticsearch/EsSinkContext.java | 68 ++++++++++++++++++++ .../elasticsearch/IEvent2IndexRequestHandler.java | 9 +++ .../sink/elasticsearch/TestEsSinkContext.java | 2 + 16 files changed, 292 insertions(+), 65 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java new file mode 100644 index 0000000000..139bfa43a3 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.encode; + +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.MapSinkInfo; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.converter.TypeConverter; + +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Slf4j +public class MapSinkEncoder implements SinkEncoder<Map<String, Object>> { + + private final MapSinkInfo sinkInfo; + private final Map<String, TypeConverter> converters; + + public MapSinkEncoder(MapSinkInfo sinkInfo) { + this.sinkInfo = sinkInfo; + this.converters = sinkInfo.getFields() + .stream() + .collect(Collectors.toMap(FieldInfo::getName, + info -> info.getConverter() == null ? TypeConverter.DefaultTypeConverter() + : info.getConverter())); + } + + @Override + public Map<String, Object> encode(SinkData sinkData, Context context) { + Map<String, Object> esMap = new HashMap<>(); + for (FieldInfo fieldInfo : sinkInfo.getFields()) { + String fieldName = fieldInfo.getName(); + String strValue = sinkData.getField(fieldName); + TypeConverter converter = converters.get(fieldName); + if (converter == null) { + esMap.put(fieldName, strValue); + continue; + } + + try { + esMap.put(fieldName, converter.convert(strValue)); + } catch (Throwable t) { + log.warn("failed to serialize field ={}, value={}", fieldName, strValue, t); + esMap.put(fieldName, null); + } + } + + return esMap; + } + + @Override + public List<FieldInfo> getFields() { + return sinkInfo.getFields(); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java index f95d19bfca..30619078ac 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java @@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode; import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.MapSinkInfo; public class SinkEncoderFactory { @@ -29,4 +30,8 @@ public class SinkEncoderFactory { public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) { return new KvSinkEncoder(kvSinkInfo); } + + public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) { + return new MapSinkEncoder(mapSinkInfo); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java index 29c80c3f90..063552184b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java @@ -40,7 +40,7 @@ public class CsvSinkInfo extends SinkInfo { @JsonProperty("delimiter") Character delimiter, @JsonProperty("escapeChar") Character escapeChar, @JsonProperty("fields") List<FieldInfo> fields) { - super(SourceInfo.CSV, charset); + super(SinkInfo.CSV, charset); this.delimiter = delimiter; this.escapeChar = escapeChar; if (fields != null) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java index 46106e534f..1027dad944 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java @@ -17,6 +17,8 @@ package org.apache.inlong.sdk.transform.pojo; +import org.apache.inlong.sdk.transform.process.converter.TypeConverter; + import lombok.Data; /** @@ -26,4 +28,14 @@ import lombok.Data; public class FieldInfo { private String name; + private TypeConverter converter; + + public FieldInfo() { + + } + + public FieldInfo(String name, TypeConverter converter) { + this.name = name; + this.converter = converter; + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java index 49ff98e599..02111ab852 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java @@ -42,7 +42,7 @@ public class KvSinkInfo extends SinkInfo { public KvSinkInfo( @JsonProperty("charset") String charset, @JsonProperty("fields") List<FieldInfo> fields) { - super(SourceInfo.KV, charset); + super(SinkInfo.KV, charset); if (fields != null) { this.fields = fields; } else { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java similarity index 64% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java index 49ff98e599..d85347f6ea 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/MapSinkInfo.java @@ -17,53 +17,28 @@ package org.apache.inlong.sdk.transform.pojo; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; import lombok.experimental.SuperBuilder; +import org.apache.commons.collections.CollectionUtils; -import java.util.ArrayList; import java.util.List; -/** - * KvSinkInfo - */ @JsonIgnoreProperties(ignoreUnknown = true) -@Data @SuperBuilder -public class KvSinkInfo extends SinkInfo { +@Data +public class MapSinkInfo extends SinkInfo { - private Character kvDelimiter; - private Character entryDelimiter; private List<FieldInfo> fields; - @JsonCreator - public KvSinkInfo( + public MapSinkInfo( @JsonProperty("charset") String charset, @JsonProperty("fields") List<FieldInfo> fields) { - super(SourceInfo.KV, charset); - if (fields != null) { - this.fields = fields; - } else { - this.fields = new ArrayList<>(); + super(SinkInfo.ES_MAP, charset); + if (CollectionUtils.isEmpty(fields)) { + throw new IllegalArgumentException("failed to init map sink info, fieldInfos is empty"); } - } - - /** - * get fields - * @return the fields - */ - @JsonProperty("fields") - public List<FieldInfo> getFields() { - return fields; - } - - /** - * set fields - * @param fields the fields to set - */ - public void setFields(List<FieldInfo> fields) { this.fields = fields; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java index d7d029ab50..9c61c6b46c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; import lombok.experimental.SuperBuilder; import java.util.Optional; @@ -33,12 +34,17 @@ import static com.google.common.base.Preconditions.checkNotNull; */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") @JsonSubTypes({ - @Type(value = CsvSinkInfo.class, name = SourceInfo.CSV), - @Type(value = KvSinkInfo.class, name = SourceInfo.KV), + @Type(value = CsvSinkInfo.class, name = SinkInfo.CSV), + @Type(value = KvSinkInfo.class, name = SinkInfo.KV), }) @SuperBuilder +@Data public abstract class SinkInfo { + public static final String CSV = "csv"; + public static final String KV = "kv"; + public static final String ES_MAP = "es_map"; + @JsonIgnore private String type; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java index b73f303233..2ce813ec03 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java @@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.pojo; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.util.Map; @@ -42,7 +43,7 @@ public class TransformConfig { @JsonCreator public TransformConfig(@JsonProperty("transformSql") String transformSql, @JsonProperty("configuration") Map<String, Object> configuration) { - this.transformSql = transformSql; + this.transformSql = Preconditions.checkNotNull(transformSql, "transform sql should not be null"); this.configuration = configuration; } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java similarity index 65% copy from inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java index 03677557f6..455156a5cc 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/TypeConverter.java @@ -15,22 +15,23 @@ * limitations under the License. */ -package org.apache.inlong.sort.standalone.sink.elasticsearch; - -import org.apache.inlong.sort.standalone.channel.ProfileEvent; +package org.apache.inlong.sdk.transform.process.converter; /** - * - * IEvent2IndexRequestHandler + * Converter to convert the transform intermediate string value to the given data type */ -public interface IEvent2IndexRequestHandler { +public interface TypeConverter { /** - * parse - * - * @param context - * @param event - * @return + * + * @param value String source value + * @return Converted type value + * @throws Exception Convert exception */ - EsIndexRequest parse(EsSinkContext context, ProfileEvent event); + Object convert(String value) throws Exception; + + static TypeConverter DefaultTypeConverter() { + return value -> value; + } + } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java index af2892abc4..9b5e4bc4dd 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java @@ -25,6 +25,8 @@ import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig; import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig; import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig; import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo; import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; import org.apache.inlong.sdk.transform.decode.SourceDecoder; import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; @@ -32,7 +34,7 @@ import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; import org.apache.inlong.sdk.transform.pojo.FieldInfo; import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; import org.apache.inlong.sdk.transform.pojo.TransformConfig; -import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.inlong.sdk.transform.process.converter.TypeConverter; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; @@ -42,13 +44,14 @@ import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet; import org.apache.inlong.sort.standalone.utils.BufferQueue; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; -import com.google.common.collect.Maps; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.apache.flume.Channel; import org.apache.flume.Context; import org.slf4j.Logger; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Timer; @@ -69,7 +72,6 @@ public class SinkContext { protected final String sinkName; protected final Context sinkContext; protected TaskConfig taskConfig; - protected Map<String, TransformProcessor<String, String>> transformMap; @Deprecated protected SortTaskConfig sortTaskConfig; protected final Channel channel; @@ -91,7 +93,6 @@ public class SinkContext { this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L); this.metricItemSet = new SortMetricItemSet(sinkName); this.unifiedConfiguration = CommonPropertiesHolder.useUnifiedConfiguration(); - this.transformMap = Maps.newConcurrentMap(); MetricRegister.register(this.metricItemSet); } @@ -197,7 +198,14 @@ public class SinkContext { } public TransformConfig createTransformConfig(DataFlowConfig dataFlowConfig) { - return new TransformConfig(dataFlowConfig.getTransformSql()); + return new TransformConfig(dataFlowConfig.getTransformSql(), globalConfiguration()); + } + + public Map<String, Object> globalConfiguration() { + Map<String, Object> globalConfiguration = new HashMap<>(); + globalConfiguration.putAll(CommonPropertiesHolder.get()); + globalConfiguration.putAll(sinkContext.getParameters()); + return ImmutableMap.copyOf(globalConfiguration); } public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) { @@ -234,8 +242,14 @@ public class SinkContext { } public FieldInfo convertToTransformFieldInfo(FieldConfig config) { - FieldInfo fieldInfo = new FieldInfo(); - fieldInfo.setName(config.getName()); - return fieldInfo; + return new FieldInfo(config.getName(), deriveTypeConverter(config.getFormatInfo())); + } + + public TypeConverter deriveTypeConverter(FormatInfo formatInfo) { + + if (formatInfo instanceof BasicFormatInfo) { + return value -> ((BasicFormatInfo<?>) formatInfo).deserialize(value); + } + return value -> value; } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java index a22ddc7990..d67b60beca 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java @@ -18,9 +18,12 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import org.apache.inlong.sdk.commons.protocol.EventConstants; +import org.apache.inlong.sdk.transform.process.TransformProcessor; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.utils.UnescapeHelper; +import lombok.extern.slf4j.Slf4j; + import java.nio.charset.Charset; import java.text.SimpleDateFormat; import java.util.Date; @@ -28,11 +31,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; /** * * DefaultEvent2IndexRequestHandler */ +@Slf4j public class DefaultEvent2IndexRequestHandler implements IEvent2IndexRequestHandler { public static final String KEY_EXTINFO = "extinfo"; @@ -105,6 +110,32 @@ public class DefaultEvent2IndexRequestHandler implements IEvent2IndexRequestHand return indexRequest; } + @Override + public List<EsIndexRequest> parse( + EsSinkContext context, + ProfileEvent event, + TransformProcessor<String, Map<String, Object>> processor) { + if (processor == null) { + log.error("find no any transform processor for es sink"); + return null; + } + + String uid = event.getUid(); + EsIdConfig idConfig = context.getIdConfig(uid); + String indexName = idConfig.parseIndexName(event.getRawLogTime()); + byte[] bodyBytes = event.getBody(); + String strContext = new String(bodyBytes, idConfig.getCharset()); + // build + List<Map<String, Object>> esData = processor.transform(strContext); + return esData.stream() + .map(data -> { + EsIndexRequest indexRequest = new EsIndexRequest(indexName, event); + indexRequest.source(data); + return indexRequest; + }) + .collect(Collectors.toList()); + } + /** * getExtInfo * diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java index c67edd0d44..49908b2326 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java @@ -18,7 +18,9 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.commons.collections.CollectionUtils; import org.apache.flume.Channel; import org.apache.flume.Event; import org.apache.flume.Transaction; @@ -26,6 +28,8 @@ import org.apache.flume.lifecycle.LifecycleState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + /** * EsChannelWorker */ @@ -90,16 +94,27 @@ public class EsChannelWorker extends Thread { } // to profileEvent ProfileEvent profileEvent = (ProfileEvent) event; - EsIndexRequest indexRequest = handler.parse(context, profileEvent); - // offer queue - if (indexRequest != null) { - context.offerDispatchQueue(indexRequest); + if (!CommonPropertiesHolder.useUnifiedConfiguration()) { + EsIndexRequest indexRequest = handler.parse(context, profileEvent); + // offer queue + if (indexRequest != null) { + context.offerDispatchQueue(indexRequest); + } else { + context.addSendFailMetric(); + profileEvent.ack(); + } + tx.commit(); } else { - context.addSendFailMetric(); - profileEvent.ack(); + List<EsIndexRequest> indexRequestList = handler.parse( + context, profileEvent, context.getTransformProcessor(profileEvent.getUid())); + if (CollectionUtils.isNotEmpty(indexRequestList)) { + indexRequestList.forEach(context::offerDispatchQueue); + } else { + context.addSendFailMetric(); + profileEvent.ack(); + } } - tx.commit(); - return; + } catch (Throwable t) { LOG.error("Process event failed!" + this.getName(), t); try { diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java index cc98ab57d6..3136a49276 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java @@ -27,7 +27,9 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; +import java.nio.charset.Charset; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; @@ -38,6 +40,7 @@ import java.util.stream.Collectors; @NoArgsConstructor @AllArgsConstructor @SuperBuilder +@Slf4j public class EsIdConfig extends IdConfig { public static final String PATTERN_DAY = "{yyyyMMdd}"; @@ -71,6 +74,7 @@ public class EsIdConfig extends IdConfig { private int fieldOffset = 2; // for ftime,extinfo private int contentOffset = 0;// except for boss + tab(1) private List<String> fieldList; + private Charset charset; public static EsIdConfig create(DataFlowConfig dataFlowConfig) { EsSinkConfig sinkConfig = (EsSinkConfig) dataFlowConfig.getSinkConfig(); @@ -78,6 +82,15 @@ public class EsIdConfig extends IdConfig { .stream() .map(FieldConfig::getName) .collect(Collectors.toList()); + Charset charset; + try { + charset = Charset.forName(sinkConfig.getEncodingType()); + } catch (Throwable t) { + log.warn("do not support encoding type={}, dataflow id={}", + sinkConfig.getEncodingType(), dataFlowConfig.getDataflowId()); + charset = Charset.defaultCharset(); + } + return EsIdConfig.builder() .inlongGroupId(dataFlowConfig.getInlongGroupId()) .inlongStreamId(dataFlowConfig.getInlongStreamId()) @@ -86,6 +99,7 @@ public class EsIdConfig extends IdConfig { .separator(sinkConfig.getSeparator()) .indexNamePattern(sinkConfig.getIndexNamePattern()) .fieldList(fields) + .charset(charset) .build(); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java index f1f0cccf88..9c9664d775 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java @@ -19,8 +19,16 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import org.apache.inlong.common.pojo.sort.ClusterTagConfig; import org.apache.inlong.common.pojo.sort.TaskConfig; +import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; +import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig; +import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig; import org.apache.inlong.common.pojo.sort.node.EsNodeConfig; import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; +import org.apache.inlong.sdk.transform.encode.MapSinkEncoder; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.MapSinkInfo; +import org.apache.inlong.sdk.transform.process.TransformProcessor; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; @@ -37,6 +45,8 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import lombok.Getter; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -119,6 +129,9 @@ public class EsSinkContext extends SinkContext { private String strHttpHosts; private HttpHost[] httpHosts; + @Getter + protected Map<String, TransformProcessor<String, Map<String, Object>>> transformMap; + public EsSinkContext(String sinkName, Context context, Channel channel, BufferQueue<EsIndexRequest> dispatchQueue) { super(sinkName, context, channel); @@ -155,9 +168,12 @@ public class EsSinkContext extends SinkContext { // change current config Map<String, EsIdConfig> fromTaskConfig = reloadIdParamsFromTaskConfig(taskConfig); + Map<String, TransformProcessor<String, Map<String, Object>>> transformProcessor = + reloadTransform(taskConfig); Map<String, EsIdConfig> fromSortTaskConfig = reloadIdParamsFromSortTaskConfig(sortTaskConfig); if (unifiedConfiguration) { idConfigMap = fromTaskConfig; + transformMap = transformProcessor; reloadClientsFromNodeConfig(esNodeConfig); } else { idConfigMap = fromSortTaskConfig; @@ -207,6 +223,54 @@ public class EsSinkContext extends SinkContext { return newIdConfigMap; } + private Map<String, TransformProcessor<String, Map<String, Object>>> reloadTransform(TaskConfig taskConfig) { + ImmutableMap.Builder<String, TransformProcessor<String, Map<String, Object>>> builder = + new ImmutableMap.Builder<>(); + + taskConfig.getClusterTagConfigs() + .stream() + .map(ClusterTagConfig::getDataFlowConfigs) + .flatMap(Collection::stream) + .forEach(flow -> { + TransformProcessor<String, Map<String, Object>> transformProcessor = + createTransform(flow); + if (transformProcessor == null) { + return; + } + builder.put(InlongId.generateUid(flow.getInlongGroupId(), flow.getInlongStreamId()), + transformProcessor); + }); + + return builder.build(); + } + + private TransformProcessor<String, Map<String, Object>> createTransform(DataFlowConfig dataFlowConfig) { + try { + return TransformProcessor.create( + createTransformConfig(dataFlowConfig), + createSourceDecoder(dataFlowConfig.getSourceConfig()), + createEsSinkEncoder(dataFlowConfig.getSinkConfig())); + } catch (Exception e) { + LOG.error("failed to reload transform of dataflow={}, ex={}", dataFlowConfig.getDataflowId(), + e.getMessage()); + return null; + } + } + + private MapSinkEncoder createEsSinkEncoder(SinkConfig sinkConfig) { + if (!(sinkConfig instanceof EsSinkConfig)) { + throw new IllegalArgumentException("sinkInfo must be an instance of EsMapSinkInfo"); + } + EsSinkConfig esSinkConfig = (EsSinkConfig) sinkConfig; + List<FieldInfo> fieldInfos = esSinkConfig.getFieldConfigs() + .stream() + .map(config -> new FieldInfo(config.getName(), deriveTypeConverter(config.getFormatInfo()))) + .collect(Collectors.toList()); + + MapSinkInfo sinkInfo = new MapSinkInfo(sinkConfig.getEncodingType(), fieldInfos); + return SinkEncoderFactory.createMapEncoder(sinkInfo); + } + private void reloadClientsFromNodeConfig(EsNodeConfig esNodeConfig) { Map<String, String> properties = esNodeConfig.getProperties(); this.sinkContext = new Context(properties != null ? properties : new HashMap<>()); @@ -365,6 +429,10 @@ public class EsSinkContext extends SinkContext { return this.idConfigMap.get(uid); } + public TransformProcessor<String, Map<String, Object>> getTransformProcessor(String uid) { + return this.transformMap.get(uid); + } + /** * get nodeId * diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java index 03677557f6..db2e4bb8ee 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/IEvent2IndexRequestHandler.java @@ -17,8 +17,12 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; +import org.apache.inlong.sdk.transform.process.TransformProcessor; import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import java.util.List; +import java.util.Map; + /** * * IEvent2IndexRequestHandler @@ -33,4 +37,9 @@ public interface IEvent2IndexRequestHandler { * @return */ EsIndexRequest parse(EsSinkContext context, ProfileEvent event); + + List<EsIndexRequest> parse( + EsSinkContext context, + ProfileEvent event, + TransformProcessor<String, Map<String, Object>> transformProcessor); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java index 4e00d3e690..83d91f92a4 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java @@ -21,6 +21,7 @@ import org.apache.inlong.common.metric.MetricRegister; import org.apache.inlong.sort.standalone.channel.BufferQueueChannel; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter; import org.apache.inlong.sort.standalone.sink.SinkContext; import org.apache.inlong.sort.standalone.utils.BufferQueue; import org.apache.inlong.sort.standalone.utils.Constants; @@ -107,6 +108,7 @@ public class TestEsSinkContext { */ @Test public void test() throws Exception { + SortConfigMetricReporter.init(CommonPropertiesHolder.get()); BufferQueue<EsIndexRequest> dispatchQueue = SinkContext.createBufferQueue(); EsSinkContext context = mock(dispatchQueue); assertEquals(10, context.getBulkSizeMb());