This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 1518021802 [INLONG-10684][SDK] Inlong transform supports context (#10702) 1518021802 is described below commit 15180218024a5bf923761f83abbbc0e53268cd3f Author: vernedeng <verned...@apache.org> AuthorDate: Tue Jul 23 16:40:27 2024 +0800 [INLONG-10684][SDK] Inlong transform supports context (#10702) * [INLONG-10684][SDK] Inlong transform supports context * fix UT * fix docs --- .../sdk/transform/decode/CsvSourceDecoder.java | 11 +-- .../sdk/transform/decode/JsonSourceDecoder.java | 18 ++--- .../sdk/transform/decode/KvSourceDecoder.java | 12 ++-- .../sdk/transform/decode/PbSourceDecoder.java | 5 +- .../inlong/sdk/transform/decode/SourceDecoder.java | 6 +- .../sdk/transform/encode/CsvSinkEncoder.java | 3 +- .../inlong/sdk/transform/encode/KvSinkEncoder.java | 3 +- .../inlong/sdk/transform/encode/SinkEncoder.java | 3 +- .../inlong/sdk/transform/pojo/TransformConfig.java | 18 +++++ .../inlong/sdk/transform/process/Context.java | 84 ++++++++++++++++++++++ .../sdk/transform/process/TransformProcessor.java | 25 +++++-- .../transform/process/function/AbsFunction.java | 5 +- .../transform/process/function/ConcatFunction.java | 5 +- .../transform/process/function/ExpFunction.java | 5 +- .../sdk/transform/process/function/LnFunction.java | 5 +- .../transform/process/function/Log10Function.java | 7 +- .../transform/process/function/Log2Function.java | 7 +- .../transform/process/function/LogFunction.java | 9 +-- .../transform/process/function/NowFunction.java | 3 +- .../transform/process/function/PowerFunction.java | 7 +- .../transform/process/function/SqrtFunction.java | 5 +- .../transform/process/operator/AndOperator.java | 9 +-- .../process/operator/EqualsToOperator.java | 11 +-- .../process/operator/ExpressionOperator.java | 3 +- .../operator/GreaterThanEqualsOperator.java | 11 +-- .../process/operator/GreaterThanOperator.java | 11 +-- .../process/operator/MinorThanEqualsOperator.java | 11 +-- 
.../process/operator/MinorThanOperator.java | 11 +-- .../process/operator/NotEqualsToOperator.java | 11 +-- .../transform/process/operator/NotOperator.java | 7 +- .../sdk/transform/process/operator/OrOperator.java | 9 +-- .../process/operator/ParenthesisOperator.java | 7 +- .../transform/process/parser/AdditionParser.java | 11 +-- .../sdk/transform/process/parser/ColumnParser.java | 5 +- .../transform/process/parser/DivisionParser.java | 7 +- .../sdk/transform/process/parser/LongParser.java | 5 +- .../process/parser/MultiplicationParser.java | 11 +-- .../process/parser/ParenthesisParser.java | 7 +- .../sdk/transform/process/parser/StringParser.java | 5 +- .../process/parser/SubtractionParser.java | 7 +- .../sdk/transform/process/parser/ValueParser.java | 3 +- 41 files changed, 279 insertions(+), 129 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java index daddfd36d7..d809ac687a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java @@ -19,12 +19,13 @@ package org.apache.inlong.sdk.transform.decode; import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.nio.charset.Charset; import java.util.List; -import java.util.Map; /** * CsvSourceDecoder @@ -53,19 +54,19 @@ public class CsvSourceDecoder implements SourceDecoder<String> { } @Override - public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new 
String(srcBytes, srcCharset); - return this.decode(srcString, extParams); + return this.decode(srcString, context); } @Override - public SourceData decode(String srcString, Map<String, Object> extParams) { + public SourceData decode(String srcString, Context context) { String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, escapeChar, '\"', '\n', true); CsvSourceData sourceData = new CsvSourceData(); for (int i = 0; i < rowValues.length; i++) { String[] fieldValues = rowValues[i]; sourceData.addRow(); - if (fields == null || fields.size() == 0) { + if (CollectionUtils.isEmpty(fields)) { for (int j = 0; j < fieldValues.length; j++) { String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 1); sourceData.putField(fieldName, fieldValues[j]); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java index 13c363912a..2d16d92bc1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java @@ -18,17 +18,18 @@ package org.apache.inlong.sdk.transform.decode; import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * JsonSourceDecoder @@ -65,26 +66,26 @@ public class JsonSourceDecoder implements SourceDecoder<String> { /** * decode * @param srcBytes - * @param extParams + * @param context * @return */ @Override - public SourceData decode(byte[] 
srcBytes, Map<String, Object> extParams) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new String(srcBytes, srcCharset); - return this.decode(srcString, extParams); + return this.decode(srcString, context); } /** * decode * @param srcString - * @param extParams + * @param context * @return */ @Override - public SourceData decode(String srcString, Map<String, Object> extParams) { + public SourceData decode(String srcString, Context context) { JsonObject root = gson.fromJson(srcString, JsonObject.class); JsonArray childRoot = null; - if (this.childNodes != null && this.childNodes.size() > 0) { + if (CollectionUtils.isNotEmpty(childNodes)) { JsonElement current = root; for (JsonNode node : childNodes) { if (!current.isJsonObject()) { @@ -117,7 +118,6 @@ public class JsonSourceDecoder implements SourceDecoder<String> { } childRoot = current.getAsJsonArray(); } - SourceData sourceData = new JsonSourceData(root, childRoot); - return sourceData; + return new JsonSourceData(root, childRoot); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java index 77a4fef8b4..db26a4a57d 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java @@ -19,7 +19,9 @@ package org.apache.inlong.sdk.transform.decode; import org.apache.inlong.sdk.transform.pojo.FieldInfo; import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.nio.charset.Charset; @@ -45,19 +47,19 @@ public class KvSourceDecoder implements SourceDecoder<String> { } @Override - public SourceData decode(byte[] 
srcBytes, Map<String, Object> extParams) { + public SourceData decode(byte[] srcBytes, Context context) { String srcString = new String(srcBytes, srcCharset); - return this.decode(srcString, extParams); + return this.decode(srcString, context); } @Override - public SourceData decode(String srcString, Map<String, Object> extParams) { + public SourceData decode(String srcString, Context context) { List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, '&', '=', '\\', '\"', '\n'); KvSourceData sourceData = new KvSourceData(); - if (fields == null || fields.size() == 0) { + if (CollectionUtils.isEmpty(fields)) { for (Map<String, String> row : rowValues) { sourceData.addRow(); - row.forEach((k, v) -> sourceData.putField(k, v)); + row.forEach(sourceData::putField); } return sourceData; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java index 6c8a919e24..48f3749c45 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.decode; import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; +import org.apache.inlong.sdk.transform.process.Context; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; @@ -89,13 +90,13 @@ public class PbSourceDecoder implements SourceDecoder<byte[]> { /** * decode * @param srcBytes - * @param extParams + * @param context * @return * @throws InvalidProtocolBufferException */ @SuppressWarnings("unchecked") @Override - public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) { + public SourceData decode(byte[] srcBytes, Context context) { try { // decode DynamicMessage.Builder builder = 
DynamicMessage.newBuilder(rootDesc); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java index 7bbb4dda2d..2e410d24c3 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java @@ -17,15 +17,15 @@ package org.apache.inlong.sdk.transform.decode; -import java.util.Map; +import org.apache.inlong.sdk.transform.process.Context; /** * SourceDecoder */ public interface SourceDecoder<Input> { - SourceData decode(byte[] srcBytes, Map<String, Object> extParams); + SourceData decode(byte[] srcBytes, Context context); - SourceData decode(Input input, Map<String, Object> extParams); + SourceData decode(Input input, Context context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java index 09cae6ea1b..1043f9c2e0 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode; import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.commons.lang3.StringUtils; @@ -57,7 +58,7 @@ public class CsvSinkEncoder implements SinkEncoder<String> { * @return */ @Override - public String encode(SinkData sinkData) { + public String encode(SinkData sinkData, Context context) { builder.delete(0, builder.length()); if (fields == null || fields.size() == 0) { if (escapeChar == null) { diff 
--git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java index be0a7ba980..edf46fcee1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java @@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode; import org.apache.inlong.sdk.transform.pojo.FieldInfo; import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.commons.lang3.StringUtils; @@ -49,7 +50,7 @@ public class KvSinkEncoder implements SinkEncoder<String> { * @return */ @Override - public String encode(SinkData sinkData) { + public String encode(SinkData sinkData, Context context) { builder.delete(0, builder.length()); if (fields == null || fields.size() == 0) { for (String fieldName : sinkData.keyList()) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java index 150f1811f1..7f845a99d6 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.encode; import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.process.Context; import java.util.List; @@ -26,7 +27,7 @@ import java.util.List; */ public interface SinkEncoder<Output> { - Output encode(SinkData sinkData); + Output encode(SinkData sinkData, Context context); List<FieldInfo> getFields(); } diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java index 71dd71be3b..b73f303233 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java @@ -19,6 +19,9 @@ package org.apache.inlong.sdk.transform.pojo; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; /** * TransformConfig @@ -28,9 +31,19 @@ public class TransformConfig { @JsonProperty("transformSql") private String transformSql; + @JsonProperty("configuration") + private Map<String, Object> configuration; + @JsonCreator public TransformConfig(@JsonProperty("transformSql") String transformSql) { + this(transformSql, ImmutableMap.of()); + } + + @JsonCreator + public TransformConfig(@JsonProperty("transformSql") String transformSql, + @JsonProperty("configuration") Map<String, Object> configuration) { this.transformSql = transformSql; + this.configuration = configuration; } /** @@ -42,6 +55,11 @@ public class TransformConfig { return transformSql; } + @JsonProperty("configuration") + public Map<String, Object> getConfiguration() { + return configuration; + } + /** * set transformSql * @param transformSql the transformSql to set diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java new file mode 100644 index 0000000000..9e4f4b0c11 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Transform context. + * + * <p>configuration</p> is the global configuration when init transform processor + * <p>extParams</p> is the ext params of each data + * <p>runtimeParams</p> is the runtime outputs when processing by each component + * + * The priority is runtimeParams > extParams > configuration. 
/**
 * Transform context.
 *
 * <p>A layered key/value lookup over three parameter maps:</p>
 * <ul>
 *   <li>{@code configuration} - the global configuration passed to the constructor;</li>
 *   <li>{@code extParams} - the ext params passed to the constructor;</li>
 *   <li>{@code runtimeParams} - values stored through {@link #put(String, Object)} while processing.</li>
 * </ul>
 *
 * <p>The priority is runtimeParams &gt; extParams &gt; configuration: the first layer that holds a
 * non-null value for a key wins.</p>
 */
public class Context {

    private final Map<String, Object> configuration;
    private final Map<String, Object> extParams;
    private final Map<String, Object> runtimeParams;

    /**
     * Creates a context over the given maps. The maps are referenced, not copied.
     *
     * @param configuration global configuration; {@code null} is treated as empty
     * @param extParams ext params; {@code null} is treated as empty
     */
    public Context(Map<String, Object> configuration, Map<String, Object> extParams) {
        this.configuration = configuration;
        this.extParams = extParams;
        this.runtimeParams = new ConcurrentHashMap<>();
    }

    /**
     * Stores a runtime value, which takes precedence over the other two layers.
     *
     * @param key the key; must not be {@code null} (the backing map is a {@link ConcurrentHashMap})
     * @param value the value; must not be {@code null} for the same reason
     * @return the runtime value previously stored under {@code key}, or {@code null} if none
     */
    public Object put(String key, Object value) {
        return runtimeParams.put(key, value);
    }

    /**
     * Looks a key up in priority order: runtimeParams, then extParams, then configuration.
     *
     * @param key the key to look up
     * @return the first non-null value found, or {@code null} if no layer has one
     */
    public Object get(String key) {
        Object obj = runtimeParams.get(key);
        if (obj != null) {
            return obj;
        }
        obj = lookup(extParams, key);
        if (obj != null) {
            return obj;
        }
        return lookup(configuration, key);
    }

    /**
     * Returns the value for a key as text.
     *
     * @param key the key to look up
     * @return {@code toString()} of the value, or {@code null} if the key has no value
     */
    public String getString(String key) {
        Object obj = this.get(key);
        if (obj != null) {
            return obj.toString();
        }
        return null;
    }

    /**
     * Returns the value for a key parsed as a decimal {@code int}.
     *
     * @param key the key to look up
     * @return the parsed value, or {@code null} if the key has no value or its text is not a valid int
     */
    public Integer getInteger(String key) {
        Object obj = this.get(key);
        if (obj == null) {
            return null;
        }
        try {
            // Integer.getInteger(String) would read a JVM system property of that name; parse the text itself.
            return Integer.valueOf(obj.toString().trim());
        } catch (NumberFormatException ignored) {
            // Unparsable text is reported the same way as a missing key.
            return null;
        }
    }

    /**
     * Returns the value for a key parsed as a decimal {@code long}.
     *
     * @param key the key to look up
     * @return the parsed value, or {@code null} if the key has no value or its text is not a valid long
     */
    public Long getLong(String key) {
        Object obj = this.get(key);
        if (obj == null) {
            return null;
        }
        try {
            // Long.getLong(String) would read a JVM system property of that name; parse the text itself.
            return Long.valueOf(obj.toString().trim());
        } catch (NumberFormatException ignored) {
            // Unparsable text is reported the same way as a missing key.
            return null;
        }
    }

    /** Null-tolerant {@code Map.get}: an absent map behaves like an empty one. */
    private static Object lookup(Map<String, Object> source, String key) {
        return source == null ? null : source.get(key);
    }

}

// ---------------------------------------------------------------------------------------------
// The original physical line of this patch e-mail also carried the start of the next file's
// diff. It is not part of Context and is retained below unchanged, one diff line per comment:
//
// diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
// index 0e74180932..9944268dda 100644
// --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
// +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
// @@ -55,9 +55,9 @@ public class TransformProcessor<I, O> {
//  private static final Map<String, Object> EMPTY_EXT_PARAMS = ImmutableMap.of();
// - private TransformConfig config;
// - private SourceDecoder<I> decoder;
// - private SinkEncoder<O> encoder;
// + private final TransformConfig config;
// + private final SourceDecoder<I> decoder;
// + private final SinkEncoder<O> encoder;
private PlainSelect transformSelect; private ExpressionOperator where; @@ -119,27 +119,38 @@ public class TransformProcessor<I, O> { } public List<O> transform(I input, Map<String, Object> extParams) { - SourceData sourceData = this.decoder.decode(input, extParams); + Context context = new Context(config.getConfiguration(), extParams); + + // decode + SourceData sourceData = this.decoder.decode(input, context); if (sourceData == null) { return null; } + List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount()); for (int i = 0; i < sourceData.getRowCount(); i++) { - if (this.where != null && !this.where.check(sourceData, i)) { + + // where check + if (this.where != null && !this.where.check(sourceData, i, context)) { continue; } + + // parse value SinkData sinkData = new DefaultSinkData(); for (Entry<String, ValueParser> entry : this.selectItemMap.entrySet()) { String fieldName = entry.getKey(); try { - Object fieldValue = entry.getValue().parse(sourceData, i); + ValueParser parser = entry.getValue(); + Object fieldValue = parser.parse(sourceData, i, context); sinkData.addField(fieldName, String.valueOf(fieldValue)); } catch (Throwable t) { LOG.error(t.getMessage(), t); sinkData.addField(fieldName, ""); } } - sinkDatas.add(this.encoder.encode(sinkData)); + + // encode + sinkDatas.add(this.encoder.encode(sinkData, context)); } return sinkDatas; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java index a94d662eae..d2b2ceace6 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import 
org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -48,8 +49,8 @@ public class AbsFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object numberObj = numberParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); return numberValue.abs(); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java index 2bfe7a5588..529ddad007 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -59,10 +60,10 @@ public class ConcatFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { + public Object parse(SourceData sourceData, int rowIndex, Context context) { StringBuilder builder = new StringBuilder(); for (ValueParser node : nodeList) { - builder.append(node.parse(sourceData, rowIndex)); + builder.append(node.parse(sourceData, rowIndex, context)); } return builder.toString(); } diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java index 5f542413e2..5a7a9bbfac 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -48,8 +49,8 @@ public class ExpFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object numberObj = numberParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); return Math.exp(numberValue.doubleValue()); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java index d5e5ecf80b..530b2ff4f8 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import 
org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -48,8 +49,8 @@ public class LnFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object numberObj = numberParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); return Math.log(numberValue.doubleValue()); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java index d390893af2..e968a27aa1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -31,7 +32,7 @@ import java.math.BigDecimal; */ public class Log10Function implements ValueParser { - private ValueParser numberParser; + private final ValueParser numberParser; /** * Constructor @@ -48,8 +49,8 @@ public class Log10Function implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object numberObj = numberParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); 
BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); return Math.log10(numberValue.doubleValue()); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java index 9c502f25a5..914bc69ac3 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -31,7 +32,7 @@ import java.math.BigDecimal; */ public class Log2Function implements ValueParser { - private ValueParser numberParser; + private final ValueParser numberParser; /** * Constructor @@ -48,8 +49,8 @@ public class Log2Function implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object numberObj = numberParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); return Math.log(numberValue.doubleValue()) / Math.log(2); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java index bc0e200255..ddbcd71a97 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java +++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -35,7 +36,7 @@ import java.util.List; public class LogFunction implements ValueParser { private ValueParser baseParser; - private ValueParser numberParser; + private final ValueParser numberParser; /** * Constructor @@ -59,11 +60,11 @@ public class LogFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object numberObj = numberParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); if (baseParser != null) { - Object baseObj = baseParser.parse(sourceData, rowIndex); + Object baseObj = baseParser.parse(sourceData, rowIndex, context); BigDecimal baseValue = OperatorTools.parseBigDecimal(baseObj); return Math.log(numberValue.doubleValue()) / Math.log(baseValue.doubleValue()); } else { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java index 930a09af05..3857f22147 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import 
org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.Function; @@ -42,7 +43,7 @@ public class NowFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { + public Object parse(SourceData sourceData, int rowIndex, Context context) { return String.valueOf(System.currentTimeMillis()); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java index 94835f9d7b..938fc00f6d 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -50,9 +51,9 @@ public class PowerFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object baseObj = baseParser.parse(sourceData, rowIndex); - Object exponentObj = exponentParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object baseObj = baseParser.parse(sourceData, rowIndex, context); + Object exponentObj = exponentParser.parse(sourceData, rowIndex, context); BigDecimal baseValue = OperatorTools.parseBigDecimal(baseObj); BigDecimal exponentValue = OperatorTools.parseBigDecimal(exponentObj); return 
Math.pow(baseValue.doubleValue(), exponentValue.doubleValue()); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java index f9e277acbd..69cf43041a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.function; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; @@ -48,8 +49,8 @@ public class SqrtFunction implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object numberObj = numberParser.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); return Math.sqrt(numberValue.doubleValue()); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java index c6464f850d..a9dcd42606 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import 
org.apache.inlong.sdk.transform.process.Context; import net.sf.jsqlparser.expression.operators.conditional.AndExpression; @@ -27,8 +28,8 @@ import net.sf.jsqlparser.expression.operators.conditional.AndExpression; */ public class AndOperator implements ExpressionOperator { - private ExpressionOperator left; - private ExpressionOperator right; + private final ExpressionOperator left; + private final ExpressionOperator right; public AndOperator(AndExpression expr) { this.left = OperatorTools.buildOperator(expr.getLeftExpression()); @@ -42,8 +43,8 @@ public class AndOperator implements ExpressionOperator { * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return left.check(sourceData, rowIndex) && right.check(sourceData, rowIndex); + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return left.check(sourceData, rowIndex, context) && right.check(sourceData, rowIndex, context); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java index 6910e0c9ca..709537e8a0 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.EqualsTo; @@ -28,8 +29,8 @@ import net.sf.jsqlparser.expression.operators.relational.EqualsTo; */ public class EqualsToOperator implements ExpressionOperator { - private ValueParser left; - private ValueParser right; + 
private final ValueParser left; + private final ValueParser right; public EqualsToOperator(EqualsTo expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -44,9 +45,9 @@ public class EqualsToOperator implements ExpressionOperator { */ @SuppressWarnings("rawtypes") @Override - public boolean check(SourceData sourceData, int rowIndex) { - return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), - (Comparable) this.right.parse(sourceData, rowIndex)) == 0; + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex, context), + (Comparable) this.right.parse(sourceData, rowIndex, context)) == 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java index b055e841e2..8f874b8de0 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java @@ -18,11 +18,12 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; /** * ExpressionOperator */ public interface ExpressionOperator { - boolean check(SourceData sourceData, int rowIndex); + boolean check(SourceData sourceData, int rowIndex, Context context); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java index eb7689932e..3a53968e10 100644 --- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; @@ -28,8 +29,8 @@ import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; */ public class GreaterThanEqualsOperator implements ExpressionOperator { - private ValueParser left; - private ValueParser right; + private final ValueParser left; + private final ValueParser right; public GreaterThanEqualsOperator(GreaterThanEquals expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -44,9 +45,9 @@ public class GreaterThanEqualsOperator implements ExpressionOperator { */ @SuppressWarnings("rawtypes") @Override - public boolean check(SourceData sourceData, int rowIndex) { - return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), - (Comparable) this.right.parse(sourceData, rowIndex)) >= 0; + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex, context), + (Comparable) this.right.parse(sourceData, rowIndex, context)) >= 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java index e0db44b1e3..a1cd8c2ea2 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java +++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.GreaterThan; @@ -28,8 +29,8 @@ import net.sf.jsqlparser.expression.operators.relational.GreaterThan; */ public class GreaterThanOperator implements ExpressionOperator { - private ValueParser left; - private ValueParser right; + private final ValueParser left; + private final ValueParser right; public GreaterThanOperator(GreaterThan expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -44,9 +45,9 @@ public class GreaterThanOperator implements ExpressionOperator { */ @SuppressWarnings("rawtypes") @Override - public boolean check(SourceData sourceData, int rowIndex) { - return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), - (Comparable) this.right.parse(sourceData, rowIndex)) > 0; + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex, context), + (Comparable) this.right.parse(sourceData, rowIndex, context)) > 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java index 8b3628ddb7..4248cf1d36 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java @@ -18,6 +18,7 @@ package 
org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; @@ -28,8 +29,8 @@ import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; */ public class MinorThanEqualsOperator implements ExpressionOperator { - private ValueParser left; - private ValueParser right; + private final ValueParser left; + private final ValueParser right; public MinorThanEqualsOperator(MinorThanEquals expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -44,9 +45,9 @@ public class MinorThanEqualsOperator implements ExpressionOperator { */ @SuppressWarnings("rawtypes") @Override - public boolean check(SourceData sourceData, int rowIndex) { - return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), - (Comparable) this.right.parse(sourceData, rowIndex)) <= 0; + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex, context), + (Comparable) this.right.parse(sourceData, rowIndex, context)) <= 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java index 17baa9cb17..21ecc0400a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import 
org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.MinorThan; @@ -28,8 +29,8 @@ import net.sf.jsqlparser.expression.operators.relational.MinorThan; */ public class MinorThanOperator implements ExpressionOperator { - private ValueParser left; - private ValueParser right; + private final ValueParser left; + private final ValueParser right; public MinorThanOperator(MinorThan expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -44,9 +45,9 @@ public class MinorThanOperator implements ExpressionOperator { */ @SuppressWarnings("rawtypes") @Override - public boolean check(SourceData sourceData, int rowIndex) { - return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), - (Comparable) this.right.parse(sourceData, rowIndex)) < 0; + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex, context), + (Comparable) this.right.parse(sourceData, rowIndex, context)) < 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java index dbe185dec5..98bf102b4f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; @@ 
-28,8 +29,8 @@ import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; */ public class NotEqualsToOperator implements ExpressionOperator { - private ValueParser left; - private ValueParser right; + private final ValueParser left; + private final ValueParser right; public NotEqualsToOperator(NotEqualsTo expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -44,9 +45,9 @@ public class NotEqualsToOperator implements ExpressionOperator { */ @SuppressWarnings("rawtypes") @Override - public boolean check(SourceData sourceData, int rowIndex) { - return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), - (Comparable) this.right.parse(sourceData, rowIndex)) != 0; + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex, context), + (Comparable) this.right.parse(sourceData, rowIndex, context)) != 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java index f648d426e7..d8b9ff07e0 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import net.sf.jsqlparser.expression.NotExpression; @@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.NotExpression; */ public class NotOperator implements ExpressionOperator { - private ExpressionOperator node; + private final ExpressionOperator node; public NotOperator(NotExpression expr) { this.node = 
OperatorTools.buildOperator(expr.getExpression()); @@ -40,8 +41,8 @@ public class NotOperator implements ExpressionOperator { * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return !this.node.check(sourceData, rowIndex); + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return !this.node.check(sourceData, rowIndex, context); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java index 33b9f82bdc..b5de7f279e 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import net.sf.jsqlparser.expression.operators.conditional.OrExpression; @@ -27,8 +28,8 @@ import net.sf.jsqlparser.expression.operators.conditional.OrExpression; */ public class OrOperator implements ExpressionOperator { - private ExpressionOperator left; - private ExpressionOperator right; + private final ExpressionOperator left; + private final ExpressionOperator right; public OrOperator(OrExpression expr) { this.left = OperatorTools.buildOperator(expr.getLeftExpression()); @@ -42,8 +43,8 @@ public class OrOperator implements ExpressionOperator { * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return left.check(sourceData, rowIndex) || right.check(sourceData, rowIndex); + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return left.check(sourceData, rowIndex, context) || right.check(sourceData, rowIndex, context); } } diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java index 111f6bbb21..0ca1334fce 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import net.sf.jsqlparser.expression.Parenthesis; @@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.Parenthesis; */ public class ParenthesisOperator implements ExpressionOperator { - private ExpressionOperator node; + private final ExpressionOperator node; public ParenthesisOperator(Parenthesis expr) { this.node = OperatorTools.buildOperator(expr.getExpression()); @@ -40,8 +41,8 @@ public class ParenthesisOperator implements ExpressionOperator { * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return this.node.check(sourceData, rowIndex); + public boolean check(SourceData sourceData, int rowIndex, Context context) { + return this.node.check(sourceData, rowIndex, context); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java index a0f03ab4cd..08474fe81f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import 
org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import net.sf.jsqlparser.expression.operators.arithmetic.Addition; @@ -30,9 +31,9 @@ import java.math.BigDecimal; */ public class AdditionParser implements ValueParser { - private ValueParser left; + private final ValueParser left; - private ValueParser right; + private final ValueParser right; public AdditionParser(Addition expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -46,9 +47,9 @@ public class AdditionParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object leftObj = this.left.parse(sourceData, rowIndex); - Object rightObj = this.right.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); return leftValue.add(rightValue); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java index afc58b422e..3a5000a57f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import net.sf.jsqlparser.expression.Function; import net.sf.jsqlparser.schema.Column; @@ -28,7 +29,7 @@ 
import net.sf.jsqlparser.schema.Column; */ public class ColumnParser implements ValueParser { - private String fieldName; + private final String fieldName; public ColumnParser(Column expr) { this.fieldName = expr.toString(); @@ -45,7 +46,7 @@ public class ColumnParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { + public Object parse(SourceData sourceData, int rowIndex, Context context) { return sourceData.getField(rowIndex, fieldName); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java index 5dc94b6e99..61cf1bb82f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import net.sf.jsqlparser.expression.operators.arithmetic.Division; @@ -46,9 +47,9 @@ public class DivisionParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object leftObj = this.left.parse(sourceData, rowIndex); - Object rightObj = this.right.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); return leftValue.divide(rightValue); diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java index efd61cc2cb..7abb8af77c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import net.sf.jsqlparser.expression.LongValue; @@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.LongValue; */ public class LongParser implements ValueParser { - private Long value; + private final Long value; public LongParser(LongValue expr) { this.value = expr.getValue(); @@ -40,7 +41,7 @@ public class LongParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { + public Object parse(SourceData sourceData, int rowIndex, Context context) { return value; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java index 7918b434ac..f7299dcf8c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import 
net.sf.jsqlparser.expression.operators.arithmetic.Multiplication; @@ -30,9 +31,9 @@ import java.math.BigDecimal; */ public class MultiplicationParser implements ValueParser { - private ValueParser left; + private final ValueParser left; - private ValueParser right; + private final ValueParser right; public MultiplicationParser(Multiplication expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); @@ -46,9 +47,9 @@ public class MultiplicationParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object leftObj = this.left.parse(sourceData, rowIndex); - Object rightObj = this.right.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); return leftValue.multiply(rightValue); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java index 61a2bd1bf3..1899017087 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import net.sf.jsqlparser.expression.Parenthesis; @@ -28,7 +29,7 @@ import net.sf.jsqlparser.expression.Parenthesis; */ public class ParenthesisParser implements 
ValueParser { - private ValueParser node; + private final ValueParser node; public ParenthesisParser(Parenthesis expr) { this.node = OperatorTools.buildParser(expr.getExpression()); @@ -41,7 +42,7 @@ public class ParenthesisParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - return node.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + return node.parse(sourceData, rowIndex, context); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java index 9cb431c1fa..9ce0646cc9 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import net.sf.jsqlparser.expression.StringValue; @@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.StringValue; */ public class StringParser implements ValueParser { - private String stringValue; + private final String stringValue; public StringParser(StringValue expr) { this.stringValue = expr.getValue(); @@ -40,7 +41,7 @@ public class StringParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { + public Object parse(SourceData sourceData, int rowIndex, Context context) { return stringValue; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java index 
af36c79452..15d534d50f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; @@ -46,9 +47,9 @@ public class SubtractionParser implements ValueParser { * @return */ @Override - public Object parse(SourceData sourceData, int rowIndex) { - Object leftObj = this.left.parse(sourceData, rowIndex); - Object rightObj = this.right.parse(sourceData, rowIndex); + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); return leftValue.subtract(rightValue); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java index bafafe276c..3b246cfc80 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; /** * ValueParser @@ -25,5 +26,5 @@ import 
org.apache.inlong.sdk.transform.decode.SourceData; */ public interface ValueParser { - Object parse(SourceData sourceData, int rowIndex); + Object parse(SourceData sourceData, int rowIndex, Context context); }