This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 13481e140d [INLONG-10652][SDK] Inlong Transform support for generics (#10672) 13481e140d is described below commit 13481e140d2114b375e4b43d6af3f27c58c76e70 Author: vernedeng <verned...@apache.org> AuthorDate: Fri Jul 19 09:45:37 2024 +0800 [INLONG-10652][SDK] Inlong Transform support for generics (#10672) * [INLONG-10652][SDK] Inlong Transform support for generics * fix UT --- .../sdk/transform/decode/CsvSourceDecoder.java | 2 +- .../sdk/transform/decode/JsonSourceDecoder.java | 2 +- .../sdk/transform/decode/KvSourceDecoder.java | 2 +- .../sdk/transform/decode/PbSourceDecoder.java | 14 +- .../inlong/sdk/transform/decode/SourceDecoder.java | 5 +- ...ourceDecoder.java => SourceDecoderFactory.java} | 26 ++- .../sdk/transform/encode/CsvSinkEncoder.java | 2 +- .../inlong/sdk/transform/encode/KvSinkEncoder.java | 2 +- .../inlong/sdk/transform/encode/SinkEncoder.java | 4 +- .../{SinkEncoder.java => SinkEncoderFactory.java} | 18 +- .../inlong/sdk/transform/pojo/TransformConfig.java | 45 +---- .../sdk/transform/process/TransformProcessor.java | 98 +++-------- .../sdk/transform/pojo/TestTransformConfig.java | 118 ------------- .../TestTransformArithmeticFunctionsProcessor.java | 66 +++++--- .../transform/process/TestTransformProcessor.java | 187 +++++++++++---------- 15 files changed, 208 insertions(+), 383 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java index 14a505d422..daddfd36d7 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java @@ -30,7 +30,7 @@ import java.util.Map; * CsvSourceDecoder * */ -public class CsvSourceDecoder implements SourceDecoder { +public class CsvSourceDecoder implements SourceDecoder<String> { protected CsvSourceInfo sourceInfo; private Charset srcCharset = Charset.defaultCharset(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java index 57bf6a9982..13c363912a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java @@ -34,7 +34,7 @@ import java.util.Map; * JsonSourceDecoder * */ -public class JsonSourceDecoder implements SourceDecoder { +public class JsonSourceDecoder implements SourceDecoder<String> { protected JsonSourceInfo sourceInfo; private Charset srcCharset = Charset.defaultCharset(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java index 03b40c9f1c..77a4fef8b4 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java @@ -30,7 +30,7 @@ import java.util.Map; * KvSourceDecoder * */ -public class KvSourceDecoder implements SourceDecoder { +public class KvSourceDecoder implements SourceDecoder<String> { protected KvSourceInfo sourceInfo; private Charset srcCharset = Charset.defaultCharset(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java index 5ac13cf28f..6c8a919e24 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java @@ -38,7 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; * PbSourceDecoder * */ -public class PbSourceDecoder implements SourceDecoder { +public class PbSourceDecoder implements SourceDecoder<byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PbSourceDecoder.class); @@ -150,16 +150,4 @@ public class PbSourceDecoder implements SourceDecoder { return null; } } - - /** - * decode - * @param srcString - * @param extParams - * @return - */ - @Override - public SourceData decode(String srcString, Map<String, Object> extParams) { - byte[] srcBytes = Base64.getDecoder().decode(srcString); - return this.decode(srcBytes, extParams); - } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java index a11cd89351..7bbb4dda2d 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java @@ -22,9 +22,10 @@ import java.util.Map; /** * SourceDecoder */ -public interface SourceDecoder { +public interface SourceDecoder<Input> { SourceData decode(byte[] srcBytes, Map<String, Object> extParams); - SourceData decode(String srcString, Map<String, Object> extParams); + SourceData decode(Input input, Map<String, Object> extParams); + } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java similarity index 51% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java index a11cd89351..b29f6f093c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java @@ -17,14 +17,26 @@ package org.apache.inlong.sdk.transform.decode; -import java.util.Map; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; +import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; -/** - * SourceDecoder - */ -public interface SourceDecoder { +public class SourceDecoderFactory { + + public static CsvSourceDecoder createCsvDecoder(CsvSourceInfo sourceInfo) { + return new CsvSourceDecoder(sourceInfo); + } + + public static KvSourceDecoder createKvDecoder(KvSourceInfo sourceInfo) { + return new KvSourceDecoder(sourceInfo); + } - SourceData decode(byte[] srcBytes, Map<String, Object> extParams); + public static JsonSourceDecoder createJsonDecoder(JsonSourceInfo sourceInfo) { + return new JsonSourceDecoder(sourceInfo); + } - SourceData decode(String srcString, Map<String, Object> extParams); + public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) { + return new PbSourceDecoder(sourceInfo); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java index 4b963c10fb..09cae6ea1b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -28,7 +28,7 @@ import java.util.List; /** * CsvSinkEncoder */ -public class CsvSinkEncoder implements SinkEncoder { +public class CsvSinkEncoder implements SinkEncoder<String> { protected CsvSinkInfo sinkInfo; protected Charset sinkCharset = Charset.defaultCharset(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java index 503914e80a..be0a7ba980 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java @@ -28,7 +28,7 @@ import java.util.List; /** * KvSinkEncoder */ -public class KvSinkEncoder implements SinkEncoder { +public class KvSinkEncoder implements SinkEncoder<String> { protected KvSinkInfo sinkInfo; protected Charset sinkCharset = Charset.defaultCharset(); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java index 3839da0160..150f1811f1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java @@ -24,9 +24,9 @@ import java.util.List; /** * SinkEncoder */ -public interface SinkEncoder { +public interface SinkEncoder<Output> { - String encode(SinkData sinkData); + Output encode(SinkData sinkData); List<FieldInfo> getFields(); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java similarity index 67% copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java index 3839da0160..f95d19bfca 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java @@ -17,16 +17,16 @@ package org.apache.inlong.sdk.transform.encode; -import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; -import java.util.List; +public class SinkEncoderFactory { -/** - * SinkEncoder - */ -public interface SinkEncoder { - - String encode(SinkData sinkData); + public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) { + return new CsvSinkEncoder(csvSinkInfo); + } - List<FieldInfo> getFields(); + public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) { + return new KvSinkEncoder(kvSinkInfo); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java index ff1ac958fc..71dd71be3b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java @@ -25,57 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; */ public class TransformConfig { - @JsonProperty("sourceInfo") - private SourceInfo sourceInfo; - @JsonProperty("sinkInfo") - private SinkInfo sinkInfo; @JsonProperty("transformSql") private String transformSql; @JsonCreator - public TransformConfig( - @JsonProperty("sourceInfo") SourceInfo sourceInfo, - @JsonProperty("sinkInfo") SinkInfo sinkInfo, - @JsonProperty("transformSql") String transformSql) { - this.sourceInfo = sourceInfo; - this.sinkInfo = sinkInfo; + public TransformConfig(@JsonProperty("transformSql") String transformSql) { this.transformSql = transformSql; } - /** - * get sourceInfo - * @return the sourceInfo - */ - @JsonProperty("sourceInfo") - public SourceInfo getSourceInfo() { - return sourceInfo; - } - - /** - * set sourceInfo - * @param sourceInfo the sourceInfo to set - */ - public void setSourceInfo(SourceInfo sourceInfo) { - this.sourceInfo = sourceInfo; - } - - /** - * get sinkInfo - * @return the sinkInfo - */ - @JsonProperty("sinkInfo") - public SinkInfo getSinkInfo() { - return sinkInfo; - } - - /** - * set sinkInfo - * @param sinkInfo the sinkInfo to set - */ - public void setSinkInfo(SinkInfo sinkInfo) { - this.sinkInfo = sinkInfo; - } - /** * get transformSql * @return the transformSql diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index c3ee9270cc..0e74180932 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -17,46 +17,28 @@ package org.apache.inlong.sdk.transform.process; -import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder; -import org.apache.inlong.sdk.transform.decode.JsonSourceDecoder; -import org.apache.inlong.sdk.transform.decode.KvSourceDecoder; -import org.apache.inlong.sdk.transform.decode.PbSourceDecoder; import org.apache.inlong.sdk.transform.decode.SourceData; import org.apache.inlong.sdk.transform.decode.SourceDecoder; -import org.apache.inlong.sdk.transform.encode.CsvSinkEncoder; import org.apache.inlong.sdk.transform.encode.DefaultSinkData; -import org.apache.inlong.sdk.transform.encode.KvSinkEncoder; import org.apache.inlong.sdk.transform.encode.SinkData; import org.apache.inlong.sdk.transform.encode.SinkEncoder; -import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; -import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; import org.apache.inlong.sdk.transform.pojo.FieldInfo; -import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; -import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; -import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; -import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; -import org.apache.inlong.sdk.transform.pojo.SinkInfo; -import org.apache.inlong.sdk.transform.pojo.SourceInfo; import org.apache.inlong.sdk.transform.pojo.TransformConfig; import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator; import org.apache.inlong.sdk.transform.process.operator.OperatorTools; import org.apache.inlong.sdk.transform.process.parser.ValueParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.parser.CCJSqlParserManager; import net.sf.jsqlparser.statement.select.PlainSelect; import net.sf.jsqlparser.statement.select.Select; import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.StringReader; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -67,65 +49,40 @@ import java.util.Map.Entry; * TransformProcessor * */ -public class TransformProcessor { +public class TransformProcessor<I, O> { private static final Logger LOG = LoggerFactory.getLogger(TransformProcessor.class); + private static final Map<String, Object> EMPTY_EXT_PARAMS = ImmutableMap.of(); + private TransformConfig config; - private SourceDecoder decoder; - private SinkEncoder encoder; - private Charset srcCharset = Charset.defaultCharset(); - protected Charset sinkCharset = Charset.defaultCharset(); + private SourceDecoder<I> decoder; + private SinkEncoder<O> encoder; private PlainSelect transformSelect; private ExpressionOperator where; private Map<String, ValueParser> selectItemMap; - private ObjectMapper objectMapper = new ObjectMapper(); - - public TransformProcessor(String configString) - throws JsonMappingException, JsonProcessingException, JSQLParserException { - TransformConfig config = this.objectMapper.readValue(configString, TransformConfig.class); - this.init(config); - } - - public TransformProcessor(TransformConfig config) throws JSQLParserException { - this.init(config); + public static <I, O> TransformProcessor<I, O> create( + TransformConfig config, + SourceDecoder<I> decoder, + SinkEncoder<O> encoder) throws JSQLParserException { + return new TransformProcessor<>(config, decoder, encoder); } - private void init(TransformConfig config) throws JSQLParserException { + private TransformProcessor( + TransformConfig config, + SourceDecoder<I> decoder, + SinkEncoder<O> encoder) + throws JSQLParserException { this.config = config; - if (!StringUtils.isBlank(config.getSourceInfo().getCharset())) { - this.srcCharset = Charset.forName(config.getSourceInfo().getCharset()); - } - if (!StringUtils.isBlank(config.getSinkInfo().getCharset())) { - this.sinkCharset = Charset.forName(config.getSinkInfo().getCharset()); - } - this.initDecoder(config); - this.initEncoder(config); - this.initTransformSql(); + this.decoder = decoder; + this.encoder = encoder; + this.init(); } - private void initDecoder(TransformConfig config) { - SourceInfo sourceInfo = config.getSourceInfo(); - if (sourceInfo instanceof CsvSourceInfo) { - this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo); - } else if (sourceInfo instanceof KvSourceInfo) { - this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo); - } else if (sourceInfo instanceof JsonSourceInfo) { - this.decoder = new JsonSourceDecoder((JsonSourceInfo) sourceInfo); - } else if (sourceInfo instanceof PbSourceInfo) { - this.decoder = new PbSourceDecoder((PbSourceInfo) sourceInfo); - } - } - - private void initEncoder(TransformConfig config) { - SinkInfo sinkInfo = config.getSinkInfo(); - if (sinkInfo instanceof CsvSinkInfo) { - this.encoder = new CsvSinkEncoder((CsvSinkInfo) sinkInfo); - } else if (sinkInfo instanceof KvSinkInfo) { - this.encoder = new KvSinkEncoder((KvSinkInfo) sinkInfo); - } + private void init() throws JSQLParserException { + this.initTransformSql(); } private void initTransformSql() throws JSQLParserException { @@ -157,12 +114,16 @@ public class TransformProcessor { } } - public List<String> transform(byte[] srcBytes, Map<String, Object> extParams) { - SourceData sourceData = this.decoder.decode(srcBytes, extParams); + public List<O> transform(I input) { + return this.transform(input, EMPTY_EXT_PARAMS); + } + + public List<O> transform(I input, Map<String, Object> extParams) { + SourceData sourceData = this.decoder.decode(input, extParams); if (sourceData == null) { return null; } - List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount()); + List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount()); for (int i = 0; i < sourceData.getRowCount(); i++) { if (this.where != null && !this.where.check(sourceData, i)) { continue; @@ -183,7 +144,4 @@ public class TransformProcessor { return sinkDatas; } - public List<String> transform(String srcString, Map<String, Object> extParams) { - return this.transform(srcString.getBytes(this.srcCharset), extParams); - } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java deleted file mode 100644 index c26b3f9cca..0000000000 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.transform.pojo; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -/** - * TestTransformConfig - * - */ -public class TestTransformConfig { - - @Test - public void testCsv() { - try { - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - List<FieldInfo> fields = new ArrayList<>(); - fields.add(ftime); - SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select ftime from source"; - TransformConfig config = new TransformConfig(csvSource, csvSink, transformSql); - ObjectMapper objectMapper = new ObjectMapper(); - String configString = objectMapper.writeValueAsString(config); - System.out.println(configString); - Assert.assertEquals(configString, "{\"sourceInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\"," - + "\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]}," - + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\"," - + "\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]}," - + "\"transformSql\":\"select ftime from source\"}"); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void testKv() { - try { - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - List<FieldInfo> fields = new ArrayList<>(); - fields.add(ftime); - SourceInfo kvSource = new KvSourceInfo("UTF-8", fields); - SinkInfo kvSink = new KvSinkInfo("UTF-8", fields); - String transformSql = "select ftime from source"; - TransformConfig config = new TransformConfig(kvSource, kvSink, transformSql); - ObjectMapper objectMapper = new ObjectMapper(); - String configString = objectMapper.writeValueAsString(config); - System.out.println(configString); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void testPb() { - try { - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - List<FieldInfo> fields = new ArrayList<>(); - fields.add(ftime); - String transformBase64 = "CrcCCg90cmFuc2Zvcm0ucHJvdG8SBHRlc3QirQEKClNka01lc3NhZ2USEAoDbXNnGAEgASgMUg" - + "Ntc2cSGAoHbXNnVGltZRgCIAEoA1IHbXNnVGltZRI3CgdleHRpbmZvGAMgAygLMh0udGVzdC5TZGtNZXNzYWdlLk" - + "V4dGluZm9FbnRyeVIHZXh0aW5mbxo6CgxFeHRpbmZvRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSFAoFdmFsdWUY" - + "AiABKAlSBXZhbHVlOgI4ASJmCg5TZGtEYXRhUmVxdWVzdBIQCgNzaWQYASABKAlSA3NpZBIkCgRtc2dzGAIgAygLMh" - + "AudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM="; - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select ftime from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); - ObjectMapper objectMapper = new ObjectMapper(); - String configString = objectMapper.writeValueAsString(config); - System.out.println(configString); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void testJson() { - try { - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - List<FieldInfo> fields = new ArrayList<>(); - fields.add(ftime); - SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "root"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select ftime from source"; - TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); - ObjectMapper objectMapper = new ObjectMapper(); - String configString = objectMapper.writeValueAsString(config); - System.out.println(configString); - } catch (Exception e) { - e.printStackTrace(); - } - } -} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index b291d2b685..488095459c 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -17,11 +17,11 @@ package org.apache.inlong.sdk.transform.process; +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; import org.apache.inlong.sdk.transform.pojo.FieldInfo; import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; -import org.apache.inlong.sdk.transform.pojo.SinkInfo; -import org.apache.inlong.sdk.transform.pojo.SourceInfo; import org.apache.inlong.sdk.transform.pojo.TransformConfig; import org.junit.Assert; @@ -39,8 +39,8 @@ public class TestTransformArithmeticFunctionsProcessor { private static final List<FieldInfo> srcFields = new ArrayList<>(); private static final List<FieldInfo> dstFields = new ArrayList<>(); - private static final SourceInfo csvSource; - private static final SinkInfo kvSink; + private static final CsvSourceInfo csvSource; + private static final KvSinkInfo kvSink; static { for (int i = 1; i < 5; i++) { FieldInfo field = new FieldInfo(); @@ -57,9 +57,11 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testPowerFunction() throws Exception { String transformSql = "select power(numeric1, numeric2) from source"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1: 2^4 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor.transform("2|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=16.0"); @@ -76,9 +78,11 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testAbsFunction() throws Exception { String transformSql = "select abs(numeric1) from source"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1: |2| - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor.transform("2|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=2"); @@ -91,9 +95,11 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testSqrtFunction() throws Exception { String transformSql = "select sqrt(numeric1) from source"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1: sqrt(9) - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor.transform("9|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=3.0"); @@ -106,9 +112,11 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testLnFunction() throws Exception { String transformSql = "select ln(numeric1) from source"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1: ln(1) - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor.transform("1|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=0.0"); @@ -121,9 +129,11 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testLog10Function() throws Exception { String transformSql = "select log10(numeric1) from source"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1: log10(1) - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor.transform("1|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=0.0"); @@ -136,9 +146,11 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testLog2Function() throws Exception { String transformSql = "select log2(numeric1) from source"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1: log2(1) - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor.transform("1|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=0.0"); @@ -151,21 +163,27 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testLogFunction() throws Exception { String transformSql1 = "select log(numeric1) from source"; - TransformConfig config1 = new TransformConfig(csvSource, kvSink, transformSql1); + TransformConfig config1 = new TransformConfig(transformSql1); // case1: ln(1) - TransformProcessor processor1 = new TransformProcessor(config1); + TransformProcessor<String, String> processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor1.transform("1|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=0.0"); String transformSql2 = "select log(numeric1, numeric2) from source"; - TransformConfig config2 = new TransformConfig(csvSource, kvSink, transformSql2); + TransformConfig config2 = new TransformConfig(transformSql2); // case2: log2(8) - TransformProcessor processor2 = new TransformProcessor(config2); + TransformProcessor<String, String> processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output2 = processor2.transform("2|8|6|8", new HashMap<>()); Assert.assertEquals(1, output2.size()); Assert.assertEquals(output2.get(0), "result=3.0"); // case3: log10(100) - TransformProcessor processor3 = new TransformProcessor(config2); + TransformProcessor<String, String> processor3 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output3 = processor3.transform("10|100|6|8", new HashMap<>()); Assert.assertEquals(1, output3.size()); Assert.assertEquals(output3.get(0), "result=2.0"); @@ -174,9 +192,11 @@ public class TestTransformArithmeticFunctionsProcessor { @Test public void testExpFunction() throws Exception { String transformSql = "select exp(numeric1) from source"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1: e^0 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor.transform("0|4|6|8", new HashMap<>()); Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "result=1.0"); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index b76dde6f7f..93c9448999 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -17,6 +17,8 @@ package org.apache.inlong.sdk.transform.process; +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; import org.apache.inlong.sdk.transform.pojo.FieldInfo; @@ -24,8 +26,6 @@ import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo; import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; -import org.apache.inlong.sdk.transform.pojo.SinkInfo; -import org.apache.inlong.sdk.transform.pojo.SourceInfo; import org.apache.inlong.sdk.transform.pojo.TransformConfig; import org.junit.Assert; @@ -51,38 +51,48 @@ public class TestTransformProcessor { FieldInfo extinfo = new FieldInfo(); extinfo.setName("extinfo"); fields.add(extinfo); - SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields); - SinkInfo kvSink = new KvSinkInfo("UTF-8", fields); + CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields); + KvSinkInfo kvSink = new KvSinkInfo("UTF-8", fields); String transformSql = "select ftime,extinfo from source where extinfo='ok'"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor1 = new TransformProcessor(config); + TransformProcessor<String, String> processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); // case2 config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); - TransformProcessor processor2 = new TransformProcessor(config); + TransformProcessor<String, String> processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); + Assert.assertEquals(0, output2.size()); } @Test public void testCsv2KvNoField() throws Exception { - SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null); - SinkInfo kvSink = new KvSinkInfo("UTF-8", null); + CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null); + KvSinkInfo kvSink = new KvSinkInfo("UTF-8", null); String transformSql = "select $1 ftime,$2 extinfo from source where $2='ok'"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor1 = new TransformProcessor(config); + TransformProcessor<String, String> processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); // case2 config.setTransformSql("select $1 ftime,$2 extinfo from source where $2!='ok'"); - TransformProcessor processor2 = new TransformProcessor(config); + TransformProcessor<String, String> processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); + Assert.assertEquals(0, output2.size()); } @Test @@ -94,49 +104,59 @@ public class TestTransformProcessor { FieldInfo extinfo = new FieldInfo(); extinfo.setName("extinfo"); fields.add(extinfo); - SourceInfo kvSource = new KvSourceInfo("UTF-8", fields); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select ftime,extinfo from source where extinfo='ok'"; - TransformConfig config = new TransformConfig(kvSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor1 = new TransformProcessor(config); + TransformProcessor<String, String> processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); // case2 config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); - TransformProcessor processor2 = new TransformProcessor(config); + TransformProcessor<String, String> processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); + Assert.assertEquals(0, output2.size()); } @Test public void testKv2CsvNoField() throws Exception { - SourceInfo kvSource = new KvSourceInfo("UTF-8", null); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null); + KvSourceInfo kvSource = new KvSourceInfo("UTF-8", null); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null); String transformSql = "select ftime,extinfo from source where extinfo='ok'"; - TransformConfig config = new TransformConfig(kvSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor1 = new TransformProcessor(config); + TransformProcessor<String, String> processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(1, output1.size()); Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); // case2 config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); - TransformProcessor processor2 = new TransformProcessor(config); + TransformProcessor<String, String> processor2 = TransformProcessor + .create(config, SourceDecoderFactory.createKvDecoder(kvSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); + Assert.assertEquals(0, output2.size()); } @Test public void testJson2Csv() throws Exception { List<FieldInfo> fields = this.getTestFieldList(); - SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + JsonSourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs"); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; - TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); String srcString = "{\n" + " \"sid\":\"value1\",\n" + " \"packageID\":\"value2\",\n" @@ -146,7 +166,7 @@ public class TestTransformProcessor { + " ]\n" + "}"; List<String> output = processor.transform(srcString, new HashMap<>()); - Assert.assertTrue(output.size() == 2); + Assert.assertEquals(2, output.size()); Assert.assertEquals(output.get(0), "value1|value2|1713243918000|value4"); Assert.assertEquals(output.get(1), "value1|value2|1713243918000|v4"); } @@ -154,12 +174,14 @@ public class TestTransformProcessor { @Test public void testJson2CsvForOne() throws Exception { List<FieldInfo> fields = this.getTestFieldList(); - SourceInfo jsonSource = new JsonSourceInfo("UTF-8", ""); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + JsonSourceInfo jsonSource = new JsonSourceInfo("UTF-8", ""); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; - TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<String, String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); String srcString = "{\n" + " \"sid\":\"value1\",\n" + " \"packageID\":\"value2\",\n" @@ -169,48 +191,25 @@ public class TestTransformProcessor { + " ]\n" + "}"; List<String> output = processor.transform(srcString, new HashMap<>()); - Assert.assertTrue(output.size() == 1); + Assert.assertEquals(1, output.size()); Assert.assertEquals(output.get(0), "value1|value2|1713243918000|value4"); } - @Test - public void testKvCsvByJsonConfig() throws Exception { - String configString1 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," - + "\"escapeChar\":\"\\\\\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"transformSql\":\"select ftime,extinfo from source where extinfo='ok'\"}"; - // case1 - TransformProcessor processor1 = new TransformProcessor(configString1); - List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); - Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); - // case2 - String configString2 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," - + "\"escapeChar\":\"\\\\\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"transformSql\":\"select ftime,extinfo from source where extinfo!='ok'\"}"; - TransformProcessor processor2 = new TransformProcessor(configString2); - List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); - } - @Test public void testPb2Csv() throws Exception { List<FieldInfo> fields = this.getTestFieldList(); String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<byte[], String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); byte[] srcBytes = this.getPbTestData(); - List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 2); + List<String> output = processor.transform(srcBytes); + Assert.assertEquals(2, output.size()); Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4"); Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42"); } @@ -264,15 +263,17 @@ public class TestTransformProcessor { public void testPb2CsvForOne() throws Exception { List<FieldInfo> fields = this.getTestFieldList(); String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<byte[], String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); byte[] srcBytes = this.getPbTestData(); List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 1); + Assert.assertEquals(1, output.size()); Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4"); } @@ -280,8 +281,8 @@ public class TestTransformProcessor { public void testPb2CsvForAdd() throws Exception { List<FieldInfo> fields = this.getTestFieldList(); String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select $root.sid," + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2," + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)" @@ -289,12 +290,14 @@ public class TestTransformProcessor { + "$root.msgs(0).msg field4 from source " + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime" + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<byte[], String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); byte[] srcBytes = this.getPbTestData(); List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 1); + Assert.assertEquals(1, output.size()); Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4"); } @@ -302,13 +305,15 @@ public class TestTransformProcessor { public void testPb2CsvForConcat() throws Exception { List<FieldInfo> fields = this.getTestFieldList(); String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select $root.sid,$root.packageID,$child.msgTime," + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<byte[], String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); byte[] srcBytes = this.getPbTestData(); List<String> output = processor.transform(srcBytes, new HashMap<>()); Assert.assertTrue(output.size() == 2); @@ -320,14 +325,16 @@ public class TestTransformProcessor { public void testPb2CsvForNow() throws Exception { List<FieldInfo> fields = this.getTestFieldList(); String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); String transformSql = "select now() from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + TransformConfig config = new TransformConfig(transformSql); // case1 - TransformProcessor processor = new TransformProcessor(config); + TransformProcessor<byte[], String> processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createCsvEncoder(csvSink)); byte[] srcBytes = this.getPbTestData(); List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 2); + Assert.assertEquals(2, output.size()); } }