This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new dfd42dc88d [INLONG-10809][SDK] Improvements to TypeConverter field types and CompareValue in OperatorTools (#10817) dfd42dc88d is described below commit dfd42dc88d7f5bb95d66165ae5cfcf6c92921bd7 Author: Zkplo <87751516+zk...@users.noreply.github.com> AuthorDate: Tue Sep 3 10:24:08 2024 +0800 [INLONG-10809][SDK] Improvements to TypeConverter field types and CompareValue in OperatorTools (#10817) Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com> --- .../inlong/sdk/transform/decode/CsvSourceData.java | 10 +- .../sdk/transform/decode/CsvSourceDecoder.java | 8 +- .../inlong/sdk/transform/decode/SourceData.java | 2 +- .../inlong/sdk/transform/pojo/FieldInfo.java | 2 +- .../converter/DoubleConverter.java} | 16 +- .../converter/LongConverter.java} | 16 +- .../sdk/transform/process/function/IfFunction.java | 55 ++++ .../transform/process/operator/OperatorTools.java | 21 +- .../sdk/transform/process/parser/DoubleParser.java | 1 + .../TestTransformExpressionOperatorsProcessor.java | 354 +++++++++++++++++++++ 10 files changed, 445 insertions(+), 40 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java index d4492b4b85..e0bd9f794c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java @@ -28,14 +28,14 @@ import java.util.Map; */ public class CsvSourceData implements SourceData { - private List<Map<String, String>> rows = new ArrayList<>(); + private List<Map<String, Object>> rows = new ArrayList<>(); - private Map<String, String> currentRow; + private Map<String, Object> currentRow; public CsvSourceData() { } - public void putField(String fieldName, String fieldValue) { + public void putField(String fieldName, Object fieldValue) { this.currentRow.put(fieldName, fieldValue); } @@ -50,11 +50,11 @@ public class CsvSourceData implements SourceData { } @Override - public String getField(int rowNum, String fieldName) { + public Object getField(int rowNum, String fieldName) { if (rowNum >= this.rows.size()) { return null; } - Map<String, String> targetRow = this.rows.get(rowNum); + Map<String, Object> targetRow = this.rows.get(rowNum); return targetRow.get(fieldName); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java index fb95dadc43..7b3dedb637 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java @@ -76,9 +76,13 @@ public class CsvSourceDecoder implements SourceDecoder<String> { int fieldIndex = 0; for (FieldInfo field : fields) { String fieldName = field.getName(); - String fieldValue = null; + Object fieldValue = null; if (fieldIndex < fieldValues.length) { - fieldValue = fieldValues[fieldIndex]; + try { + fieldValue = field.getConverter().convert(fieldValues[fieldIndex]); + } catch (Exception e) { + throw new RuntimeException(e); + } } sourceData.putField(fieldName, fieldValue); fieldIndex++; diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java index 2c39948f2d..cf5f9c0fbe 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java @@ -26,5 +26,5 @@ public interface SourceData { int getRowCount(); - String getField(int rowNum, String fieldName); + Object getField(int rowNum, String fieldName); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java index 1027dad944..2a7834112a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java @@ -28,7 +28,7 @@ import lombok.Data; public class FieldInfo { private String name; - private TypeConverter converter; + private TypeConverter converter = TypeConverter.DefaultTypeConverter(); public FieldInfo() { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java similarity index 77% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java index 2c39948f2d..52afbda16b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java @@ -15,16 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.decode; +package org.apache.inlong.sdk.transform.process.converter; -/** - * SourceData - */ -public interface SourceData { - - String FIELD_DEFAULT_PREFIX = "$"; - - int getRowCount(); +public class DoubleConverter implements TypeConverter { - String getField(int rowNum, String fieldName); + @Override + public Object convert(String value) throws Exception { + return Double.parseDouble(value); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java similarity index 77% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java index 2c39948f2d..5a18f8ee13 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java @@ -15,16 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.sdk.transform.decode; +package org.apache.inlong.sdk.transform.process.converter; -/** - * SourceData - */ -public interface SourceData { - - String FIELD_DEFAULT_PREFIX = "$"; - - int getRowCount(); +public class LongConverter implements TypeConverter { - String getField(int rowNum, String fieldName); + @Override + public Object convert(String value) throws Exception { + return Long.parseLong(value); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java new file mode 100644 index 0000000000..bda7b9301c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * IfFunction + * description: if(expr,r1,r2) -- expr is an expression, if it holds, return r1; otherwise, return r2 + */ +@TransformFunction(names = {"if"}) +public class IfFunction implements ValueParser { + + private final ExpressionOperator expressionOperator; + private final ValueParser tureValueParser; + private final ValueParser falseValueParser; + + public IfFunction(Function expr) { + List<Expression> expressions = expr.getParameters().getExpressions(); + expressionOperator = OperatorTools.buildOperator(expressions.get(0)); + tureValueParser = OperatorTools.buildParser(expressions.get(1)); + falseValueParser = OperatorTools.buildParser(expressions.get(2)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + boolean condition = expressionOperator.check(sourceData, rowIndex, context); + return condition ? tureValueParser.parse(sourceData, rowIndex, context) + : falseValueParser.parse(sourceData, rowIndex, context); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 02b24cdb6b..bb35bb4490 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -133,19 +133,18 @@ public class OperatorTools { if (right == null) { return 1; } - if (left instanceof String) { - if (right instanceof String) { - return ObjectUtils.compare(left, right); - } else { - BigDecimal leftValue = parseBigDecimal(left); - return ObjectUtils.compare(leftValue, right); - } + + if (((Object) left).getClass() == ((Object) right).getClass()) { + return ObjectUtils.compare(left, right); } else { - if (right instanceof String) { + try { + BigDecimal leftValue = parseBigDecimal(left); BigDecimal rightValue = parseBigDecimal(right); - return ObjectUtils.compare(left, rightValue); - } else { - return ObjectUtils.compare(left, right); + return ObjectUtils.compare(leftValue, rightValue); + } catch (Exception e) { + String leftValue = parseString(left); + String rightValue = parseString(right); + return ObjectUtils.compare(leftValue, rightValue); } } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java index ad39558a11..a88b17f6ba 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java @@ -24,6 +24,7 @@ import net.sf.jsqlparser.expression.DoubleValue; /** * LongParser + * */ @TransformParser(values = DoubleValue.class) public class DoubleParser implements ValueParser { diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java new file mode 100644 index 0000000000..67e4e331d4 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.converter.DoubleConverter; +import org.apache.inlong.sdk.transform.process.converter.LongConverter; +import org.apache.inlong.sdk.transform.process.converter.TypeConverter; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * TestArithmeticFunctionsTransformProcessor + * description: test the arithmetic functions in transform processor + */ +public class TestTransformExpressionOperatorsProcessor { + + private static final List<FieldInfo> srcFields = new ArrayList<>(); + private static final List<FieldInfo> dstFields = new ArrayList<>(); + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; + + static { + srcFields.add(new FieldInfo("numeric1", new DoubleConverter())); + srcFields.add(new FieldInfo("string2", TypeConverter.DefaultTypeConverter())); + srcFields.add(new FieldInfo("numeric3", new DoubleConverter())); + srcFields.add(new FieldInfo("numeric4", new LongConverter())); + + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + + @Test + public void testEqualsToOperator() throws Exception { + String transformSql = "select if(string2 = 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|4a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|4a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|4|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(numeric3 = 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|4.2|8" + List<String> output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testNotEqualsToOperator() throws Exception { + String transformSql = "select if(string2 != 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|4a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|4a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|4|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + + transformSql = "select if(numeric3 != 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + // case4: "3.14159265358979323846|4|4.2|8" + List<String> output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + } + + @Test + public void testGreaterThanEqualsOperator() throws Exception { + String transformSql = "select if(string2 >= 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(numeric3 >= 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|3.2|8" + List<String> output4 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testGreaterThanOperator() throws Exception { + String transformSql = "select if(string2 > 4.1,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(numeric3 > 4.1,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + // case4: "3.14159265358979323846|4|4.2|8" + List<String> output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + } + + @Test + public void testMinorThanEqualsOperator() throws Exception { + String transformSql = "select if(string2 <= 4,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|5|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + + transformSql = "select if(numeric3 <= 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|4.2|8" + List<String> output4 = processor.transform("3.14159265358979323846|4|4.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testMinorThanOperator() throws Exception { + String transformSql = "select if(string2 < 4.1,1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|5|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + + transformSql = "select if(numeric3 < 4,1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + // case4: "3.14159265358979323846|4|3.2|8" + List<String> output4 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + } + + @Test + public void testNotOperator() throws Exception { + String transformSql = "select if(!(string2 < 4),1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + + transformSql = "select if(!(numeric3 < 3.9),1,0) from source"; + config = new TransformConfig(transformSql); + // case3: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output3 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + // case4: "3.14159265358979323846|4|3.2|8" + List<String> output4 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + } + + @Test + public void testOrOperator() throws Exception { + String transformSql = "select if((string2 < 4) or (numeric4 > 5),1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|8" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|3a|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=1"); + // case2: "3.14159265358979323846|5|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=1"); + // case3: "3.14159265358979323846|5|4|4" + List<String> output3 = processor.transform("3.14159265358979323846|5|4|4"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=0"); + + transformSql = "select if((numeric3 < 4) or (numeric4 > 5),1,0) from source"; + config = new TransformConfig(transformSql); + // case4: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output4 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=1"); + // case5: "3.14159265358979323846|4|3.2|8" + List<String> output5 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output5.get(0), "result=1"); + // case6: "3.14159265358979323846|4|4.2|5" + List<String> output6 = processor.transform("3.14159265358979323846|4|4.2|5"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output6.get(0), "result=0"); + } + + @Test + public void testAndOperator() throws Exception { + String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0) from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1: "3.14159265358979323846|3a|4|4" + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor.transform("3.14159265358979323846|3a|4|4"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0"); + // case2: "3.14159265358979323846|5|4|8" + List<String> output2 = processor.transform("3.14159265358979323846|5|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=0"); + // case3: "3.14159265358979323846|3|4|8" + List<String> output3 = processor.transform("3.14159265358979323846|3|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output3.get(0), "result=1"); + + transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from source"; + config = new TransformConfig(transformSql); + // case4: "3.14159265358979323846|4|4|8" + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output4 = processor.transform("3.14159265358979323846|4|4|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output4.get(0), "result=0"); + // case5: "3.14159265358979323846|4|3.2|4" + List<String> output5 = processor.transform("3.14159265358979323846|4|3.2|4"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output5.get(0), "result=0"); + // case6: "3.14159265358979323846|4|3.2|8" + List<String> output6 = processor.transform("3.14159265358979323846|4|3.2|8"); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output6.get(0), "result=1"); + } +}