This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ae49243a5a [INLONG-10129][SDK] Transform SQL support +-*/ operations 
(#10133)
ae49243a5a is described below

commit ae49243a5a4cbf512acfc0c391ac9995e68b2879
Author: 卢春亮 <luchunli...@apache.org>
AuthorDate: Tue May 7 19:10:17 2024 +0800

    [INLONG-10129][SDK] Transform SQL support +-*/ operations (#10133)
---
 .../sdk/transform/process/TransformProcessor.java  | 13 +++-
 .../process/operator/EqualsToOperator.java         |  5 +-
 .../operator/GreaterThanEqualsOperator.java        |  3 +-
 .../process/operator/GreaterThanOperator.java      |  3 +-
 .../process/operator/MinorThanEqualsOperator.java  |  3 +-
 .../process/operator/MinorThanOperator.java        |  3 +-
 .../process/operator/NotEqualsToOperator.java      |  5 +-
 .../transform/process/operator/OperatorTools.java  | 70 +++++++++++++++++++++-
 .../AdditionParser.java}                           | 27 +++++----
 .../DivisionParser.java}                           | 27 +++++----
 .../MultiplicationParser.java}                     | 27 +++++----
 .../ParenthesisParser.java}                        | 26 ++++----
 .../SubtractionParser.java}                        | 27 +++++----
 .../transform/process/TestTransformProcessor.java  | 26 ++++++++
 14 files changed, 191 insertions(+), 74 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 23ca6644fa..c979ef71c4 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -52,6 +52,8 @@ import net.sf.jsqlparser.statement.select.Select;
 import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.StringReader;
 import java.nio.charset.Charset;
@@ -67,6 +69,8 @@ import java.util.Map.Entry;
  */
 public class TransformProcessor {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransformProcessor.class);
+
     private TransformConfig config;
     private SourceDecoder decoder;
     private SinkEncoder encoder;
@@ -166,8 +170,13 @@ public class TransformProcessor {
             SinkData sinkData = new DefaultSinkData();
             for (Entry<String, ValueParser> entry : 
this.selectItemMap.entrySet()) {
                 String fieldName = entry.getKey();
-                Object fieldValue = entry.getValue().parse(sourceData, i);
-                sinkData.putField(fieldName, String.valueOf(fieldValue));
+                try {
+                    Object fieldValue = entry.getValue().parse(sourceData, i);
+                    sinkData.putField(fieldName, String.valueOf(fieldValue));
+                } catch (Throwable t) {
+                    LOG.error(t.getMessage(), t);
+                    sinkData.putField(fieldName, "");
+                }
             }
             sinkDatas.add(this.encoder.encode(sinkData));
         }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
index 3172626000..6910e0c9ca 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import org.apache.commons.lang.ObjectUtils;
 
 /**
  * EqualsToOperator
@@ -43,9 +42,11 @@ public class EqualsToOperator implements ExpressionOperator {
      * @param rowIndex
      * @return
      */
