This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd42dc88d [INLONG-10809][SDK] Improvements to TypeConverter field types and CompareValue in OperatorTools (#10817)
dfd42dc88d is described below

commit dfd42dc88d7f5bb95d66165ae5cfcf6c92921bd7
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Tue Sep 3 10:24:08 2024 +0800

    [INLONG-10809][SDK] Improvements to TypeConverter field types and CompareValue in OperatorTools (#10817)
    
    Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../inlong/sdk/transform/decode/CsvSourceData.java |  10 +-
 .../sdk/transform/decode/CsvSourceDecoder.java     |   8 +-
 .../inlong/sdk/transform/decode/SourceData.java    |   2 +-
 .../inlong/sdk/transform/pojo/FieldInfo.java       |   2 +-
 .../converter/DoubleConverter.java}                |  16 +-
 .../converter/LongConverter.java}                  |  16 +-
 .../sdk/transform/process/function/IfFunction.java |  55 ++++
 .../transform/process/operator/OperatorTools.java  |  21 +-
 .../sdk/transform/process/parser/DoubleParser.java |   1 +
 .../TestTransformExpressionOperatorsProcessor.java | 354 +++++++++++++++++++++
 10 files changed, 445 insertions(+), 40 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
index d4492b4b85..e0bd9f794c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -28,14 +28,14 @@ import java.util.Map;
  */
 public class CsvSourceData implements SourceData {
 
-    private List<Map<String, String>> rows = new ArrayList<>();
+    private List<Map<String, Object>> rows = new ArrayList<>();
 
-    private Map<String, String> currentRow;
+    private Map<String, Object> currentRow;
 
     public CsvSourceData() {
     }
 
-    public void putField(String fieldName, String fieldValue) {
+    public void putField(String fieldName, Object fieldValue) {
         this.currentRow.put(fieldName, fieldValue);
     }
 
@@ -50,11 +50,11 @@ public class CsvSourceData implements SourceData {
     }
 
     @Override
-    public String getField(int rowNum, String fieldName) {
+    public Object getField(int rowNum, String fieldName) {
         if (rowNum >= this.rows.size()) {
             return null;
         }
-        Map<String, String> targetRow = this.rows.get(rowNum);
+        Map<String, Object> targetRow = this.rows.get(rowNum);
         return targetRow.get(fieldName);
     }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index fb95dadc43..7b3dedb637 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -76,9 +76,13 @@ public class CsvSourceDecoder implements 
SourceDecoder<String> {
             int fieldIndex = 0;
             for (FieldInfo field : fields) {
                 String fieldName = field.getName();
-                String fieldValue = null;
+                Object fieldValue = null;
                 if (fieldIndex < fieldValues.length) {
-                    fieldValue = fieldValues[fieldIndex];
+                    try {
+                        fieldValue = 
field.getConverter().convert(fieldValues[fieldIndex]);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
                 }
                 sourceData.putField(fieldName, fieldValue);
                 fieldIndex++;
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
index 2c39948f2d..cf5f9c0fbe 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
@@ -26,5 +26,5 @@ public interface SourceData {
 
     int getRowCount();
 
-    String getField(int rowNum, String fieldName);
+    Object getField(int rowNum, String fieldName);
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index 1027dad944..2a7834112a 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -28,7 +28,7 @@ import lombok.Data;
 public class FieldInfo {
 
     private String name;
-    private TypeConverter converter;
+    private TypeConverter converter = TypeConverter.DefaultTypeConverter();
 
     public FieldInfo() {
 
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java
similarity index 77%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java
index 2c39948f2d..52afbda16b 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java
@@ -15,16 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.decode;
+package org.apache.inlong.sdk.transform.process.converter;
 
-/**
- * SourceData
- */
-public interface SourceData {
-
-    String FIELD_DEFAULT_PREFIX = "$";
-
-    int getRowCount();
+public class DoubleConverter implements TypeConverter {
 
-    String getField(int rowNum, String fieldName);
+    @Override
+    public Object convert(String value) throws Exception {
+        return Double.parseDouble(value);
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java
similarity index 77%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java
index 2c39948f2d..5a18f8ee13 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java
@@ -15,16 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.transform.decode;
+package org.apache.inlong.sdk.transform.process.converter;
 
-/**
- * SourceData
- */
-public interface SourceData {
-
-    String FIELD_DEFAULT_PREFIX = "$";
-
-    int getRowCount();
+public class LongConverter implements TypeConverter {
 
-    String getField(int rowNum, String fieldName);
+    @Override
+    public Object convert(String value) throws Exception {
+        return Long.parseLong(value);
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java
new file mode 100644
index 0000000000..bda7b9301c
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * IfFunction
+ * description: if(expr,r1,r2) -- expr is an expression, if it holds, return 
r1; otherwise, return r2
+ */
+@TransformFunction(names = {"if"})
+public class IfFunction implements ValueParser {
+
+    private final ExpressionOperator expressionOperator;
+    private final ValueParser tureValueParser;
+    private final ValueParser falseValueParser;
+
+    public IfFunction(Function expr) {
+        List<Expression> expressions = expr.getParameters().getExpressions();
+        expressionOperator = OperatorTools.buildOperator(expressions.get(0));
+        tureValueParser = OperatorTools.buildParser(expressions.get(1));
+        falseValueParser = OperatorTools.buildParser(expressions.get(2));
+    }
+
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        boolean condition = expressionOperator.check(sourceData, rowIndex, 
context);
+        return condition ? tureValueParser.parse(sourceData, rowIndex, context)
+                : falseValueParser.parse(sourceData, rowIndex, context);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 02b24cdb6b..bb35bb4490 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -133,19 +133,18 @@ public class OperatorTools {
         if (right == null) {
             return 1;
         }
-        if (left instanceof String) {
-            if (right instanceof String) {
-                return ObjectUtils.compare(left, right);
-            } else {
-                BigDecimal leftValue = parseBigDecimal(left);
-                return ObjectUtils.compare(leftValue, right);
-            }
+
+        if (((Object) left).getClass() == ((Object) right).getClass()) {
+            return ObjectUtils.compare(left, right);
         } else {
-            if (right instanceof String) {
+            try {
+                BigDecimal leftValue = parseBigDecimal(left);
                 BigDecimal rightValue = parseBigDecimal(right);
-                return ObjectUtils.compare(left, rightValue);
-            } else {
-                return ObjectUtils.compare(left, right);
+                return ObjectUtils.compare(leftValue, rightValue);
+            } catch (Exception e) {
+                String leftValue = parseString(left);
+                String rightValue = parseString(right);
+                return ObjectUtils.compare(leftValue, rightValue);
             }
         }
     }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
index ad39558a11..a88b17f6ba 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
@@ -24,6 +24,7 @@ import net.sf.jsqlparser.expression.DoubleValue;
 
 /**
  * LongParser
+ *
  */
 @TransformParser(values = DoubleValue.class)
 public class DoubleParser implements ValueParser {
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java
new file mode 100644
index 0000000000..67e4e331d4
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.converter.DoubleConverter;
+import org.apache.inlong.sdk.transform.process.converter.LongConverter;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TestArithmeticFunctionsTransformProcessor
+ * description: test the arithmetic functions in transform processor
+ */
+public class TestTransformExpressionOperatorsProcessor {
+
+    private static final List<FieldInfo> srcFields = new ArrayList<>();
+    private static final List<FieldInfo> dstFields = new ArrayList<>();
+    private static final CsvSourceInfo csvSource;
+    private static final KvSinkInfo kvSink;
+
+    static {
+        srcFields.add(new FieldInfo("numeric1", new DoubleConverter()));
+        srcFields.add(new FieldInfo("string2", 
TypeConverter.DefaultTypeConverter()));
+        srcFields.add(new FieldInfo("numeric3", new DoubleConverter()));
+        srcFields.add(new FieldInfo("numeric4", new LongConverter()));
+
+        FieldInfo field = new FieldInfo();
+        field.setName("result");
+        dstFields.add(field);
+        csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
+        kvSink = new KvSinkInfo("UTF-8", dstFields);
+    }
+
+    @Test
+    public void testEqualsToOperator() throws Exception {
+        String transformSql = "select if(string2 = 4,1,0) from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|4a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|4a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=0");
+        // case2: "3.14159265358979323846|4|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=1");
+
+        transformSql = "select if(numeric3 = 4,1,0) from source";
+        config = new TransformConfig(transformSql);
+        // case3: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output3 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=1");
+        // case4: "3.14159265358979323846|4|4.2|8"
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|4.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=0");
+    }
+
+    @Test
+    public void testNotEqualsToOperator() throws Exception {
+        String transformSql = "select if(string2 != 4,1,0) from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|4a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|4a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=1");
+        // case2: "3.14159265358979323846|4|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=0");
+
+        transformSql = "select if(numeric3 != 4,1,0) from source";
+        config = new TransformConfig(transformSql);
+        // case3: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output3 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=0");
+        // case4: "3.14159265358979323846|4|4.2|8"
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|4.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=1");
+    }
+
+    @Test
+    public void testGreaterThanEqualsOperator() throws Exception {
+        String transformSql = "select if(string2 >= 4,1,0) from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|3a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|3a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=0");
+        // case2: "3.14159265358979323846|5|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|5|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=1");
+
+        transformSql = "select if(numeric3 >= 4,1,0) from source";
+        config = new TransformConfig(transformSql);
+        // case3: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output3 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=1");
+        // case4: "3.14159265358979323846|4|3.2|8"
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|3.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=0");
+    }
+
+    @Test
+    public void testGreaterThanOperator() throws Exception {
+        String transformSql = "select if(string2 > 4.1,1,0) from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|3a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|3a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=0");
+        // case2: "3.14159265358979323846|5|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|5|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=1");
+
+        transformSql = "select if(numeric3 > 4.1,1,0) from source";
+        config = new TransformConfig(transformSql);
+        // case3: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output3 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=0");
+        // case4: "3.14159265358979323846|4|4.2|8"
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|4.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=1");
+    }
+
+    @Test
+    public void testMinorThanEqualsOperator() throws Exception {
+        String transformSql = "select if(string2 <= 4,1,0) from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|3a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|3a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=1");
+        // case2: "3.14159265358979323846|5|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|5|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=0");
+
+        transformSql = "select if(numeric3 <= 4,1,0) from source";
+        config = new TransformConfig(transformSql);
+        // case3: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output3 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=1");
+        // case4: "3.14159265358979323846|4|4.2|8"
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|4.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=0");
+    }
+
+    @Test
+    public void testMinorThanOperator() throws Exception {
+        String transformSql = "select if(string2 < 4.1,1,0) from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|3a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|3a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=1");
+        // case2: "3.14159265358979323846|5|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|5|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=0");
+
+        transformSql = "select if(numeric3 < 4,1,0) from source";
+        config = new TransformConfig(transformSql);
+        // case3: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output3 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=0");
+        // case4: "3.14159265358979323846|4|3.2|8"
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|3.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=1");
+    }
+
+    @Test
+    public void testNotOperator() throws Exception {
+        String transformSql = "select if(!(string2 < 4),1,0) from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|3a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|3a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=0");
+        // case2: "3.14159265358979323846|5|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|5|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=1");
+
+        transformSql = "select if(!(numeric3 < 3.9),1,0) from source";
+        config = new TransformConfig(transformSql);
+        // case3: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output3 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=1");
+        // case4: "3.14159265358979323846|4|3.2|8"
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|3.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=0");
+    }
+
+    @Test
+    public void testOrOperator() throws Exception {
+        String transformSql = "select if((string2 < 4) or (numeric4 > 5),1,0) 
from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|3a|4|8"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|3a|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=1");
+        // case2: "3.14159265358979323846|5|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|5|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=1");
+        // case3: "3.14159265358979323846|5|4|4"
+        List<String> output3 = 
processor.transform("3.14159265358979323846|5|4|4");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=0");
+
+        transformSql = "select if((numeric3 < 4) or (numeric4 > 5),1,0) from 
source";
+        config = new TransformConfig(transformSql);
+        // case4: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=1");
+        // case5: "3.14159265358979323846|4|3.2|8"
+        List<String> output5 = 
processor.transform("3.14159265358979323846|4|3.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output5.get(0), "result=1");
+        // case6: "3.14159265358979323846|4|4.2|5"
+        List<String> output6 = 
processor.transform("3.14159265358979323846|4|4.2|5");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output6.get(0), "result=0");
+    }
+
+    @Test
+    public void testAndOperator() throws Exception {
+        String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0) 
from source";
+        TransformConfig config = new TransformConfig(transformSql);
+        // case1: "3.14159265358979323846|3a|4|4"
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output1 = 
processor.transform("3.14159265358979323846|3a|4|4");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output1.get(0), "result=0");
+        // case2: "3.14159265358979323846|5|4|8"
+        List<String> output2 = 
processor.transform("3.14159265358979323846|5|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output2.get(0), "result=0");
+        // case3: "3.14159265358979323846|3|4|8"
+        List<String> output3 = 
processor.transform("3.14159265358979323846|3|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output3.get(0), "result=1");
+
+        transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from 
source";
+        config = new TransformConfig(transformSql);
+        // case4: "3.14159265358979323846|4|4|8"
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output4 = 
processor.transform("3.14159265358979323846|4|4|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output4.get(0), "result=0");
+        // case5: "3.14159265358979323846|4|3.2|4"
+        List<String> output5 = 
processor.transform("3.14159265358979323846|4|3.2|4");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output5.get(0), "result=0");
+        // case6: "3.14159265358979323846|4|3.2|8"
+        List<String> output6 = 
processor.transform("3.14159265358979323846|4|3.2|8");
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(output6.get(0), "result=1");
+    }
+}

Reply via email to