This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ae49243a5a [INLONG-10129][SDK] Transform SQL support +-*/ operations (#10133) ae49243a5a is described below commit ae49243a5a4cbf512acfc0c391ac9995e68b2879 Author: 卢春亮 <luchunli...@apache.org> AuthorDate: Tue May 7 19:10:17 2024 +0800 [INLONG-10129][SDK] Transform SQL support +-*/ operations (#10133) --- .../sdk/transform/process/TransformProcessor.java | 13 +++- .../process/operator/EqualsToOperator.java | 5 +- .../operator/GreaterThanEqualsOperator.java | 3 +- .../process/operator/GreaterThanOperator.java | 3 +- .../process/operator/MinorThanEqualsOperator.java | 3 +- .../process/operator/MinorThanOperator.java | 3 +- .../process/operator/NotEqualsToOperator.java | 5 +- .../transform/process/operator/OperatorTools.java | 70 +++++++++++++++++++++- .../AdditionParser.java} | 27 +++++---- .../DivisionParser.java} | 27 +++++---- .../MultiplicationParser.java} | 27 +++++---- .../ParenthesisParser.java} | 26 ++++---- .../SubtractionParser.java} | 27 +++++---- .../transform/process/TestTransformProcessor.java | 26 ++++++++ 14 files changed, 191 insertions(+), 74 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index 23ca6644fa..c979ef71c4 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -52,6 +52,8 @@ import net.sf.jsqlparser.statement.select.Select; import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.StringReader; import java.nio.charset.Charset; @@ -67,6 +69,8 @@ import java.util.Map.Entry; */ public class TransformProcessor { + private static final Logger LOG = LoggerFactory.getLogger(TransformProcessor.class); + private TransformConfig config; private SourceDecoder decoder; private SinkEncoder encoder; @@ -166,8 +170,13 @@ public class TransformProcessor { SinkData sinkData = new DefaultSinkData(); for (Entry<String, ValueParser> entry : this.selectItemMap.entrySet()) { String fieldName = entry.getKey(); - Object fieldValue = entry.getValue().parse(sourceData, i); - sinkData.putField(fieldName, String.valueOf(fieldValue)); + try { + Object fieldValue = entry.getValue().parse(sourceData, i); + sinkData.putField(fieldName, String.valueOf(fieldValue)); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + sinkData.putField(fieldName, ""); + } } sinkDatas.add(this.encoder.encode(sinkData)); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java index 3172626000..6910e0c9ca 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.EqualsTo; -import org.apache.commons.lang.ObjectUtils; /** * EqualsToOperator @@ -43,9 +42,11 @@ public class EqualsToOperator implements ExpressionOperator { * @param rowIndex * @return */ + @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) == 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java index 07da9d79c2..eb7689932e 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; -import org.apache.commons.lang.ObjectUtils; /** * GreaterThanEqualsOperator @@ -46,7 +45,7 @@ public class GreaterThanEqualsOperator implements ExpressionOperator { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) >= 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java index 3b2158d96b..e0db44b1e3 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.GreaterThan; -import org.apache.commons.lang.ObjectUtils; /** * GreaterThanOperator @@ -46,7 +45,7 @@ public class GreaterThanOperator implements ExpressionOperator { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) > 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java index fec4ed8019..8b3628ddb7 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; -import org.apache.commons.lang.ObjectUtils; /** * MinorThanEqualsOperator @@ -46,7 +45,7 @@ public class MinorThanEqualsOperator implements ExpressionOperator { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) <= 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java index 5d9db7dd9c..17baa9cb17 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.MinorThan; -import org.apache.commons.lang.ObjectUtils; /** * MinorThanOperator @@ -46,7 +45,7 @@ public class MinorThanOperator implements ExpressionOperator { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) < 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java index 9c58e70476..dbe185dec5 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; -import org.apache.commons.lang.ObjectUtils; /** * NotEqualsToOperator @@ -43,9 +42,11 @@ public class NotEqualsToOperator implements ExpressionOperator { * @param rowIndex * @return */ + @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) != 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 8afe2f0c74..361ce1aec3 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -17,9 +17,14 @@ package org.apache.inlong.sdk.transform.process.operator; +import org.apache.inlong.sdk.transform.process.parser.AdditionParser; import org.apache.inlong.sdk.transform.process.parser.ColumnParser; +import org.apache.inlong.sdk.transform.process.parser.DivisionParser; import org.apache.inlong.sdk.transform.process.parser.LongParser; +import org.apache.inlong.sdk.transform.process.parser.MultiplicationParser; +import org.apache.inlong.sdk.transform.process.parser.ParenthesisParser; import org.apache.inlong.sdk.transform.process.parser.StringParser; +import org.apache.inlong.sdk.transform.process.parser.SubtractionParser; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.Expression; @@ -28,6 +33,10 @@ import net.sf.jsqlparser.expression.LongValue; import net.sf.jsqlparser.expression.NotExpression; import net.sf.jsqlparser.expression.Parenthesis; import net.sf.jsqlparser.expression.StringValue; +import net.sf.jsqlparser.expression.operators.arithmetic.Addition; +import net.sf.jsqlparser.expression.operators.arithmetic.Division; +import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication; +import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; import net.sf.jsqlparser.expression.operators.conditional.AndExpression; import net.sf.jsqlparser.expression.operators.conditional.OrExpression; import net.sf.jsqlparser.expression.operators.relational.EqualsTo; @@ -37,6 +46,9 @@ import net.sf.jsqlparser.expression.operators.relational.MinorThan; import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; import net.sf.jsqlparser.schema.Column; +import org.apache.commons.lang.ObjectUtils; + +import java.math.BigDecimal; /** * OperatorTools @@ -44,6 +56,10 @@ import net.sf.jsqlparser.schema.Column; */ public class OperatorTools { + public static final String ROOT_KEY = "$root"; + + public static final String CHILD_KEY = "$child"; + public static ExpressionOperator buildOperator(Expression expr) { if (expr instanceof AndExpression) { return new AndOperator((AndExpression) expr); @@ -76,9 +92,61 @@ public class OperatorTools { return new StringParser((StringValue) expr); } else if (expr instanceof LongValue) { return new LongParser((LongValue) expr); + } else if (expr instanceof Parenthesis) { + return new ParenthesisParser((Parenthesis) expr); + } else if (expr instanceof Addition) { + return new AdditionParser((Addition) expr); + } else if (expr instanceof Subtraction) { + return new SubtractionParser((Subtraction) expr); + } else if (expr instanceof Multiplication) { + return new MultiplicationParser((Multiplication) expr); + } else if (expr instanceof Division) { + return new DivisionParser((Division) expr); } else if (expr instanceof Function) { - return new ColumnParser((Function) expr); + String exprString = expr.toString(); + if (exprString.startsWith(ROOT_KEY) || exprString.startsWith(CHILD_KEY)) { + return new ColumnParser((Function) expr); + } else { + // TODO + } } return null; } + + /** + * parseBigDecimal + * @param value + * @return + */ + public static BigDecimal parseBigDecimal(Object value) { + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } else { + return new BigDecimal(String.valueOf(value)); + } + } + + /** + * compareValue + * @param value + * @return + */ + @SuppressWarnings("rawtypes") + public static int compareValue(Comparable left, Comparable right) { + if (left instanceof String) { + if (right instanceof String) { + return ObjectUtils.compare(left, right); + } else { + BigDecimal leftValue = parseBigDecimal(left); + return ObjectUtils.compare(leftValue, right); + } + } else { + if (right instanceof String) { + BigDecimal rightValue = parseBigDecimal(right); + return ObjectUtils.compare(left, rightValue); + } else { + return ObjectUtils.compare(left, right); + } + } + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java similarity index 62% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java index 9c58e70476..a0f03ab4cd 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java @@ -15,37 +15,42 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.operator; +package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; -import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; -import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; -import org.apache.commons.lang.ObjectUtils; +import net.sf.jsqlparser.expression.operators.arithmetic.Addition; + +import java.math.BigDecimal; /** - * NotEqualsToOperator + * AdditionParser * */ -public class NotEqualsToOperator implements ExpressionOperator { +public class AdditionParser implements ValueParser { private ValueParser left; + private ValueParser right; - public NotEqualsToOperator(NotEqualsTo expr) { + public AdditionParser(Addition expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); this.right = OperatorTools.buildParser(expr.getRightExpression()); } /** - * check + * parse * @param sourceData * @param rowIndex * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.add(rightValue); } - } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java similarity index 61% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java index 9c58e70476..5dc94b6e99 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java @@ -15,37 +15,42 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.operator; +package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; -import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; -import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; -import org.apache.commons.lang.ObjectUtils; +import net.sf.jsqlparser.expression.operators.arithmetic.Division; + +import java.math.BigDecimal; /** - * NotEqualsToOperator + * DivisionParser * */ -public class NotEqualsToOperator implements ExpressionOperator { +public class DivisionParser implements ValueParser { private ValueParser left; + private ValueParser right; - public NotEqualsToOperator(NotEqualsTo expr) { + public DivisionParser(Division expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); this.right = OperatorTools.buildParser(expr.getRightExpression()); } /** - * check + * parse * @param sourceData * @param rowIndex * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.divide(rightValue); } - } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java similarity index 60% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java index 9c58e70476..7918b434ac 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java @@ -15,37 +15,42 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.operator; +package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; -import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; -import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; -import org.apache.commons.lang.ObjectUtils; +import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication; + +import java.math.BigDecimal; /** - * NotEqualsToOperator + * MultiplicationParser * */ -public class NotEqualsToOperator implements ExpressionOperator { +public class MultiplicationParser implements ValueParser { private ValueParser left; + private ValueParser right; - public NotEqualsToOperator(NotEqualsTo expr) { + public MultiplicationParser(Multiplication expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); this.right = OperatorTools.buildParser(expr.getRightExpression()); } /** - * check + * parse * @param sourceData * @param rowIndex * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.multiply(rightValue); } - } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java similarity index 56% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java index 3172626000..61a2bd1bf3 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java @@ -15,37 +15,33 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.operator; +package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; -import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; -import net.sf.jsqlparser.expression.operators.relational.EqualsTo; -import org.apache.commons.lang.ObjectUtils; +import net.sf.jsqlparser.expression.Parenthesis; /** - * EqualsToOperator + * ParenthesisParser * */ -public class EqualsToOperator implements ExpressionOperator { +public class ParenthesisParser implements ValueParser { - private ValueParser left; - private ValueParser right; + private ValueParser node; - public EqualsToOperator(EqualsTo expr) { - this.left = OperatorTools.buildParser(expr.getLeftExpression()); - this.right = OperatorTools.buildParser(expr.getRightExpression()); + public ParenthesisParser(Parenthesis expr) { + this.node = OperatorTools.buildParser(expr.getExpression()); } /** - * check + * parse * @param sourceData * @param rowIndex * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + public Object parse(SourceData sourceData, int rowIndex) { + return node.parse(sourceData, rowIndex); } - } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java similarity index 61% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java index 9c58e70476..af36c79452 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java @@ -15,37 +15,42 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.process.operator; +package org.apache.inlong.sdk.transform.process.parser; import org.apache.inlong.sdk.transform.decode.SourceData; -import org.apache.inlong.sdk.transform.process.parser.ValueParser; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; -import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; -import org.apache.commons.lang.ObjectUtils; +import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; + +import java.math.BigDecimal; /** - * NotEqualsToOperator + * SubtractionParser * */ -public class NotEqualsToOperator implements ExpressionOperator { +public class SubtractionParser implements ValueParser { private ValueParser left; + private ValueParser right; - public NotEqualsToOperator(NotEqualsTo expr) { + public SubtractionParser(Subtraction expr) { this.left = OperatorTools.buildParser(expr.getLeftExpression()); this.right = OperatorTools.buildParser(expr.getRightExpression()); } /** - * check + * parse * @param sourceData * @param rowIndex * @return */ @Override - public boolean check(SourceData sourceData, int rowIndex) { - return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.subtract(rightValue); } - } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 282e45edfb..b508f8f2aa 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -269,4 +269,30 @@ public class TestTransformProcessor { e.printStackTrace(); } } + + @Test + public void testPb2CsvForAdd() { + try { + List<FieldInfo> fields = this.getTestFieldList(); + String transformBase64 = this.getPbTestDescription(); + SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid," + + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2," + + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)" + + "*$root.packageID field3," + + "$root.msgs(0).msg field4 from source " + + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime" + + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + byte[] srcBytes = this.getPbTestData(); + List<String> output = processor.transform(srcBytes, new HashMap<>()); + Assert.assertTrue(output.size() == 1); + Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4"); + } catch (Exception e) { + e.printStackTrace(); + } + } }