This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 13481e140d [INLONG-10652][SDK] Inlong Transform support for generics 
(#10672)
13481e140d is described below

commit 13481e140d2114b375e4b43d6af3f27c58c76e70
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Jul 19 09:45:37 2024 +0800

    [INLONG-10652][SDK] Inlong Transform support for generics (#10672)
    
    * [INLONG-10652][SDK] Inlong Transform support for generics
    
    * fix UT
---
 .../sdk/transform/decode/CsvSourceDecoder.java     |   2 +-
 .../sdk/transform/decode/JsonSourceDecoder.java    |   2 +-
 .../sdk/transform/decode/KvSourceDecoder.java      |   2 +-
 .../sdk/transform/decode/PbSourceDecoder.java      |  14 +-
 .../inlong/sdk/transform/decode/SourceDecoder.java |   5 +-
 ...ourceDecoder.java => SourceDecoderFactory.java} |  26 ++-
 .../sdk/transform/encode/CsvSinkEncoder.java       |   2 +-
 .../inlong/sdk/transform/encode/KvSinkEncoder.java |   2 +-
 .../inlong/sdk/transform/encode/SinkEncoder.java   |   4 +-
 .../{SinkEncoder.java => SinkEncoderFactory.java}  |  18 +-
 .../inlong/sdk/transform/pojo/TransformConfig.java |  45 +----
 .../sdk/transform/process/TransformProcessor.java  |  98 +++--------
 .../sdk/transform/pojo/TestTransformConfig.java    | 118 -------------
 .../TestTransformArithmeticFunctionsProcessor.java |  66 +++++---
 .../transform/process/TestTransformProcessor.java  | 187 +++++++++++----------
 15 files changed, 208 insertions(+), 383 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index 14a505d422..daddfd36d7 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * CsvSourceDecoder
  * 
  */
-public class CsvSourceDecoder implements SourceDecoder {
+public class CsvSourceDecoder implements SourceDecoder<String> {
 
     protected CsvSourceInfo sourceInfo;
     private Charset srcCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index 57bf6a9982..13c363912a 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -34,7 +34,7 @@ import java.util.Map;
  * JsonSourceDecoder
  * 
  */
-public class JsonSourceDecoder implements SourceDecoder {
+public class JsonSourceDecoder implements SourceDecoder<String> {
 
     protected JsonSourceInfo sourceInfo;
     private Charset srcCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index 03b40c9f1c..77a4fef8b4 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * KvSourceDecoder
  * 
  */
-public class KvSourceDecoder implements SourceDecoder {
+public class KvSourceDecoder implements SourceDecoder<String> {
 
     protected KvSourceInfo sourceInfo;
     private Charset srcCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 5ac13cf28f..6c8a919e24 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -38,7 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * PbSourceDecoder
  * 
  */
-public class PbSourceDecoder implements SourceDecoder {
+public class PbSourceDecoder implements SourceDecoder<byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PbSourceDecoder.class);
 
@@ -150,16 +150,4 @@ public class PbSourceDecoder implements SourceDecoder {
             return null;
         }
     }
-
-    /**
-     * decode
-     * @param srcString
-     * @param extParams
-     * @return
-     */
-    @Override
-    public SourceData decode(String srcString, Map<String, Object> extParams) {
-        byte[] srcBytes = Base64.getDecoder().decode(srcString);
-        return this.decode(srcBytes, extParams);
-    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
index a11cd89351..7bbb4dda2d 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -22,9 +22,10 @@ import java.util.Map;
 /**
  * SourceDecoder
  */
-public interface SourceDecoder {
+public interface SourceDecoder<Input> {
 
     SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
 
-    SourceData decode(String srcString, Map<String, Object> extParams);
+    SourceData decode(Input input, Map<String, Object> extParams);
+
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
similarity index 51%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
index a11cd89351..b29f6f093c 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
@@ -17,14 +17,26 @@
 
 package org.apache.inlong.sdk.transform.decode;
 
-import java.util.Map;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
 
-/**
- * SourceDecoder
- */
-public interface SourceDecoder {
+public class SourceDecoderFactory {
+
+    public static CsvSourceDecoder createCsvDecoder(CsvSourceInfo sourceInfo) {
+        return new CsvSourceDecoder(sourceInfo);
+    }
+
+    public static KvSourceDecoder createKvDecoder(KvSourceInfo sourceInfo) {
+        return new KvSourceDecoder(sourceInfo);
+    }
 
-    SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
+    public static JsonSourceDecoder createJsonDecoder(JsonSourceInfo 
sourceInfo) {
+        return new JsonSourceDecoder(sourceInfo);
+    }
 
-    SourceData decode(String srcString, Map<String, Object> extParams);
+    public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) {
+        return new PbSourceDecoder(sourceInfo);
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 4b963c10fb..09cae6ea1b 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -28,7 +28,7 @@ import java.util.List;
 /**
  * CsvSinkEncoder
  */
-public class CsvSinkEncoder implements SinkEncoder {
+public class CsvSinkEncoder implements SinkEncoder<String> {
 
     protected CsvSinkInfo sinkInfo;
     protected Charset sinkCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 503914e80a..be0a7ba980 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -28,7 +28,7 @@ import java.util.List;
 /**
  * KvSinkEncoder
  */
-public class KvSinkEncoder implements SinkEncoder {
+public class KvSinkEncoder implements SinkEncoder<String> {
 
     protected KvSinkInfo sinkInfo;
     protected Charset sinkCharset = Charset.defaultCharset();
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index 3839da0160..150f1811f1 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -24,9 +24,9 @@ import java.util.List;
 /**
  * SinkEncoder
  */
-public interface SinkEncoder {
+public interface SinkEncoder<Output> {
 
-    String encode(SinkData sinkData);
+    Output encode(SinkData sinkData);
 
     List<FieldInfo> getFields();
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
similarity index 67%
copy from 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
copy to 
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 3839da0160..f95d19bfca 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -17,16 +17,16 @@
 
 package org.apache.inlong.sdk.transform.encode;
 
-import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 
-import java.util.List;
+public class SinkEncoderFactory {
 
-/**
- * SinkEncoder
- */
-public interface SinkEncoder {
-
-    String encode(SinkData sinkData);
+    public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) {
+        return new CsvSinkEncoder(csvSinkInfo);
+    }
 
-    List<FieldInfo> getFields();
+    public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
+        return new KvSinkEncoder(kvSinkInfo);
+    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
index ff1ac958fc..71dd71be3b 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -25,57 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  */
 public class TransformConfig {
 
-    @JsonProperty("sourceInfo")
-    private SourceInfo sourceInfo;
-    @JsonProperty("sinkInfo")
-    private SinkInfo sinkInfo;
     @JsonProperty("transformSql")
     private String transformSql;
 
     @JsonCreator
-    public TransformConfig(
-            @JsonProperty("sourceInfo") SourceInfo sourceInfo,
-            @JsonProperty("sinkInfo") SinkInfo sinkInfo,
-            @JsonProperty("transformSql") String transformSql) {
-        this.sourceInfo = sourceInfo;
-        this.sinkInfo = sinkInfo;
+    public TransformConfig(@JsonProperty("transformSql") String transformSql) {
         this.transformSql = transformSql;
     }
 
-    /**
-     * get sourceInfo
-     * @return the sourceInfo
-     */
-    @JsonProperty("sourceInfo")
-    public SourceInfo getSourceInfo() {
-        return sourceInfo;
-    }
-
-    /**
-     * set sourceInfo
-     * @param sourceInfo the sourceInfo to set
-     */
-    public void setSourceInfo(SourceInfo sourceInfo) {
-        this.sourceInfo = sourceInfo;
-    }
-
-    /**
-     * get sinkInfo
-     * @return the sinkInfo
-     */
-    @JsonProperty("sinkInfo")
-    public SinkInfo getSinkInfo() {
-        return sinkInfo;
-    }
-
-    /**
-     * set sinkInfo
-     * @param sinkInfo the sinkInfo to set
-     */
-    public void setSinkInfo(SinkInfo sinkInfo) {
-        this.sinkInfo = sinkInfo;
-    }
-
     /**
      * get transformSql
      * @return the transformSql
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index c3ee9270cc..0e74180932 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -17,46 +17,28 @@
 
 package org.apache.inlong.sdk.transform.process;
 
-import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder;
-import org.apache.inlong.sdk.transform.decode.JsonSourceDecoder;
-import org.apache.inlong.sdk.transform.decode.KvSourceDecoder;
-import org.apache.inlong.sdk.transform.decode.PbSourceDecoder;
 import org.apache.inlong.sdk.transform.decode.SourceData;
 import org.apache.inlong.sdk.transform.decode.SourceDecoder;
-import org.apache.inlong.sdk.transform.encode.CsvSinkEncoder;
 import org.apache.inlong.sdk.transform.encode.DefaultSinkData;
-import org.apache.inlong.sdk.transform.encode.KvSinkEncoder;
 import org.apache.inlong.sdk.transform.encode.SinkData;
 import org.apache.inlong.sdk.transform.encode.SinkEncoder;
-import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
-import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.SinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SourceInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
 import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
 import org.apache.inlong.sdk.transform.process.parser.ValueParser;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserManager;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 import net.sf.jsqlparser.statement.select.Select;
 import net.sf.jsqlparser.statement.select.SelectExpressionItem;
 import net.sf.jsqlparser.statement.select.SelectItem;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.StringReader;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -67,65 +49,40 @@ import java.util.Map.Entry;
  * TransformProcessor
  * 
  */
-public class TransformProcessor {
+public class TransformProcessor<I, O> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TransformProcessor.class);
 
+    private static final Map<String, Object> EMPTY_EXT_PARAMS = 
ImmutableMap.of();
+
     private TransformConfig config;
-    private SourceDecoder decoder;
-    private SinkEncoder encoder;
-    private Charset srcCharset = Charset.defaultCharset();
-    protected Charset sinkCharset = Charset.defaultCharset();
+    private SourceDecoder<I> decoder;
+    private SinkEncoder<O> encoder;
 
     private PlainSelect transformSelect;
     private ExpressionOperator where;
     private Map<String, ValueParser> selectItemMap;
 
-    private ObjectMapper objectMapper = new ObjectMapper();
-
-    public TransformProcessor(String configString)
-            throws JsonMappingException, JsonProcessingException, 
JSQLParserException {
-        TransformConfig config = this.objectMapper.readValue(configString, 
TransformConfig.class);
-        this.init(config);
-    }
-
-    public TransformProcessor(TransformConfig config) throws 
JSQLParserException {
-        this.init(config);
+    public static <I, O> TransformProcessor<I, O> create(
+            TransformConfig config,
+            SourceDecoder<I> decoder,
+            SinkEncoder<O> encoder) throws JSQLParserException {
+        return new TransformProcessor<>(config, decoder, encoder);
     }
 
-    private void init(TransformConfig config) throws JSQLParserException {
+    private TransformProcessor(
+            TransformConfig config,
+            SourceDecoder<I> decoder,
+            SinkEncoder<O> encoder)
+            throws JSQLParserException {
         this.config = config;
-        if (!StringUtils.isBlank(config.getSourceInfo().getCharset())) {
-            this.srcCharset = 
Charset.forName(config.getSourceInfo().getCharset());
-        }
-        if (!StringUtils.isBlank(config.getSinkInfo().getCharset())) {
-            this.sinkCharset = 
Charset.forName(config.getSinkInfo().getCharset());
-        }
-        this.initDecoder(config);
-        this.initEncoder(config);
-        this.initTransformSql();
+        this.decoder = decoder;
+        this.encoder = encoder;
+        this.init();
     }
 
-    private void initDecoder(TransformConfig config) {
-        SourceInfo sourceInfo = config.getSourceInfo();
-        if (sourceInfo instanceof CsvSourceInfo) {
-            this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo);
-        } else if (sourceInfo instanceof KvSourceInfo) {
-            this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo);
-        } else if (sourceInfo instanceof JsonSourceInfo) {
-            this.decoder = new JsonSourceDecoder((JsonSourceInfo) sourceInfo);
-        } else if (sourceInfo instanceof PbSourceInfo) {
-            this.decoder = new PbSourceDecoder((PbSourceInfo) sourceInfo);
-        }
-    }
-
-    private void initEncoder(TransformConfig config) {
-        SinkInfo sinkInfo = config.getSinkInfo();
-        if (sinkInfo instanceof CsvSinkInfo) {
-            this.encoder = new CsvSinkEncoder((CsvSinkInfo) sinkInfo);
-        } else if (sinkInfo instanceof KvSinkInfo) {
-            this.encoder = new KvSinkEncoder((KvSinkInfo) sinkInfo);
-        }
+    private void init() throws JSQLParserException {
+        this.initTransformSql();
     }
 
     private void initTransformSql() throws JSQLParserException {
@@ -157,12 +114,16 @@ public class TransformProcessor {
         }
     }
 
-    public List<String> transform(byte[] srcBytes, Map<String, Object> 
extParams) {
-        SourceData sourceData = this.decoder.decode(srcBytes, extParams);
+    public List<O> transform(I input) {
+        return this.transform(input, EMPTY_EXT_PARAMS);
+    }
+
+    public List<O> transform(I input, Map<String, Object> extParams) {
+        SourceData sourceData = this.decoder.decode(input, extParams);
         if (sourceData == null) {
             return null;
         }
-        List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount());
+        List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount());
         for (int i = 0; i < sourceData.getRowCount(); i++) {
             if (this.where != null && !this.where.check(sourceData, i)) {
                 continue;
@@ -183,7 +144,4 @@ public class TransformProcessor {
         return sinkDatas;
     }
 
-    public List<String> transform(String srcString, Map<String, Object> 
extParams) {
-        return this.transform(srcString.getBytes(this.srcCharset), extParams);
-    }
 }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
deleted file mode 100644
index c26b3f9cca..0000000000
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.transform.pojo;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * TestTransformConfig
- * 
- */
-public class TestTransformConfig {
-
-    @Test
-    public void testCsv() {
-        try {
-            FieldInfo ftime = new FieldInfo();
-            ftime.setName("ftime");
-            List<FieldInfo> fields = new ArrayList<>();
-            fields.add(ftime);
-            SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", 
fields);
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select ftime from source";
-            TransformConfig config = new TransformConfig(csvSource, csvSink, 
transformSql);
-            ObjectMapper objectMapper = new ObjectMapper();
-            String configString = objectMapper.writeValueAsString(config);
-            System.out.println(configString);
-            Assert.assertEquals(configString, 
"{\"sourceInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
-                    + 
"\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
-                    + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
-                    + 
"\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
-                    + "\"transformSql\":\"select ftime from source\"}");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Test
-    public void testKv() {
-        try {
-            FieldInfo ftime = new FieldInfo();
-            ftime.setName("ftime");
-            List<FieldInfo> fields = new ArrayList<>();
-            fields.add(ftime);
-            SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
-            SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
-            String transformSql = "select ftime from source";
-            TransformConfig config = new TransformConfig(kvSource, kvSink, 
transformSql);
-            ObjectMapper objectMapper = new ObjectMapper();
-            String configString = objectMapper.writeValueAsString(config);
-            System.out.println(configString);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Test
-    public void testPb() {
-        try {
-            FieldInfo ftime = new FieldInfo();
-            ftime.setName("ftime");
-            List<FieldInfo> fields = new ArrayList<>();
-            fields.add(ftime);
-            String transformBase64 = 
"CrcCCg90cmFuc2Zvcm0ucHJvdG8SBHRlc3QirQEKClNka01lc3NhZ2USEAoDbXNnGAEgASgMUg"
-                    + 
"Ntc2cSGAoHbXNnVGltZRgCIAEoA1IHbXNnVGltZRI3CgdleHRpbmZvGAMgAygLMh0udGVzdC5TZGtNZXNzYWdlLk"
-                    + 
"V4dGluZm9FbnRyeVIHZXh0aW5mbxo6CgxFeHRpbmZvRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSFAoFdmFsdWUY"
-                    + 
"AiABKAlSBXZhbHVlOgI4ASJmCg5TZGtEYXRhUmVxdWVzdBIQCgNzaWQYASABKAlSA3NpZBIkCgRtc2dzGAIgAygLMh"
-                    + 
"AudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM=";
-            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select ftime from source";
-            TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
-            ObjectMapper objectMapper = new ObjectMapper();
-            String configString = objectMapper.writeValueAsString(config);
-            System.out.println(configString);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Test
-    public void testJson() {
-        try {
-            FieldInfo ftime = new FieldInfo();
-            ftime.setName("ftime");
-            List<FieldInfo> fields = new ArrayList<>();
-            fields.add(ftime);
-            SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "root");
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select ftime from source";
-            TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
-            ObjectMapper objectMapper = new ObjectMapper();
-            String configString = objectMapper.writeValueAsString(config);
-            System.out.println(configString);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
index b291d2b685..488095459c 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.sdk.transform.process;
 
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SourceInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 
 import org.junit.Assert;
@@ -39,8 +39,8 @@ public class TestTransformArithmeticFunctionsProcessor {
 
     private static final List<FieldInfo> srcFields = new ArrayList<>();
     private static final List<FieldInfo> dstFields = new ArrayList<>();
-    private static final SourceInfo csvSource;
-    private static final SinkInfo kvSink;
+    private static final CsvSourceInfo csvSource;
+    private static final KvSinkInfo kvSink;
     static {
         for (int i = 1; i < 5; i++) {
             FieldInfo field = new FieldInfo();
@@ -57,9 +57,11 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testPowerFunction() throws Exception {
         String transformSql = "select power(numeric1, numeric2) from source";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1: 2^4
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor.transform("2|4|6|8", new HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=16.0");
@@ -76,9 +78,11 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testAbsFunction() throws Exception {
         String transformSql = "select abs(numeric1) from source";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1: |2|
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor.transform("2|4|6|8", new HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=2");
@@ -91,9 +95,11 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testSqrtFunction() throws Exception {
         String transformSql = "select sqrt(numeric1) from source";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1: sqrt(9)
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor.transform("9|4|6|8", new HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=3.0");
@@ -106,9 +112,11 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testLnFunction() throws Exception {
         String transformSql = "select ln(numeric1) from source";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1: ln(1)
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor.transform("1|4|6|8", new HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=0.0");
@@ -121,9 +129,11 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testLog10Function() throws Exception {
         String transformSql = "select log10(numeric1) from source";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1: log10(1)
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor.transform("1|4|6|8", new HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=0.0");
@@ -136,9 +146,11 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testLog2Function() throws Exception {
         String transformSql = "select log2(numeric1) from source";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1: log2(1)
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor.transform("1|4|6|8", new HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=0.0");
@@ -151,21 +163,27 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testLogFunction() throws Exception {
         String transformSql1 = "select log(numeric1) from source";
-        TransformConfig config1 = new TransformConfig(csvSource, kvSink, 
transformSql1);
+        TransformConfig config1 = new TransformConfig(transformSql1);
         // case1: ln(1)
-        TransformProcessor processor1 = new TransformProcessor(config1);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config1, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor1.transform("1|4|6|8", new 
HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=0.0");
         String transformSql2 = "select log(numeric1, numeric2) from source";
-        TransformConfig config2 = new TransformConfig(csvSource, kvSink, 
transformSql2);
+        TransformConfig config2 = new TransformConfig(transformSql2);
         // case2: log2(8)
-        TransformProcessor processor2 = new TransformProcessor(config2);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config2, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output2 = processor2.transform("2|8|6|8", new 
HashMap<>());
         Assert.assertEquals(1, output2.size());
         Assert.assertEquals(output2.get(0), "result=3.0");
         // case3: log10(100)
-        TransformProcessor processor3 = new TransformProcessor(config2);
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config2, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output3 = processor3.transform("10|100|6|8", new 
HashMap<>());
         Assert.assertEquals(1, output3.size());
         Assert.assertEquals(output3.get(0), "result=2.0");
@@ -174,9 +192,11 @@ public class TestTransformArithmeticFunctionsProcessor {
     @Test
     public void testExpFunction() throws Exception {
         String transformSql = "select exp(numeric1) from source";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1: e^0
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor.transform("0|4|6|8", new HashMap<>());
         Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "result=1.0");
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index b76dde6f7f..93c9448999 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.transform.process;
 
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
 import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
@@ -24,8 +26,6 @@ import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
 import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.SinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SourceInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 
 import org.junit.Assert;
@@ -51,38 +51,48 @@ public class TestTransformProcessor {
         FieldInfo extinfo = new FieldInfo();
         extinfo.setName("extinfo");
         fields.add(extinfo);
-        SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields);
-        SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
+        CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", 
fields);
+        KvSinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
         String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor1 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
         List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
-        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "ftime=2024-04-28 
00:00:00&extinfo=ok");
         // case2
         config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
-        TransformProcessor processor2 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
         List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
-        Assert.assertTrue(output2.size() == 0);
+        Assert.assertEquals(0, output2.size());
     }
 
     @Test
     public void testCsv2KvNoField() throws Exception {
-        SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null);
-        SinkInfo kvSink = new KvSinkInfo("UTF-8", null);
+        CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null);
+        KvSinkInfo kvSink = new KvSinkInfo("UTF-8", null);
         String transformSql = "select $1 ftime,$2 extinfo from source where 
$2='ok'";
-        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor1 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
-        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "ftime=2024-04-28 
00:00:00&extinfo=ok");
         // case2
         config.setTransformSql("select $1 ftime,$2 extinfo from source where 
$2!='ok'");
-        TransformProcessor processor2 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
         List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
-        Assert.assertTrue(output2.size() == 0);
+        Assert.assertEquals(0, output2.size());
     }
 
     @Test
@@ -94,49 +104,59 @@ public class TestTransformProcessor {
         FieldInfo extinfo = new FieldInfo();
         extinfo.setName("extinfo");
         fields.add(extinfo);
-        SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
-        TransformConfig config = new TransformConfig(kvSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor1 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
         // case2
         config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
-        TransformProcessor processor2 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-        Assert.assertTrue(output2.size() == 0);
+        Assert.assertEquals(0, output2.size());
     }
 
     @Test
     public void testKv2CsvNoField() throws Exception {
-        SourceInfo kvSource = new KvSourceInfo("UTF-8", null);
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null);
+        KvSourceInfo kvSource = new KvSourceInfo("UTF-8", null);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null);
         String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
-        TransformConfig config = new TransformConfig(kvSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor1 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(1, output1.size());
         Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
         // case2
         config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
-        TransformProcessor processor2 = new TransformProcessor(config);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-        Assert.assertTrue(output2.size() == 0);
+        Assert.assertEquals(0, output2.size());
     }
 
     @Test
     public void testJson2Csv() throws Exception {
         List<FieldInfo> fields = this.getTestFieldList();
-        SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        JsonSourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
-        TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         String srcString = "{\n"
                 + "  \"sid\":\"value1\",\n"
                 + "  \"packageID\":\"value2\",\n"
@@ -146,7 +166,7 @@ public class TestTransformProcessor {
                 + "  ]\n"
                 + "}";
         List<String> output = processor.transform(srcString, new HashMap<>());
-        Assert.assertTrue(output.size() == 2);
+        Assert.assertEquals(2, output.size());
         Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
         Assert.assertEquals(output.get(1), "value1|value2|1713243918000|v4");
     }
@@ -154,12 +174,14 @@ public class TestTransformProcessor {
     @Test
     public void testJson2CsvForOne() throws Exception {
         List<FieldInfo> fields = this.getTestFieldList();
-        SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        JsonSourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select 
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
-        TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         String srcString = "{\n"
                 + "  \"sid\":\"value1\",\n"
                 + "  \"packageID\":\"value2\",\n"
@@ -169,48 +191,25 @@ public class TestTransformProcessor {
                 + "  ]\n"
                 + "}";
         List<String> output = processor.transform(srcString, new HashMap<>());
-        Assert.assertTrue(output.size() == 1);
+        Assert.assertEquals(1, output.size());
         Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
     }
 
-    @Test
-    public void testKvCsvByJsonConfig() throws Exception {
-        String configString1 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
-                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
-                + "\"escapeChar\":\"\\\\\","
-                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                + "\"transformSql\":\"select ftime,extinfo from source where 
extinfo='ok'\"}";
-        // case1
-        TransformProcessor processor1 = new TransformProcessor(configString1);
-        List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-        Assert.assertTrue(output1.size() == 1);
-        Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
-        // case2
-        String configString2 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
-                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
-                + "\"escapeChar\":\"\\\\\","
-                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                + "\"transformSql\":\"select ftime,extinfo from source where 
extinfo!='ok'\"}";
-        TransformProcessor processor2 = new TransformProcessor(configString2);
-        List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-        Assert.assertTrue(output2.size() == 0);
-    }
-
     @Test
     public void testPb2Csv() throws Exception {
         List<FieldInfo> fields = this.getTestFieldList();
         String transformBase64 = this.getPbTestDescription();
-        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
-        TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
-        List<String> output = processor.transform(srcBytes, new HashMap<>());
-        Assert.assertTrue(output.size() == 2);
+        List<String> output = processor.transform(srcBytes);
+        Assert.assertEquals(2, output.size());
         Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4");
         Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42");
     }
@@ -264,15 +263,17 @@ public class TestTransformProcessor {
     public void testPb2CsvForOne() throws Exception {
         List<FieldInfo> fields = this.getTestFieldList();
         String transformBase64 = this.getPbTestDescription();
-        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", null);
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", null);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select 
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
-        TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
         List<String> output = processor.transform(srcBytes, new HashMap<>());
-        Assert.assertTrue(output.size() == 1);
+        Assert.assertEquals(1, output.size());
         Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4");
     }
 
@@ -280,8 +281,8 @@ public class TestTransformProcessor {
     public void testPb2CsvForAdd() throws Exception {
         List<FieldInfo> fields = this.getTestFieldList();
         String transformBase64 = this.getPbTestDescription();
-        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", null);
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", null);
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select $root.sid,"
                 + 
"($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,"
                 + 
"$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)"
@@ -289,12 +290,14 @@ public class TestTransformProcessor {
                 + "$root.msgs(0).msg field4 from source "
                 + "where 
$root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
                 + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
-        TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
         List<String> output = processor.transform(srcBytes, new HashMap<>());
-        Assert.assertTrue(output.size() == 1);
+        Assert.assertEquals(1, output.size());
         Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4");
     }
 
@@ -302,13 +305,15 @@ public class TestTransformProcessor {
     public void testPb2CsvForConcat() throws Exception {
         List<FieldInfo> fields = this.getTestFieldList();
         String transformBase64 = this.getPbTestDescription();
-        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,"
                 + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) 
msg,$root.msgs.msgTime.msg from source";
-        TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
         List<String> output = processor.transform(srcBytes, new HashMap<>());
         Assert.assertTrue(output.size() == 2);
@@ -320,14 +325,16 @@ public class TestTransformProcessor {
     public void testPb2CsvForNow() throws Exception {
         List<FieldInfo> fields = this.getTestFieldList();
         String transformBase64 = this.getPbTestDescription();
-        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
-        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+        CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
         String transformSql = "select now() from source";
-        TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+        TransformConfig config = new TransformConfig(transformSql);
         // case1
-        TransformProcessor processor = new TransformProcessor(config);
+        TransformProcessor<byte[], String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+                        SinkEncoderFactory.createCsvEncoder(csvSink));
         byte[] srcBytes = this.getPbTestData();
         List<String> output = processor.transform(srcBytes, new HashMap<>());
-        Assert.assertTrue(output.size() == 2);
+        Assert.assertEquals(2, output.size());
     }
 }

Reply via email to