This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1518021802 [INLONG-10684][SDK] Inlong transform supports context 
(#10702)
1518021802 is described below

commit 15180218024a5bf923761f83abbbc0e53268cd3f
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Jul 23 16:40:27 2024 +0800

    [INLONG-10684][SDK] Inlong transform supports context (#10702)
    
    * [INLONG-10684][SDK] Inlong transform supports context
    
    * fix UT
    
    * fix docs
---
 .../sdk/transform/decode/CsvSourceDecoder.java     | 11 +--
 .../sdk/transform/decode/JsonSourceDecoder.java    | 18 ++---
 .../sdk/transform/decode/KvSourceDecoder.java      | 12 ++--
 .../sdk/transform/decode/PbSourceDecoder.java      |  5 +-
 .../inlong/sdk/transform/decode/SourceDecoder.java |  6 +-
 .../sdk/transform/encode/CsvSinkEncoder.java       |  3 +-
 .../inlong/sdk/transform/encode/KvSinkEncoder.java |  3 +-
 .../inlong/sdk/transform/encode/SinkEncoder.java   |  3 +-
 .../inlong/sdk/transform/pojo/TransformConfig.java | 18 +++++
 .../inlong/sdk/transform/process/Context.java      | 84 ++++++++++++++++++++++
 .../sdk/transform/process/TransformProcessor.java  | 25 +++++--
 .../transform/process/function/AbsFunction.java    |  5 +-
 .../transform/process/function/ConcatFunction.java |  5 +-
 .../transform/process/function/ExpFunction.java    |  5 +-
 .../sdk/transform/process/function/LnFunction.java |  5 +-
 .../transform/process/function/Log10Function.java  |  7 +-
 .../transform/process/function/Log2Function.java   |  7 +-
 .../transform/process/function/LogFunction.java    |  9 +--
 .../transform/process/function/NowFunction.java    |  3 +-
 .../transform/process/function/PowerFunction.java  |  7 +-
 .../transform/process/function/SqrtFunction.java   |  5 +-
 .../transform/process/operator/AndOperator.java    |  9 +--
 .../process/operator/EqualsToOperator.java         | 11 +--
 .../process/operator/ExpressionOperator.java       |  3 +-
 .../operator/GreaterThanEqualsOperator.java        | 11 +--
 .../process/operator/GreaterThanOperator.java      | 11 +--
 .../process/operator/MinorThanEqualsOperator.java  | 11 +--
 .../process/operator/MinorThanOperator.java        | 11 +--
 .../process/operator/NotEqualsToOperator.java      | 11 +--
 .../transform/process/operator/NotOperator.java    |  7 +-
 .../sdk/transform/process/operator/OrOperator.java |  9 +--
 .../process/operator/ParenthesisOperator.java      |  7 +-
 .../transform/process/parser/AdditionParser.java   | 11 +--
 .../sdk/transform/process/parser/ColumnParser.java |  5 +-
 .../transform/process/parser/DivisionParser.java   |  7 +-
 .../sdk/transform/process/parser/LongParser.java   |  5 +-
 .../process/parser/MultiplicationParser.java       | 11 +--
 .../process/parser/ParenthesisParser.java          |  7 +-
 .../sdk/transform/process/parser/StringParser.java |  5 +-
 .../process/parser/SubtractionParser.java          |  7 +-
 .../sdk/transform/process/parser/ValueParser.java  |  3 +-
 41 files changed, 279 insertions(+), 129 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index daddfd36d7..d809ac687a 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -19,12 +19,13 @@ package org.apache.inlong.sdk.transform.decode;
 
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.Context;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.nio.charset.Charset;
 import java.util.List;
-import java.util.Map;
 
 /**
  * CsvSourceDecoder
@@ -53,19 +54,19 @@ public class CsvSourceDecoder implements 
SourceDecoder<String> {
     }
 
     @Override
-    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+    public SourceData decode(byte[] srcBytes, Context context) {
         String srcString = new String(srcBytes, srcCharset);
-        return this.decode(srcString, extParams);
+        return this.decode(srcString, context);
     }
 
     @Override
-    public SourceData decode(String srcString, Map<String, Object> extParams) {
+    public SourceData decode(String srcString, Context context) {
         String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, 
escapeChar, '\"', '\n', true);
         CsvSourceData sourceData = new CsvSourceData();
         for (int i = 0; i < rowValues.length; i++) {
             String[] fieldValues = rowValues[i];
             sourceData.addRow();
-            if (fields == null || fields.size() == 0) {
+            if (CollectionUtils.isEmpty(fields)) {
                 for (int j = 0; j < fieldValues.length; j++) {
                     String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 
1);
                     sourceData.putField(fieldName, fieldValues[j]);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index 13c363912a..2d16d92bc1 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -18,17 +18,18 @@
 package org.apache.inlong.sdk.transform.decode;
 
 import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /**
  * JsonSourceDecoder
@@ -65,26 +66,26 @@ public class JsonSourceDecoder implements 
SourceDecoder<String> {
     /**
      * decode
      * @param srcBytes
-     * @param extParams
+     * @param context
      * @return
      */
     @Override
-    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+    public SourceData decode(byte[] srcBytes, Context context) {
         String srcString = new String(srcBytes, srcCharset);
-        return this.decode(srcString, extParams);
+        return this.decode(srcString, context);
     }
 
     /**
      * decode
      * @param srcString
-     * @param extParams
+     * @param context
      * @return
      */
     @Override
-    public SourceData decode(String srcString, Map<String, Object> extParams) {
+    public SourceData decode(String srcString, Context context) {
         JsonObject root = gson.fromJson(srcString, JsonObject.class);
         JsonArray childRoot = null;
-        if (this.childNodes != null && this.childNodes.size() > 0) {
+        if (CollectionUtils.isNotEmpty(childNodes)) {
             JsonElement current = root;
             for (JsonNode node : childNodes) {
                 if (!current.isJsonObject()) {
@@ -117,7 +118,6 @@ public class JsonSourceDecoder implements 
SourceDecoder<String> {
             }
             childRoot = current.getAsJsonArray();
         }
-        SourceData sourceData = new JsonSourceData(root, childRoot);
-        return sourceData;
+        return new JsonSourceData(root, childRoot);
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index 77a4fef8b4..db26a4a57d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -19,7 +19,9 @@ package org.apache.inlong.sdk.transform.decode;
 
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.nio.charset.Charset;
@@ -45,19 +47,19 @@ public class KvSourceDecoder implements 
SourceDecoder<String> {
     }
 
     @Override
-    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+    public SourceData decode(byte[] srcBytes, Context context) {
         String srcString = new String(srcBytes, srcCharset);
-        return this.decode(srcString, extParams);
+        return this.decode(srcString, context);
     }
 
     @Override
-    public SourceData decode(String srcString, Map<String, Object> extParams) {
+    public SourceData decode(String srcString, Context context) {
         List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, '&', 
'=', '\\', '\"', '\n');
         KvSourceData sourceData = new KvSourceData();
-        if (fields == null || fields.size() == 0) {
+        if (CollectionUtils.isEmpty(fields)) {
             for (Map<String, String> row : rowValues) {
                 sourceData.addRow();
-                row.forEach((k, v) -> sourceData.putField(k, v));
+                row.forEach(sourceData::putField);
             }
             return sourceData;
         }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 6c8a919e24..48f3749c45 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.decode;
 
 import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
@@ -89,13 +90,13 @@ public class PbSourceDecoder implements 
SourceDecoder<byte[]> {
     /**
      * decode
      * @param srcBytes
-     * @param extParams
+     * @param context
      * @return
      * @throws InvalidProtocolBufferException 
      */
     @SuppressWarnings("unchecked")
     @Override
-    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+    public SourceData decode(byte[] srcBytes, Context context) {
         try {
             // decode
             DynamicMessage.Builder builder = 
DynamicMessage.newBuilder(rootDesc);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
index 7bbb4dda2d..2e410d24c3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -17,15 +17,15 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
-import java.util.Map;
+import org.apache.inlong.sdk.transform.process.Context;
 
 /**
  * SourceDecoder
  */
 public interface SourceDecoder<Input> {
 
-    SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
+    SourceData decode(byte[] srcBytes, Context context);
 
-    SourceData decode(Input input, Map<String, Object> extParams);
+    SourceData decode(Input input, Context context);
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 09cae6ea1b..1043f9c2e0 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode;
 
 import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -57,7 +58,7 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
      * @return
      */
     @Override
-    public String encode(SinkData sinkData) {
+    public String encode(SinkData sinkData, Context context) {
         builder.delete(0, builder.length());
         if (fields == null || fields.size() == 0) {
             if (escapeChar == null) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index be0a7ba980..edf46fcee1 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.transform.encode;
 
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -49,7 +50,7 @@ public class KvSinkEncoder implements SinkEncoder<String> {
      * @return
      */
     @Override
-    public String encode(SinkData sinkData) {
+    public String encode(SinkData sinkData, Context context) {
         builder.delete(0, builder.length());
         if (fields == null || fields.size() == 0) {
             for (String fieldName : sinkData.keyList()) {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index 150f1811f1..7f845a99d6 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.encode;
 
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import java.util.List;
 
@@ -26,7 +27,7 @@ import java.util.List;
  */
 public interface SinkEncoder<Output> {
 
-    Output encode(SinkData sinkData);
+    Output encode(SinkData sinkData, Context context);
 
     List<FieldInfo> getFields();
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
index 71dd71be3b..b73f303233 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -19,6 +19,9 @@ package org.apache.inlong.sdk.transform.pojo;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
 
 /**
  * TransformConfig
@@ -28,9 +31,19 @@ public class TransformConfig {
     @JsonProperty("transformSql")
     private String transformSql;
 
+    @JsonProperty("configuration")
+    private Map<String, Object> configuration;
+
     @JsonCreator
     public TransformConfig(@JsonProperty("transformSql") String transformSql) {
+        this(transformSql, ImmutableMap.of());
+    }
+
+    @JsonCreator
+    public TransformConfig(@JsonProperty("transformSql") String transformSql,
+            @JsonProperty("configuration") Map<String, Object> configuration) {
         this.transformSql = transformSql;
+        this.configuration = configuration;
     }
 
     /**
@@ -42,6 +55,11 @@ public class TransformConfig {
         return transformSql;
     }
 
+    @JsonProperty("configuration")
+    public Map<String, Object> getConfiguration() {
+        return configuration;
+    }
+
     /**
      * set transformSql
      * @param transformSql the transformSql to set
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
new file mode 100644
index 0000000000..9e4f4b0c11
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/Context.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Transform context.
+ *
+ * <p>configuration</p> is the global configuration when init transform 
processor
+ * <p>extParams</p> is the ext params of each data
+ * <p>runtimeParams</p> is the runtime outputs when processing by each 
component
+ *
+ * The priority is runtimeParams > extParams > configuration.
+ */
+public class Context {
+
+    private final Map<String, Object> configuration;
+    private final Map<String, Object> extParams;
+    private final Map<String, Object> runtimeParams;
+
+    public Context(Map<String, Object> configuration, Map<String, Object> 
extParams) {
+        this.configuration = configuration;
+        this.extParams = extParams;
+        this.runtimeParams = new ConcurrentHashMap<>();
+    }
+
+    public Object put(String key, Object value) {
+        return runtimeParams.put(key, value);
+    }
+
+    public Object get(String key) {
+        Object obj = runtimeParams.get(key);
+        if (obj != null) {
+            return obj;
+        }
+        obj = extParams.get(key);
+        if (obj != null) {
+            return obj;
+        }
+        return configuration.get(key);
+    }
+
+    public String getString(String key) {
+        Object obj = this.get(key);
+        if (obj != null) {
+            return obj.toString();
+        }
+        return null;
+    }
+
+    public Integer getInteger(String key) {
+        Object obj = this.get(key);
+        if (obj != null) {
+            return Integer.getInteger(obj.toString());
+        }
+        return null;
+    }
+
+    public Long getLong(String key) {
+        Object obj = this.get(key);
+        if (obj != null) {
+            return Long.getLong(obj.toString());
+        }
+        return null;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 0e74180932..9944268dda 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -55,9 +55,9 @@ public class TransformProcessor<I, O> {
 
     private static final Map<String, Object> EMPTY_EXT_PARAMS = 
ImmutableMap.of();
 
-    private TransformConfig config;
-    private SourceDecoder<I> decoder;
-    private SinkEncoder<O> encoder;
+    private final TransformConfig config;
+    private final SourceDecoder<I> decoder;
+    private final SinkEncoder<O> encoder;
 
     private PlainSelect transformSelect;
     private ExpressionOperator where;
@@ -119,27 +119,38 @@ public class TransformProcessor<I, O> {
     }
 
     public List<O> transform(I input, Map<String, Object> extParams) {
-        SourceData sourceData = this.decoder.decode(input, extParams);
+        Context context = new Context(config.getConfiguration(), extParams);
+
+        // decode
+        SourceData sourceData = this.decoder.decode(input, context);
         if (sourceData == null) {
             return null;
         }
+
         List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount());
         for (int i = 0; i < sourceData.getRowCount(); i++) {
-            if (this.where != null && !this.where.check(sourceData, i)) {
+
+            // where check
+            if (this.where != null && !this.where.check(sourceData, i, 
context)) {
                 continue;
             }
+
+            // parse value
             SinkData sinkData = new DefaultSinkData();
             for (Entry<String, ValueParser> entry : 
this.selectItemMap.entrySet()) {
                 String fieldName = entry.getKey();
                 try {
-                    Object fieldValue = entry.getValue().parse(sourceData, i);
+                    ValueParser parser = entry.getValue();
+                    Object fieldValue = parser.parse(sourceData, i, context);
                     sinkData.addField(fieldName, String.valueOf(fieldValue));
                 } catch (Throwable t) {
                     LOG.error(t.getMessage(), t);
                     sinkData.addField(fieldName, "");
                 }
             }
-            sinkDatas.add(this.encoder.encode(sinkData));
+
+            // encode
+            sinkDatas.add(this.encoder.encode(sinkData, context));
         }
         return sinkDatas;
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
index a94d662eae..d2b2ceace6 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -48,8 +49,8 @@ public class AbsFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object numberObj = numberParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numberObj = numberParser.parse(sourceData, rowIndex, context);
         BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
         return numberValue.abs();
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
index 2bfe7a5588..529ddad007 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ConcatFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -59,10 +60,10 @@ public class ConcatFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
         StringBuilder builder = new StringBuilder();
         for (ValueParser node : nodeList) {
-            builder.append(node.parse(sourceData, rowIndex));
+            builder.append(node.parse(sourceData, rowIndex, context));
         }
         return builder.toString();
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
index 5f542413e2..5a7a9bbfac 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ExpFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -48,8 +49,8 @@ public class ExpFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object numberObj = numberParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numberObj = numberParser.parse(sourceData, rowIndex, context);
         BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
         return Math.exp(numberValue.doubleValue());
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
index d5e5ecf80b..530b2ff4f8 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -48,8 +49,8 @@ public class LnFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object numberObj = numberParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numberObj = numberParser.parse(sourceData, rowIndex, context);
         BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
         return Math.log(numberValue.doubleValue());
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
index d390893af2..e968a27aa1 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log10Function.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -31,7 +32,7 @@ import java.math.BigDecimal;
  */
 public class Log10Function implements ValueParser {
 
-    private ValueParser numberParser;
+    private final ValueParser numberParser;
 
     /**
      * Constructor
@@ -48,8 +49,8 @@ public class Log10Function implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object numberObj = numberParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numberObj = numberParser.parse(sourceData, rowIndex, context);
         BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
         return Math.log10(numberValue.doubleValue());
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
index 9c502f25a5..914bc69ac3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/Log2Function.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -31,7 +32,7 @@ import java.math.BigDecimal;
  */
 public class Log2Function implements ValueParser {
 
-    private ValueParser numberParser;
+    private final ValueParser numberParser;
 
     /**
      * Constructor
@@ -48,8 +49,8 @@ public class Log2Function implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object numberObj = numberParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numberObj = numberParser.parse(sourceData, rowIndex, context);
         BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
         return Math.log(numberValue.doubleValue()) / Math.log(2);
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
index bc0e200255..ddbcd71a97 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LogFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -35,7 +36,7 @@ import java.util.List;
 public class LogFunction implements ValueParser {
 
     private ValueParser baseParser;
-    private ValueParser numberParser;
+    private final ValueParser numberParser;
 
     /**
      * Constructor
@@ -59,11 +60,11 @@ public class LogFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object numberObj = numberParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numberObj = numberParser.parse(sourceData, rowIndex, context);
         BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
         if (baseParser != null) {
-            Object baseObj = baseParser.parse(sourceData, rowIndex);
+            Object baseObj = baseParser.parse(sourceData, rowIndex, context);
             BigDecimal baseValue = OperatorTools.parseBigDecimal(baseObj);
             return Math.log(numberValue.doubleValue()) / 
Math.log(baseValue.doubleValue());
         } else {
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
index 930a09af05..3857f22147 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/NowFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.Function;
@@ -42,7 +43,7 @@ public class NowFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
         return String.valueOf(System.currentTimeMillis());
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
index 94835f9d7b..938fc00f6d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -50,9 +51,9 @@ public class PowerFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object baseObj = baseParser.parse(sourceData, rowIndex);
-        Object exponentObj = exponentParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object baseObj = baseParser.parse(sourceData, rowIndex, context);
+        Object exponentObj = exponentParser.parse(sourceData, rowIndex, 
context);
         BigDecimal baseValue = OperatorTools.parseBigDecimal(baseObj);
         BigDecimal exponentValue = OperatorTools.parseBigDecimal(exponentObj);
         return Math.pow(baseValue.doubleValue(), exponentValue.doubleValue());
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
index f9e277acbd..69cf43041a 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.function;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
@@ -48,8 +49,8 @@ public class SqrtFunction implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object numberObj = numberParser.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object numberObj = numberParser.parse(sourceData, rowIndex, context);
         BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
         return Math.sqrt(numberValue.doubleValue());
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
index c6464f850d..a9dcd42606 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
 
@@ -27,8 +28,8 @@ import 
net.sf.jsqlparser.expression.operators.conditional.AndExpression;
  */
 public class AndOperator implements ExpressionOperator {
 
-    private ExpressionOperator left;
-    private ExpressionOperator right;
+    private final ExpressionOperator left;
+    private final ExpressionOperator right;
 
     public AndOperator(AndExpression expr) {
         this.left = OperatorTools.buildOperator(expr.getLeftExpression());
@@ -42,8 +43,8 @@ public class AndOperator implements ExpressionOperator {
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return left.check(sourceData, rowIndex) && right.check(sourceData, 
rowIndex);
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return left.check(sourceData, rowIndex, context) && 
right.check(sourceData, rowIndex, context);
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
index 6910e0c9ca..709537e8a0 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
@@ -28,8 +29,8 @@ import 
net.sf.jsqlparser.expression.operators.relational.EqualsTo;
  */
 public class EqualsToOperator implements ExpressionOperator {
 
-    private ValueParser left;
-    private ValueParser right;
+    private final ValueParser left;
+    private final ValueParser right;
 
     public EqualsToOperator(EqualsTo expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class EqualsToOperator implements ExpressionOperator {
      */
     @SuppressWarnings("rawtypes")
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
-                (Comparable) this.right.parse(sourceData, rowIndex)) == 0;
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex, context),
+                (Comparable) this.right.parse(sourceData, rowIndex, context)) 
== 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
index b055e841e2..8f874b8de0 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
@@ -18,11 +18,12 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 /**
  * ExpressionOperator
  */
 public interface ExpressionOperator {
 
-    boolean check(SourceData sourceData, int rowIndex);
+    boolean check(SourceData sourceData, int rowIndex, Context context);
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
index eb7689932e..3a53968e10 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
@@ -28,8 +29,8 @@ import 
net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
  */
 public class GreaterThanEqualsOperator implements ExpressionOperator {
 
-    private ValueParser left;
-    private ValueParser right;
+    private final ValueParser left;
+    private final ValueParser right;
 
     public GreaterThanEqualsOperator(GreaterThanEquals expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class GreaterThanEqualsOperator implements 
ExpressionOperator {
      */
     @SuppressWarnings("rawtypes")
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
-                (Comparable) this.right.parse(sourceData, rowIndex)) >= 0;
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex, context),
+                (Comparable) this.right.parse(sourceData, rowIndex, context)) 
>= 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
index e0db44b1e3..a1cd8c2ea2 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
@@ -28,8 +29,8 @@ import 
net.sf.jsqlparser.expression.operators.relational.GreaterThan;
  */
 public class GreaterThanOperator implements ExpressionOperator {
 
-    private ValueParser left;
-    private ValueParser right;
+    private final ValueParser left;
+    private final ValueParser right;
 
     public GreaterThanOperator(GreaterThan expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class GreaterThanOperator implements 
ExpressionOperator {
      */
     @SuppressWarnings("rawtypes")
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
-                (Comparable) this.right.parse(sourceData, rowIndex)) > 0;
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex, context),
+                (Comparable) this.right.parse(sourceData, rowIndex, context)) 
> 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
index 8b3628ddb7..4248cf1d36 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
@@ -28,8 +29,8 @@ import 
net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
  */
 public class MinorThanEqualsOperator implements ExpressionOperator {
 
-    private ValueParser left;
-    private ValueParser right;
+    private final ValueParser left;
+    private final ValueParser right;
 
     public MinorThanEqualsOperator(MinorThanEquals expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class MinorThanEqualsOperator implements 
ExpressionOperator {
      */
     @SuppressWarnings("rawtypes")
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
-                (Comparable) this.right.parse(sourceData, rowIndex)) <= 0;
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex, context),
+                (Comparable) this.right.parse(sourceData, rowIndex, context)) 
<= 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
index 17baa9cb17..21ecc0400a 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.MinorThan;
@@ -28,8 +29,8 @@ import 
net.sf.jsqlparser.expression.operators.relational.MinorThan;
  */
 public class MinorThanOperator implements ExpressionOperator {
 
-    private ValueParser left;
-    private ValueParser right;
+    private final ValueParser left;
+    private final ValueParser right;
 
     public MinorThanOperator(MinorThan expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class MinorThanOperator implements ExpressionOperator {
      */
     @SuppressWarnings("rawtypes")
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
-                (Comparable) this.right.parse(sourceData, rowIndex)) < 0;
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex, context),
+                (Comparable) this.right.parse(sourceData, rowIndex, context)) 
< 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
index dbe185dec5..98bf102b4f 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
@@ -28,8 +29,8 @@ import 
net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
  */
 public class NotEqualsToOperator implements ExpressionOperator {
 
-    private ValueParser left;
-    private ValueParser right;
+    private final ValueParser left;
+    private final ValueParser right;
 
     public NotEqualsToOperator(NotEqualsTo expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -44,9 +45,9 @@ public class NotEqualsToOperator implements 
ExpressionOperator {
      */
     @SuppressWarnings("rawtypes")
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
-                (Comparable) this.right.parse(sourceData, rowIndex)) != 0;
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex, context),
+                (Comparable) this.right.parse(sourceData, rowIndex, context)) 
!= 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
index f648d426e7..d8b9ff07e0 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import net.sf.jsqlparser.expression.NotExpression;
 
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.NotExpression;
  */
 public class NotOperator implements ExpressionOperator {
 
-    private ExpressionOperator node;
+    private final ExpressionOperator node;
 
     public NotOperator(NotExpression expr) {
         this.node = OperatorTools.buildOperator(expr.getExpression());
@@ -40,8 +41,8 @@ public class NotOperator implements ExpressionOperator {
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return !this.node.check(sourceData, rowIndex);
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return !this.node.check(sourceData, rowIndex, context);
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
index 33b9f82bdc..b5de7f279e 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
 
@@ -27,8 +28,8 @@ import 
net.sf.jsqlparser.expression.operators.conditional.OrExpression;
  */
 public class OrOperator implements ExpressionOperator {
 
-    private ExpressionOperator left;
-    private ExpressionOperator right;
+    private final ExpressionOperator left;
+    private final ExpressionOperator right;
 
     public OrOperator(OrExpression expr) {
         this.left = OperatorTools.buildOperator(expr.getLeftExpression());
@@ -42,8 +43,8 @@ public class OrOperator implements ExpressionOperator {
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return left.check(sourceData, rowIndex) || right.check(sourceData, 
rowIndex);
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return left.check(sourceData, rowIndex, context) || 
right.check(sourceData, rowIndex, context);
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
index 111f6bbb21..0ca1334fce 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import net.sf.jsqlparser.expression.Parenthesis;
 
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
  */
 public class ParenthesisOperator implements ExpressionOperator {
 
-    private ExpressionOperator node;
+    private final ExpressionOperator node;
 
     public ParenthesisOperator(Parenthesis expr) {
         this.node = OperatorTools.buildOperator(expr.getExpression());
@@ -40,8 +41,8 @@ public class ParenthesisOperator implements 
ExpressionOperator {
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return this.node.check(sourceData, rowIndex);
+    public boolean check(SourceData sourceData, int rowIndex, Context context) 
{
+        return this.node.check(sourceData, rowIndex, context);
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
index a0f03ab4cd..08474fe81f 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
 import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
@@ -30,9 +31,9 @@ import java.math.BigDecimal;
  */
 public class AdditionParser implements ValueParser {
 
-    private ValueParser left;
+    private final ValueParser left;
 
-    private ValueParser right;
+    private final ValueParser right;
 
     public AdditionParser(Addition expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -46,9 +47,9 @@ public class AdditionParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object leftObj = this.left.parse(sourceData, rowIndex);
-        Object rightObj = this.right.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftObj = this.left.parse(sourceData, rowIndex, context);
+        Object rightObj = this.right.parse(sourceData, rowIndex, context);
         BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
         BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
         return leftValue.add(rightValue);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
index afc58b422e..3a5000a57f 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import net.sf.jsqlparser.expression.Function;
 import net.sf.jsqlparser.schema.Column;
@@ -28,7 +29,7 @@ import net.sf.jsqlparser.schema.Column;
  */
 public class ColumnParser implements ValueParser {
 
-    private String fieldName;
+    private final String fieldName;
 
     public ColumnParser(Column expr) {
         this.fieldName = expr.toString();
@@ -45,7 +46,7 @@ public class ColumnParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
         return sourceData.getField(rowIndex, fieldName);
     }
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
index 5dc94b6e99..61cf1bb82f 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
 import net.sf.jsqlparser.expression.operators.arithmetic.Division;
@@ -46,9 +47,9 @@ public class DivisionParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object leftObj = this.left.parse(sourceData, rowIndex);
-        Object rightObj = this.right.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftObj = this.left.parse(sourceData, rowIndex, context);
+        Object rightObj = this.right.parse(sourceData, rowIndex, context);
         BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
         BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
         return leftValue.divide(rightValue);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
index efd61cc2cb..7abb8af77c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import net.sf.jsqlparser.expression.LongValue;
 
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.LongValue;
  */
 public class LongParser implements ValueParser {
 
-    private Long value;
+    private final Long value;
 
     public LongParser(LongValue expr) {
         this.value = expr.getValue();
@@ -40,7 +41,7 @@ public class LongParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
         return value;
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
index 7918b434ac..f7299dcf8c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
 import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
@@ -30,9 +31,9 @@ import java.math.BigDecimal;
  */
 public class MultiplicationParser implements ValueParser {
 
-    private ValueParser left;
+    private final ValueParser left;
 
-    private ValueParser right;
+    private final ValueParser right;
 
     public MultiplicationParser(Multiplication expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
@@ -46,9 +47,9 @@ public class MultiplicationParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object leftObj = this.left.parse(sourceData, rowIndex);
-        Object rightObj = this.right.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftObj = this.left.parse(sourceData, rowIndex, context);
+        Object rightObj = this.right.parse(sourceData, rowIndex, context);
         BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
         BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
         return leftValue.multiply(rightValue);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
index 61a2bd1bf3..1899017087 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
 import net.sf.jsqlparser.expression.Parenthesis;
@@ -28,7 +29,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
  */
 public class ParenthesisParser implements ValueParser {
 
-    private ValueParser node;
+    private final ValueParser node;
 
     public ParenthesisParser(Parenthesis expr) {
         this.node = OperatorTools.buildParser(expr.getExpression());
@@ -41,7 +42,7 @@ public class ParenthesisParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        return node.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        return node.parse(sourceData, rowIndex, context);
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
index 9cb431c1fa..9ce0646cc9 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 import net.sf.jsqlparser.expression.StringValue;
 
@@ -27,7 +28,7 @@ import net.sf.jsqlparser.expression.StringValue;
  */
 public class StringParser implements ValueParser {
 
-    private String stringValue;
+    private final String stringValue;
 
     public StringParser(StringValue expr) {
         this.stringValue = expr.getValue();
@@ -40,7 +41,7 @@ public class StringParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
         return stringValue;
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
index af36c79452..15d534d50f 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
 import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
@@ -46,9 +47,9 @@ public class SubtractionParser implements ValueParser {
      * @return
      */
     @Override
-    public Object parse(SourceData sourceData, int rowIndex) {
-        Object leftObj = this.left.parse(sourceData, rowIndex);
-        Object rightObj = this.right.parse(sourceData, rowIndex);
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object leftObj = this.left.parse(sourceData, rowIndex, context);
+        Object rightObj = this.right.parse(sourceData, rowIndex, context);
         BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
         BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
         return leftValue.subtract(rightValue);
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
index bafafe276c..3b246cfc80 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
 
 /**
  * ValueParser
@@ -25,5 +26,5 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
  */
 public interface ValueParser {
 
-    Object parse(SourceData sourceData, int rowIndex);
+    Object parse(SourceData sourceData, int rowIndex, Context context);
 }

Reply via email to