+    @SuppressWarnings("rawtypes")
     @Override
     public boolean check(SourceData sourceData, int rowIndex) {
-        return ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
+                (Comparable) this.right.parse(sourceData, rowIndex)) == 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
index 07da9d79c2..eb7689932e 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
-import org.apache.commons.lang.ObjectUtils;
 
 /**
  * GreaterThanEqualsOperator
@@ -46,7 +45,7 @@ public class GreaterThanEqualsOperator implements 
ExpressionOperator {
     @SuppressWarnings("rawtypes")
     @Override
     public boolean check(SourceData sourceData, int rowIndex) {
-        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
                 (Comparable) this.right.parse(sourceData, rowIndex)) >= 0;
     }
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
index 3b2158d96b..e0db44b1e3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
-import org.apache.commons.lang.ObjectUtils;
 
 /**
  * GreaterThanOperator
@@ -46,7 +45,7 @@ public class GreaterThanOperator implements 
ExpressionOperator {
     @SuppressWarnings("rawtypes")
     @Override
     public boolean check(SourceData sourceData, int rowIndex) {
-        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
                 (Comparable) this.right.parse(sourceData, rowIndex)) > 0;
     }
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
index fec4ed8019..8b3628ddb7 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
-import org.apache.commons.lang.ObjectUtils;
 
 /**
  * MinorThanEqualsOperator
@@ -46,7 +45,7 @@ public class MinorThanEqualsOperator implements 
ExpressionOperator {
     @SuppressWarnings("rawtypes")
     @Override
     public boolean check(SourceData sourceData, int rowIndex) {
-        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
                 (Comparable) this.right.parse(sourceData, rowIndex)) <= 0;
     }
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
index 5d9db7dd9c..17baa9cb17 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.MinorThan;
-import org.apache.commons.lang.ObjectUtils;
 
 /**
  * MinorThanOperator
@@ -46,7 +45,7 @@ public class MinorThanOperator implements ExpressionOperator {
     @SuppressWarnings("rawtypes")
     @Override
     public boolean check(SourceData sourceData, int rowIndex) {
-        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
                 (Comparable) this.right.parse(sourceData, rowIndex)) < 0;
     }
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
index 9c58e70476..dbe185dec5 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
 
 /**
  * NotEqualsToOperator
@@ -43,9 +42,11 @@ public class NotEqualsToOperator implements 
ExpressionOperator {
      * @param rowIndex
      * @return
      */
+    @SuppressWarnings("rawtypes")
     @Override
     public boolean check(SourceData sourceData, int rowIndex) {
-        return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+        return OperatorTools.compareValue((Comparable) 
this.left.parse(sourceData, rowIndex),
+                (Comparable) this.right.parse(sourceData, rowIndex)) != 0;
     }
 
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 8afe2f0c74..361ce1aec3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -17,9 +17,14 @@
 
 package org.apache.inlong.sdk.transform.process.operator;
 
+import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
 import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
 import org.apache.inlong.sdk.transform.process.parser.LongParser;
+import org.apache.inlong.sdk.transform.process.parser.MultiplicationParser;
+import org.apache.inlong.sdk.transform.process.parser.ParenthesisParser;
 import org.apache.inlong.sdk.transform.process.parser.StringParser;
+import org.apache.inlong.sdk.transform.process.parser.SubtractionParser;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
 import net.sf.jsqlparser.expression.Expression;
@@ -28,6 +33,10 @@ import net.sf.jsqlparser.expression.LongValue;
 import net.sf.jsqlparser.expression.NotExpression;
 import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
+import net.sf.jsqlparser.expression.operators.arithmetic.Division;
+import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
+import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
 import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
 import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
 import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
@@ -37,6 +46,9 @@ import 
net.sf.jsqlparser.expression.operators.relational.MinorThan;
 import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
 import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
 import net.sf.jsqlparser.schema.Column;
+import org.apache.commons.lang.ObjectUtils;
+
+import java.math.BigDecimal;
 
 /**
  * OperatorTools
@@ -44,6 +56,10 @@ import net.sf.jsqlparser.schema.Column;
  */
 public class OperatorTools {
 
+    public static final String ROOT_KEY = "$root";
+
+    public static final String CHILD_KEY = "$child";
+
     public static ExpressionOperator buildOperator(Expression expr) {
         if (expr instanceof AndExpression) {
             return new AndOperator((AndExpression) expr);
@@ -76,9 +92,61 @@ public class OperatorTools {
             return new StringParser((StringValue) expr);
         } else if (expr instanceof LongValue) {
             return new LongParser((LongValue) expr);
+        } else if (expr instanceof Parenthesis) {
+            return new ParenthesisParser((Parenthesis) expr);
+        } else if (expr instanceof Addition) {
+            return new AdditionParser((Addition) expr);
+        } else if (expr instanceof Subtraction) {
+            return new SubtractionParser((Subtraction) expr);
+        } else if (expr instanceof Multiplication) {
+            return new MultiplicationParser((Multiplication) expr);
+        } else if (expr instanceof Division) {
+            return new DivisionParser((Division) expr);
         } else if (expr instanceof Function) {
-            return new ColumnParser((Function) expr);
+            String exprString = expr.toString();
+            if (exprString.startsWith(ROOT_KEY) || 
exprString.startsWith(CHILD_KEY)) {
+                return new ColumnParser((Function) expr);
+            } else {
+                // TODO
+            }
         }
         return null;
     }
+
+    /**
+     * parseBigDecimal
+     * @param value
+     * @return
+     */
+    public static BigDecimal parseBigDecimal(Object value) {
+        if (value instanceof BigDecimal) {
+            return (BigDecimal) value;
+        } else {
+            return new BigDecimal(String.valueOf(value));
+        }
+    }
+
+    /**
+     * compareValue
+     * @param value
+     * @return
+     */
+    @SuppressWarnings("rawtypes")
+    public static int compareValue(Comparable left, Comparable right) {
+        if (left instanceof String) {
+            if (right instanceof String) {
+                return ObjectUtils.compare(left, right);
+            } else {
+                BigDecimal leftValue = parseBigDecimal(left);
+                return ObjectUtils.compare(leftValue, right);
+            }
+        } else {
+            if (right instanceof String) {
+                BigDecimal rightValue = parseBigDecimal(right);
+                return ObjectUtils.compare(left, rightValue);
+            } else {
+                return ObjectUtils.compare(left, right);
+            }
+        }
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
similarity index 62%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
index 9c58e70476..a0f03ab4cd 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java
@@ -15,37 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
+
+import java.math.BigDecimal;
 
 /**
- * NotEqualsToOperator
+ * AdditionParser
  * 
  */
-public class NotEqualsToOperator implements ExpressionOperator {
+public class AdditionParser implements ValueParser {
 
     private ValueParser left;
+
     private ValueParser right;
 
-    public NotEqualsToOperator(NotEqualsTo expr) {
+    public AdditionParser(Addition expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
         this.right = OperatorTools.buildParser(expr.getRightExpression());
     }
 
     /**
-     * check
+     * parse
      * @param sourceData
      * @param rowIndex
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        Object leftObj = this.left.parse(sourceData, rowIndex);
+        Object rightObj = this.right.parse(sourceData, rowIndex);
+        BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
+        BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
+        return leftValue.add(rightValue);
     }
-
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
similarity index 61%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
index 9c58e70476..5dc94b6e99 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java
@@ -15,37 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Division;
+
+import java.math.BigDecimal;
 
 /**
- * NotEqualsToOperator
+ * DivisionParser
  * 
  */
-public class NotEqualsToOperator implements ExpressionOperator {
+public class DivisionParser implements ValueParser {
 
     private ValueParser left;
+
     private ValueParser right;
 
-    public NotEqualsToOperator(NotEqualsTo expr) {
+    public DivisionParser(Division expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
         this.right = OperatorTools.buildParser(expr.getRightExpression());
     }
 
     /**
-     * check
+     * parse
      * @param sourceData
      * @param rowIndex
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        Object leftObj = this.left.parse(sourceData, rowIndex);
+        Object rightObj = this.right.parse(sourceData, rowIndex);
+        BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
+        BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
+        return leftValue.divide(rightValue);
     }
-
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
similarity index 60%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
index 9c58e70476..7918b434ac 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java
@@ -15,37 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication;
+
+import java.math.BigDecimal;
 
 /**
- * NotEqualsToOperator
+ * MultiplicationParser
  * 
  */
-public class NotEqualsToOperator implements ExpressionOperator {
+public class MultiplicationParser implements ValueParser {
 
     private ValueParser left;
+
     private ValueParser right;
 
-    public NotEqualsToOperator(NotEqualsTo expr) {
+    public MultiplicationParser(Multiplication expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
         this.right = OperatorTools.buildParser(expr.getRightExpression());
     }
 
     /**
-     * check
+     * parse
      * @param sourceData
      * @param rowIndex
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        Object leftObj = this.left.parse(sourceData, rowIndex);
+        Object rightObj = this.right.parse(sourceData, rowIndex);
+        BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
+        BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
+        return leftValue.multiply(rightValue);
     }
-
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
similarity index 56%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
index 3172626000..61a2bd1bf3 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java
@@ -15,37 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
-import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.Parenthesis;
 
 /**
- * EqualsToOperator
+ * ParenthesisParser
  * 
  */
-public class EqualsToOperator implements ExpressionOperator {
+public class ParenthesisParser implements ValueParser {
 
-    private ValueParser left;
-    private ValueParser right;
+    private ValueParser node;
 
-    public EqualsToOperator(EqualsTo expr) {
-        this.left = OperatorTools.buildParser(expr.getLeftExpression());
-        this.right = OperatorTools.buildParser(expr.getRightExpression());
+    public ParenthesisParser(Parenthesis expr) {
+        this.node = OperatorTools.buildParser(expr.getExpression());
     }
 
     /**
-     * check
+     * parse
      * @param sourceData
      * @param rowIndex
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        return node.parse(sourceData, rowIndex);
     }
-
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
similarity index 61%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
index 9c58e70476..af36c79452 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java
@@ -15,37 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.process.operator;
+package org.apache.inlong.sdk.transform.process.parser;
 
 import org.apache.inlong.sdk.transform.decode.SourceData;
-import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 
-import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
-import org.apache.commons.lang.ObjectUtils;
+import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction;
+
+import java.math.BigDecimal;
 
 /**
- * NotEqualsToOperator
+ * SubtractionParser
  * 
  */
-public class NotEqualsToOperator implements ExpressionOperator {
+public class SubtractionParser implements ValueParser {
 
     private ValueParser left;
+
     private ValueParser right;
 
-    public NotEqualsToOperator(NotEqualsTo expr) {
+    public SubtractionParser(Subtraction expr) {
         this.left = OperatorTools.buildParser(expr.getLeftExpression());
         this.right = OperatorTools.buildParser(expr.getRightExpression());
     }
 
     /**
-     * check
+     * parse
      * @param sourceData
      * @param rowIndex
      * @return
      */
     @Override
-    public boolean check(SourceData sourceData, int rowIndex) {
-        return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+    public Object parse(SourceData sourceData, int rowIndex) {
+        Object leftObj = this.left.parse(sourceData, rowIndex);
+        Object rightObj = this.right.parse(sourceData, rowIndex);
+        BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj);
+        BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj);
+        return leftValue.subtract(rightValue);
     }
-
 }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 282e45edfb..b508f8f2aa 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -269,4 +269,30 @@ public class TestTransformProcessor {
             e.printStackTrace();
         }
     }
+
+    @Test
+    public void testPb2CsvForAdd() {
+        try {
+            List<FieldInfo> fields = this.getTestFieldList();
+            String transformBase64 = this.getPbTestDescription();
+            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", null);
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select $root.sid,"
+                    + 
"($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,"
+                    + 
"$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)"
+                    + "*$root.packageID field3,"
+                    + "$root.msgs(0).msg field4 from source "
+                    + "where 
$root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
+                    + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
+            TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+            // case1
+            TransformProcessor processor = new TransformProcessor(config);
+            byte[] srcBytes = this.getPbTestData();
+            List<String> output = processor.transform(srcBytes, new 
HashMap<>());
+            Assert.assertTrue(output.size() == 1);
+            Assert.assertEquals(output.get(0), 
"sid|2|3426487836002|msgValue4");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }

Reply via email to