This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ec1740e3ca [INLONG-10093][SDK] Support to transform from CSV protocol 
to CSV/KV protocol by single SQL (#10097)
ec1740e3ca is described below

commit ec1740e3ca269956b60024e098cba7a81c603be2
Author: 卢春亮 <luchunli...@apache.org>
AuthorDate: Mon Apr 29 18:21:01 2024 +0800

    [INLONG-10093][SDK] Support to transform from CSV protocol to CSV/KV 
protocol by single SQL (#10097)
    
    * [INLONG-10093][SDK] Support to transform from CSV protocol to CSV/KV 
protocol by single SQL
---
 inlong-sdk/pom.xml                                 |   1 +
 inlong-sdk/transform-sdk/pom.xml                   | 102 ++++
 .../inlong/sdk/transform/decode/CsvSourceData.java |  60 +++
 .../sdk/transform/decode/CsvSourceDecoder.java     |  88 ++++
 .../inlong/sdk/transform/decode/KvSourceData.java  |  59 +++
 .../sdk/transform/decode/KvSourceDecoder.java      |  74 +++
 .../inlong/sdk/transform/decode/KvUtils.java       | 579 +++++++++++++++++++++
 .../inlong/sdk/transform/decode/SourceData.java    |  30 ++
 .../inlong/sdk/transform/decode/SourceDecoder.java |  30 ++
 .../inlong/sdk/transform/decode/SplitUtils.java    | 164 ++++++
 .../sdk/transform/encode/CsvSinkEncoder.java       |  77 +++
 .../sdk/transform/encode/DefaultSinkData.java      |  60 +++
 .../inlong/sdk/transform/encode/EscapeUtils.java   |  83 +++
 .../inlong/sdk/transform/encode/KvSinkEncoder.java |  64 +++
 .../inlong/sdk/transform/encode/SinkData.java      |  33 ++
 .../inlong/sdk/transform/encode/SinkEncoder.java   |  26 +
 .../inlong/sdk/transform/pojo/CsvSinkInfo.java     |  99 ++++
 .../inlong/sdk/transform/pojo/CsvSourceInfo.java   |  99 ++++
 .../inlong/sdk/transform/pojo/FieldInfo.java       |  29 ++
 .../inlong/sdk/transform/pojo/JsonSourceInfo.java  |  56 ++
 .../inlong/sdk/transform/pojo/KvSinkInfo.java      |  58 +++
 .../inlong/sdk/transform/pojo/KvSourceInfo.java    |  59 +++
 .../inlong/sdk/transform/pojo/PbSourceInfo.java    |  78 +++
 .../inlong/sdk/transform/pojo/ProtocolType.java    |  61 +++
 .../apache/inlong/sdk/transform/pojo/SinkInfo.java |  87 ++++
 .../inlong/sdk/transform/pojo/SourceInfo.java      |  69 +++
 .../inlong/sdk/transform/pojo/TransformConfig.java |  96 ++++
 .../sdk/transform/process/TransformProcessor.java  | 160 ++++++
 .../transform/process/operator/AndOperator.java    |  49 ++
 .../process/operator/EqualsToOperator.java         |  51 ++
 .../process/operator/ExpressionOperator.java       |  28 +
 .../operator/GreaterThanEqualsOperator.java        |  53 ++
 .../process/operator/GreaterThanOperator.java      |  53 ++
 .../process/operator/MinorThanEqualsOperator.java  |  53 ++
 .../process/operator/MinorThanOperator.java        |  53 ++
 .../process/operator/NotEqualsToOperator.java      |  51 ++
 .../transform/process/operator/NotOperator.java    |  47 ++
 .../transform/process/operator/OperatorTools.java  |  81 +++
 .../sdk/transform/process/operator/OrOperator.java |  49 ++
 .../process/operator/ParenthesisOperator.java      |  47 ++
 .../sdk/transform/process/parser/ColumnParser.java |  47 ++
 .../sdk/transform/process/parser/LongParser.java   |  46 ++
 .../sdk/transform/process/parser/StringParser.java |  46 ++
 .../sdk/transform/process/parser/ValueParser.java  |  29 ++
 .../sdk/transform/pojo/TestTransformConfig.java    | 113 ++++
 .../transform/process/TestTransformProcessor.java  | 128 +++++
 46 files changed, 3505 insertions(+)

diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index f9dd979680..11c51d1aec 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -33,6 +33,7 @@
         <module>sdk-common</module>
         <module>sort-sdk</module>
         <module>dataproxy-sdk</module>
+        <module>transform-sdk</module>
     </modules>
 
     <properties>
diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
new file mode 100644
index 0000000000..aeb3e100b0
--- /dev/null
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong-sdk</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>transform-sdk</artifactId>
+    <version>1.13.0-SNAPSHOT</version>
+
+    <name>Apache InLong - Transform SDK</name>
+
+    <properties>
+        <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>inlong-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sdk-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <artifactId>maven-clean-plugin</artifactId>
+                    <version>3.1.0</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-resources-plugin</artifactId>
+                    <version>3.2.0</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>${plugin.compile.version}</version>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>${plugin.surefire.version}</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-jar-plugin</artifactId>
+                    <version>3.0.2</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-install-plugin</artifactId>
+                    <version>2.5.2</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-deploy-plugin</artifactId>
+                    <version>${plugin.deploy.version}</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-site-plugin</artifactId>
+                    <version>3.7.1</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-project-info-reports-plugin</artifactId>
+                    <version>3.0.0</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+</project>
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
new file mode 100644
index 0000000000..77f173f1a6
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CsvSourceData
+ * 
+ */
+public class CsvSourceData implements SourceData {
+
+    private List<Map<String, String>> rows = new ArrayList<>();
+
+    private Map<String, String> currentRow;
+
+    public CsvSourceData() {
+    }
+
+    public void putField(String fieldName, String fieldValue) {
+        this.currentRow.put(fieldName, fieldValue);
+    }
+
+    public void addRow() {
+        this.currentRow = new HashMap<>();
+        rows.add(currentRow);
+    }
+
+    @Override
+    public int getRowCount() {
+        return this.rows.size();
+    }
+
+    @Override
+    public String getField(int rowNum, String fieldName) {
+        if (rowNum > this.rows.size()) {
+            return null;
+        }
+        Map<String, String> targetRow = this.rows.get(rowNum - 1);
+        return targetRow.get(fieldName);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
new file mode 100644
index 0000000000..22e3b9ba13
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CsvSourceDecoder
+ * 
+ */
+public class CsvSourceDecoder implements SourceDecoder {
+
+    protected CsvSourceInfo sourceInfo;
+    private Charset srcCharset = Charset.defaultCharset();
+    private Character delimiter = '|';
+    private Character escapeChar = null;
+    private List<FieldInfo> fields;
+
+    public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
+        this.sourceInfo = sourceInfo;
+        if (!StringUtils.isBlank(sourceInfo.getDelimiter())) {
+            this.delimiter = sourceInfo.getDelimiter().charAt(0);
+        }
+        if (!StringUtils.isBlank(sourceInfo.getEscapeChar())) {
+            this.escapeChar = sourceInfo.getEscapeChar().charAt(0);
+        }
+        if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+            this.srcCharset = Charset.forName(sourceInfo.getCharset());
+        }
+        this.fields = sourceInfo.getFields();
+    }
+
+    @Override
+    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+        String srcString = new String(srcBytes, srcCharset);
+        return this.decode(srcString, extParams);
+    }
+
+    @Override
+    public SourceData decode(String srcString, Map<String, Object> extParams) {
+        String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, 
escapeChar, '\"', '\n', true);
+        CsvSourceData sourceData = new CsvSourceData();
+        for (int i = 0; i < rowValues.length; i++) {
+            String[] fieldValues = rowValues[i];
+            sourceData.addRow();
+            if (fields == null || fields.size() == 0) {
+                for (int j = 0; j < fieldValues.length; j++) {
+                    String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 
1);
+                    sourceData.putField(fieldName, fieldValues[i]);
+                }
+                continue;
+            }
+            int fieldIndex = 0;
+            for (FieldInfo field : fields) {
+                String fieldName = field.getName();
+                String fieldValue = null;
+                if (fieldIndex < fieldValues.length) {
+                    fieldValue = fieldValues[fieldIndex];
+                }
+                sourceData.putField(fieldName, fieldValue);
+                fieldIndex++;
+            }
+        }
+        return sourceData;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
new file mode 100644
index 0000000000..3e3f600197
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * KvSourceData
+ */
+public class KvSourceData implements SourceData {
+
+    private List<Map<String, String>> rows = new ArrayList<>();
+
+    private Map<String, String> currentRow;
+
+    public KvSourceData() {
+    }
+
+    public void putField(String fieldName, String fieldValue) {
+        this.currentRow.put(fieldName, fieldValue);
+    }
+
+    public void addRow() {
+        this.currentRow = new HashMap<>();
+        rows.add(currentRow);
+    }
+
+    @Override
+    public int getRowCount() {
+        return this.rows.size();
+    }
+
+    @Override
+    public String getField(int rowNum, String fieldName) {
+        if (rowNum > this.rows.size()) {
+            return null;
+        }
+        Map<String, String> targetRow = this.rows.get(rowNum - 1);
+        return targetRow.get(fieldName);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
new file mode 100644
index 0000000000..03b40c9f1c
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * KvSourceDecoder
+ * 
+ */
+public class KvSourceDecoder implements SourceDecoder {
+
+    protected KvSourceInfo sourceInfo;
+    private Charset srcCharset = Charset.defaultCharset();
+    private List<FieldInfo> fields;
+
+    public KvSourceDecoder(KvSourceInfo sourceInfo) {
+        this.sourceInfo = sourceInfo;
+        if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+            this.srcCharset = Charset.forName(sourceInfo.getCharset());
+        }
+        this.fields = sourceInfo.getFields();
+    }
+
+    @Override
+    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+        String srcString = new String(srcBytes, srcCharset);
+        return this.decode(srcString, extParams);
+    }
+
+    @Override
+    public SourceData decode(String srcString, Map<String, Object> extParams) {
+        List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, '&', 
'=', '\\', '\"', '\n');
+        KvSourceData sourceData = new KvSourceData();
+        if (fields == null || fields.size() == 0) {
+            for (Map<String, String> row : rowValues) {
+                sourceData.addRow();
+                row.forEach((k, v) -> sourceData.putField(k, v));
+            }
+            return sourceData;
+        }
+        for (Map<String, String> row : rowValues) {
+            sourceData.addRow();
+            for (FieldInfo field : fields) {
+                String fieldName = field.getName();
+                String fieldValue = row.get(fieldName);
+                sourceData.putField(fieldName, fieldValue);
+            }
+        }
+        return sourceData;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
new file mode 100644
index 0000000000..60c804c48f
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The utilities for strings.
+ */
+public class KvUtils {
+
+    private static final int STATE_NORMAL = 0;
+    private static final int STATE_KEY = 2;
+    private static final int STATE_VALUE = 4;
+    private static final int STATE_ESCAPING = 8;
+    private static final int STATE_QUOTING = 16;
+
+    /**
+     * Splits the kv text.
+     *
+     * <p>Both escaping and quoting is supported. When the escape character is
+     * not '\0', then the next character to the escape character will be
+     * escaped. When the quote character is not '\0', then all characters
+     * between consecutive quote characters will be escaped.</p>
+     *
+     * @param text The text to be split.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escaping character. Only valid if not '\0'.
+     * @param quoteChar The quoting character.
+     * @return The fields split from the text.
+     */
+    public static Map<String, String> splitKv(
+            @Nonnull String text,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        List<Map<String, String>> lines = splitKv(text, entryDelimiter, 
kvDelimiter, escapeChar, quoteChar, null);
+        if (lines.size() == 0) {
+            return new HashMap<>();
+        }
+        return lines.get(0);
+    }
+
+    /**
+     * Splits the kv text.
+     *
+     * <p>Both escaping and quoting is supported. When the escape character is
+     * not '\0', then the next character to the escape character will be
+     * escaped. When the quote character is not '\0', then all characters
+     * between consecutive quote characters will be escaped.</p>
+     *
+     * @param text The text to be split.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escaping character. Only valid if not '\0'.
+     * @param quoteChar The quoting character.
+     * @param lineDelimiter The line delimiter character.
+     * @return The fields split from the text.
+     */
+    public static List<Map<String, String>> splitKv(
+            @Nonnull String text,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter) {
+        Map<String, String> fields = new HashMap<>();
+        List<Map<String, String>> lines = new ArrayList<>();
+
+        StringBuilder stringBuilder = new StringBuilder();
+
+        String key = "";
+        String value;
+
+        int state = STATE_KEY;
+
+        /*
+         * The state when entering escaping and quoting. When we exit escaping 
or quoting, we should restore this state.
+         */
+        int kvState = STATE_KEY;
+
+        for (int i = 0; i < text.length(); ++i) {
+            char ch = text.charAt(i);
+
+            if (ch == kvDelimiter) {
+                switch (state) {
+                    case STATE_KEY:
+                        key = stringBuilder.toString();
+                        stringBuilder.setLength(0);
+                        state = STATE_VALUE;
+                        break;
+                    case STATE_VALUE:
+                        // throw new IllegalArgumentException("Unexpected 
token " +
+                        // ch + " at position " + i + ".");
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                    default:
+                        break;
+                }
+            } else if (ch == entryDelimiter) {
+                switch (state) {
+                    case STATE_KEY:
+                        // throw new IllegalArgumentException("Unexpected 
token " +
+                        // ch + " at position " + i + ".");
+                        key = stringBuilder.toString();
+                        stringBuilder.setLength(0);
+                        fields.put(key, "");
+                        state = STATE_KEY;
+                        break;
+                    case STATE_VALUE:
+                        value = stringBuilder.toString();
+                        fields.put(key, value);
+
+                        stringBuilder.setLength(0);
+                        state = STATE_KEY;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                    default:
+                        break;
+                }
+            } else if (escapeChar != null && ch == escapeChar) {
+                switch (state) {
+                    case STATE_KEY:
+                    case STATE_VALUE:
+                        kvState = state;
+                        state = STATE_ESCAPING;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                    default:
+                        break;
+                }
+            } else if (quoteChar != null && ch == quoteChar) {
+                switch (state) {
+                    case STATE_KEY:
+                    case STATE_VALUE:
+                        kvState = state;
+                        state = STATE_QUOTING;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        state = kvState;
+                        break;
+                    default:
+                        break;
+                }
+            } else if (lineDelimiter != null && ch == lineDelimiter) {
+                switch (state) {
+                    case STATE_VALUE:
+                        value = stringBuilder.toString();
+                        fields.put(key, value);
+                        Map<String, String> copyFields = new HashMap<>();
+                        copyFields.putAll(fields);
+                        lines.add(copyFields);
+                        stringBuilder.setLength(0);
+                        fields.clear();
+                        state = STATE_KEY;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                    default:
+                        break;
+                }
+            } else {
+                stringBuilder.append(ch);
+            }
+        }
+
+        switch (state) {
+            case STATE_KEY:
+                // throw new IllegalArgumentException("Dangling key.");
+                key = stringBuilder.toString();
+                stringBuilder.setLength(0);
+                fields.put(key, "");
+                lines.add(fields);
+                return lines;
+            case STATE_VALUE:
+                value = stringBuilder.toString();
+                fields.put(key, value);
+                lines.add(fields);
+                return lines;
+            case STATE_ESCAPING:
+                // throw new IllegalArgumentException("Not closed escaping.");
+            case STATE_QUOTING:
+                // throw new IllegalArgumentException("Not closed quoting.");
+            default:
+                // throw new IllegalStateException();
+                if (kvState == STATE_VALUE) {
+                    key = stringBuilder.toString();
+                    stringBuilder.setLength(0);
+                    fields.put(key, "");
+                    lines.add(fields);
+                    return lines;
+                    // } else if (kvState == STATE_KEY) {
+                } else {
+                    value = stringBuilder.toString();
+                    fields.put(key, value);
+                    lines.add(fields);
+                    return lines;
+                }
+        }
+    }
+
+    /**
+     * Concat the given fields' keys and values.
+     *
+     * <p>Special characters in the text will be escaped or quoted if
+     * corresponding character is given. Otherwise, an exception will be
+     * thrown.</p>
+     *
+     * @param fieldKeys The keys to be concat.
+     * @param fieldValues The values to be concat.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escape character.
+     * @param quoteChar The quote character.
+     * @return The concated text of given fields.
+     */
+    public static String concatKv(
+            @Nonnull String[] fieldKeys,
+            @Nonnull String[] fieldValues,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        if (fieldKeys.length != fieldValues.length) {
+            throw new IllegalArgumentException("The keys' number " + 
fieldKeys.length
+                    + " doesn't match values' number " + fieldValues.length);
+        }
+
+        Collection<Character> delimiters = Arrays.asList(entryDelimiter, 
kvDelimiter);
+
+        StringBuilder stringBuilder = new StringBuilder();
+
+        for (int index = 0; index < fieldKeys.length; ++index) {
+
+            encodeText(
+                    stringBuilder,
+                    fieldKeys[index],
+                    delimiters,
+                    escapeChar,
+                    quoteChar);
+
+            stringBuilder.append(kvDelimiter);
+
+            encodeText(
+                    stringBuilder,
+                    fieldValues[index],
+                    delimiters,
+                    escapeChar,
+                    quoteChar);
+
+            if (index < fieldKeys.length - 1) {
+                stringBuilder.append(entryDelimiter);
+            }
+        }
+
+        return stringBuilder.toString();
+    }
+
+    private static void encodeText(
+            StringBuilder stringBuilder,
+            String text,
+            Collection<Character> delimiters,
+            Character escapeChar,
+            Character quoteChar) {
+        for (int i = 0; i < text.length(); ++i) {
+            char ch = text.charAt(i);
+
+            if (delimiters.contains(ch)) {
+                if (escapeChar != null) {
+                    stringBuilder.append(escapeChar);
+                    stringBuilder.append(ch);
+                } else if (quoteChar != null) {
+                    stringBuilder.append(quoteChar);
+                    stringBuilder.append(ch);
+                    stringBuilder.append(quoteChar);
+                } else {
+                    throw new IllegalArgumentException("There is a delimiter 
in the text, "
+                            + "but neither escape nor quote character is 
specified.");
+                }
+            } else if (escapeChar != null && ch == escapeChar) {
+                stringBuilder.append(escapeChar);
+                stringBuilder.append(ch);
+            } else if (quoteChar != null && ch == quoteChar) {
+                if (escapeChar != null) {
+                    stringBuilder.append(escapeChar);
+                    stringBuilder.append(ch);
+                } else {
+                    throw new IllegalArgumentException("There is a quote 
character in the text, "
+                            + "but escape character is not specified.");
+                }
+            } else {
+                stringBuilder.append(ch);
+            }
+        }
+    }
+
+    /**
+     * Splits a single line of csv text.
+     *
+     * @see KvUtils#splitCsv(String, Character, Character, Character, 
Character, boolean)
+     *
+     * @param text The text to be split.
+     * @param delimiter The delimiter of fields.
+     * @param escapeChar The escaping character. Only valid if not '\0'.
+     * @param quoteChar The quoting character.
+     * @return The split array content.
+     */
+    public static String[] splitCsv(
+            @Nonnull String text,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        String[][] splitResult = splitCsv(text, delimiter, escapeChar, 
quoteChar, null);
+        if (splitResult.length == 0) {
+            return new String[0];
+        }
+        return splitResult[0];
+    }
+
+    /**
+     * @see KvUtils#splitCsv(String, Character, Character, Character, 
Character, boolean)
+     *
+     * @param text The text to be split.
+     * @param delimiter The delimiter of fields.
+     * @param escapeChar The escaping character. Only valid if not '\0'.
+     * @param quoteChar The quoting character.
+     * @param lineDelimiter The delimiter between lines, e.g. '\n'.
+     * @return The split value.
+     */
+    public static String[][] splitCsv(
+            @Nonnull String text,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter) {
+        return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, 
false);
+    }
+
+    /**
+     * Splits the csv text, which may contains multiple lines of data.
+     *
+     * <p>Both escaping and quoting is supported. When the escape character is
+     * not '\0', then the next character to the escape character will be
+     * escaped. When the quote character is not '\0', then all characters
+     * between consecutive quote characters will be escaped.</p>
+     *
+     * @param text The text to be split.
+     * @param delimiter The delimiter of fields.
+     * @param escapeChar The escaping character. Only valid if not '\0'.
+     * @param quoteChar The quoting character.
+     * @param lineDelimiter The delimiter between lines, e.g. '\n'.
+     * @param deleteHeadDelimiter If true and the leading character of a line
+     *                            is a delimiter, it will be ignored.
+     * @return A 2-D String array representing the parsed data, where the 1st
+     * dimension is row and the 2nd dimension is column.
+     */
+    public static String[][] splitCsv(
+            @Nonnull String text,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter,
+            boolean deleteHeadDelimiter) {
+        List<String[]> lines = new ArrayList<>();
+        List<String> fields = new ArrayList<>();
+
+        StringBuilder stringBuilder = new StringBuilder();
+        int state = STATE_NORMAL;
+
+        for (int i = 0; i < text.length(); ++i) {
+            char ch = text.charAt(i);
+
+            if (ch == delimiter) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        if (deleteHeadDelimiter && fields.isEmpty()
+                                && stringBuilder.length() == 0) {
+                            break;
+                        }
+                        String field = stringBuilder.toString();
+                        fields.add(field);
+                        stringBuilder.setLength(0);
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                    default:
+                        break;
+                }
+            } else if (escapeChar != null && ch == escapeChar) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        state = STATE_ESCAPING;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                    default:
+                        break;
+                }
+            } else if (quoteChar != null && ch == quoteChar) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        state = STATE_QUOTING;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        state = STATE_NORMAL;
+                        break;
+                    default:
+                        break;
+                }
+            } else if (lineDelimiter != null && ch == lineDelimiter) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        String field = stringBuilder.toString();
+                        fields.add(field);
+                        lines.add(fields.toArray(new String[0]));
+
+                        stringBuilder.setLength(0);
+                        fields.clear();
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                    default:
+                        break;
+                }
+            } else {
+                if (state == STATE_ESCAPING) {
+                    state = STATE_NORMAL;
+                }
+                stringBuilder.append(ch);
+            }
+        }
+
+        switch (state) {
+            case STATE_NORMAL:
+                String field = stringBuilder.toString();
+                fields.add(field);
+                lines.add(fields.toArray(new String[0]));
+
+                String[][] result = new String[lines.size()][];
+                for (int i = 0; i < lines.size(); ++i) {
+                    result[i] = lines.get(i);
+                }
+                return result;
+
+            case STATE_ESCAPING:
+                throw new IllegalArgumentException(String.format("Not closed 
escaping. Text=[%s].", text));
+            case STATE_QUOTING:
+                throw new IllegalArgumentException(String.format("Not closed 
quoting. Text=[%s].", text));
+            default:
+                throw new IllegalStateException(String.format("Text=[%s].", 
text));
+        }
+    }
+
+    /**
+     * Concat the given fields.
+     *
+     * <p>Special characters in the text will be escaped or quoted if
+     * corresponding character is given. Otherwise, an exception will be
+     * thrown.</p>
+     *
+     * @param fields The fields to be concat.
+     * @param delimiter The delimiter of fields.
+     * @param escapeChar The escape character.
+     * @param quoteChar The quote character.
+     * @return The concated text of given fields.
+     */
+    public static String concatCsv(
+            @Nonnull String[] fields,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        StringBuilder stringBuilder = new StringBuilder();
+
+        for (int index = 0; index < fields.length; ++index) {
+
+            String field = fields[index];
+
+            for (int i = 0; i < field.length(); ++i) {
+                char ch = field.charAt(i);
+
+                if (ch == delimiter
+                        || (escapeChar != null && ch == escapeChar)
+                        || (quoteChar != null && ch == quoteChar)) {
+
+                    if (escapeChar != null) {
+                        stringBuilder.append(escapeChar);
+                        stringBuilder.append(ch);
+                    } else if (quoteChar != null && ch != quoteChar) {
+                        stringBuilder.append(quoteChar);
+                        stringBuilder.append(ch);
+                        stringBuilder.append(quoteChar);
+                    } else {
+                        throw new IllegalArgumentException("There exist 
special characters in the text, "
+                                + "but neither escape character nor quote 
character is configured.");
+                    }
+                } else {
+                    stringBuilder.append(ch);
+                }
+            }
+
+            if (index < fields.length - 1) {
+                stringBuilder.append(delimiter);
+            }
+        }
+
+        return stringBuilder.toString();
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
new file mode 100644
index 0000000000..2c39948f2d
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+/**
+ * SourceData
+ */
+public interface SourceData {
+
+    String FIELD_DEFAULT_PREFIX = "$";
+
+    int getRowCount();
+
+    String getField(int rowNum, String fieldName);
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
new file mode 100644
index 0000000000..a11cd89351
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import java.util.Map;
+
+/**
+ * SourceDecoder
+ */
+public interface SourceDecoder {
+
+    SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
+
+    SourceData decode(String srcString, Map<String, Object> extParams);
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java
new file mode 100644
index 0000000000..f29af8f93a
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SplitUtils
+ * 
+ */
+public class SplitUtils {
+
+    public static final int STATE_NORMAL = 0;
+    public static final int STATE_KEY = 2;
+    public static final int STATE_VALUE = 4;
+    public static final int STATE_ESCAPING = 8;
+    public static final int STATE_QUOTING = 16;
+
+    public static String[] splitCsv(@Nonnull String text, @Nonnull Character 
delimiter, @Nullable Character escapeChar,
+            @Nullable Character quoteChar, boolean hasEscapeProcess) {
+        String[][] splitResult = splitCsv(text, delimiter, escapeChar, 
quoteChar, null, hasEscapeProcess);
+        if (splitResult.length == 0) {
+            return new String[0];
+        }
+        return splitResult[0];
+    }
+
+    public static String[][] splitCsv(@Nonnull String text, @Nonnull Character 
delimiter,
+            @Nullable Character escapeChar, @Nullable Character quoteChar, 
@Nullable Character lineDelimiter,
+            boolean hasEscapeProcess) {
+        return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, 
false, hasEscapeProcess);
+    }
+
+    public static String[][] splitCsv(@Nonnull String text, @Nonnull Character 
delimiter,
+            @Nullable Character escapeChar, @Nullable Character quoteChar, 
@Nullable Character lineDelimiter,
+            boolean deleteHeadDelimiter, boolean hasEscapeProcess) {
+        char deli = delimiter.charValue();
+        char escape = (escapeChar == null) ? '\\' : escapeChar.charValue();
+        char quote = (quoteChar == null) ? '\"' : quoteChar.charValue();
+        char line = (lineDelimiter == null) ? '\n' : lineDelimiter.charValue();
+        List<String[]> lines = new ArrayList<>();
+        List<String> fields = new ArrayList<>();
+
+        int state = STATE_NORMAL;
+
+        char[] srcValue = text.toCharArray();
+        char[] fieldValue = new char[srcValue.length];
+        int fieldIndex = 0;
+        for (int i = 0; i < text.length(); i++) {
+            char ch = srcValue[i];
+
+            if (ch == deli) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        if (fieldIndex == 0 && deleteHeadDelimiter && 
fields.isEmpty()) {
+                            break;
+                        }
+                        fields.add(new String(fieldValue, 0, fieldIndex));
+                        fieldIndex = 0;
+                        break;
+                    case STATE_ESCAPING:
+                        fieldValue[fieldIndex++] = ch;
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        fieldValue[fieldIndex++] = ch;
+                        break;
+                    default:
+                        break;
+                }
+            } else if (escapeChar != null && ch == escape) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        state = STATE_ESCAPING;
+                        break;
+                    case STATE_ESCAPING:
+                        if (!hasEscapeProcess) {
+                            fieldValue[fieldIndex++] = escapeChar;
+                        }
+                        fieldValue[fieldIndex++] = ch;
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        fieldValue[fieldIndex++] = ch;
+                        break;
+                    default:
+                        break;
+                }
+            } else if (quoteChar != null && ch == quote) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        state = STATE_QUOTING;
+                        break;
+                    case STATE_ESCAPING:
+                        if (!hasEscapeProcess) {
+                            fieldValue[fieldIndex++] = escapeChar;
+                        }
+                        fieldValue[fieldIndex++] = ch;
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        state = STATE_NORMAL;
+                        break;
+                    default:
+                        break;
+                }
+            } else if (lineDelimiter != null && ch == line) {
+                switch (state) {
+                    case STATE_NORMAL:
+                        fields.add(new String(fieldValue, 0, fieldIndex));
+                        fieldIndex = 0;
+                        lines.add(fields.toArray(new String[0]));
+                        fields.clear();
+                        break;
+                    case STATE_ESCAPING:
+                        fieldValue[fieldIndex++] = ch;
+                        state = STATE_NORMAL;
+                        break;
+                    case STATE_QUOTING:
+                        fieldValue[fieldIndex++] = ch;
+                        break;
+                    default:
+                        break;
+                }
+            } else {
+                if (state == STATE_ESCAPING) {
+                    if (!hasEscapeProcess) {
+                        fieldValue[fieldIndex++] = escapeChar;
+                    }
+                    state = STATE_NORMAL;
+                }
+                fieldValue[fieldIndex++] = ch;
+            }
+        }
+        fields.add(new String(fieldValue, 0, fieldIndex));
+        fieldIndex = 0;
+        lines.add(fields.toArray(new String[0]));
+
+        String[][] result = new String[lines.size()][];
+        for (int i = 0; i < lines.size(); i++) {
+            result[i] = lines.get(i);
+        }
+        return result;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
new file mode 100644
index 0000000000..cb3c9405a0
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * CsvSinkEncoder
+ */
+public class CsvSinkEncoder implements SinkEncoder {
+
+    protected CsvSinkInfo sinkInfo;
+    protected Charset sinkCharset = Charset.defaultCharset();
+    private Character delimiter = '|';
+    private Character escapeChar = null;
+    private List<FieldInfo> fields;
+    private StringBuilder builder = new StringBuilder();
+
+    public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
+        this.sinkInfo = sinkInfo;
+        if (!StringUtils.isBlank(sinkInfo.getDelimiter())) {
+            this.delimiter = sinkInfo.getDelimiter().charAt(0);
+        }
+        if (!StringUtils.isBlank(sinkInfo.getEscapeChar())) {
+            this.escapeChar = sinkInfo.getEscapeChar().charAt(0);
+        }
+        if (!StringUtils.isBlank(sinkInfo.getCharset())) {
+            this.sinkCharset = Charset.forName(sinkInfo.getCharset());
+        }
+        this.fields = sinkInfo.getFields();
+    }
+
+    /**
+     * encode
+     * @param sinkData
+     * @return
+     */
+    @Override
+    public String encode(SinkData sinkData) {
+        if (fields == null || fields.size() == 0) {
+            return "";
+        }
+        builder.delete(0, builder.length());
+        if (escapeChar == null) {
+            fields.forEach(v -> 
builder.append(sinkData.getField(v.getName())).append(delimiter));
+        } else {
+            for (FieldInfo field : fields) {
+                String fieldName = field.getName();
+                String fieldValue = sinkData.getField(fieldName);
+                EscapeUtils.escapeContent(builder, delimiter, escapeChar, 
fieldValue);
+                builder.append(delimiter);
+            }
+        }
+        return builder.substring(0, builder.length() - 1);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
new file mode 100644
index 0000000000..2e1c3bea4b
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * DefaultSinkData
+ * 
+ */
+public class DefaultSinkData implements SinkData {
+
+    private Map<String, String> currentRow = new HashMap<>();
+
+    /**
+     * putField
+     * @param fieldName
+     * @param fieldValue
+     */
+    @Override
+    public void putField(String fieldName, String fieldValue) {
+        this.currentRow.put(fieldName, fieldValue);
+    }
+
+    /**
+     * getField
+     * @param fieldName
+     * @return
+     */
+    @Override
+    public String getField(String fieldName) {
+        return this.currentRow.getOrDefault(fieldName, "");
+    }
+
+    /**
+     * keySet
+     * @return
+     */
+    @Override
+    public Set<String> keySet() {
+        return this.currentRow.keySet();
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java
new file mode 100644
index 0000000000..2ba3c429d1
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import java.util.List;
+
+/**
+ * EscapeUtils
+ * 
+ */
+public class EscapeUtils {
+
+    public static String escapeStringSeparator(String fieldValue, char 
separator) {
+        StringBuilder builder = new StringBuilder();
+        escapeContent(builder, separator, '\\', fieldValue);
+        String formatField = builder.toString();
+        return formatField;
+    }
+
+    public static String escapeFields(List<String> fields, char separator) {
+        if (fields.size() <= 0) {
+            return "";
+        }
+        StringBuilder ss = new StringBuilder();
+        for (String field : fields) {
+            String fmtField = escapeStringSeparator(field, separator);
+            ss.append(fmtField).append(separator);
+        }
+        String result = ss.substring(0, ss.length() - 1);
+        return result;
+    }
+
+    public static void escapeContent(StringBuilder builder, char separator, 
char escapeChar, Object field) {
+        String strField = "";
+        if (field != null) {
+            strField = String.valueOf(field);
+        }
+        int length = strField.length();
+
+        for (int i = 0; i < length; i++) {
+            putValueIntoStringBuilder(builder, separator, escapeChar, 
strField.charAt(i));
+        }
+    }
+
+    public static void putValueIntoStringBuilder(StringBuilder builder, char 
separator, final char escapeChar,
+            char value) {
+        switch (value) {
+            case 0:
+                builder.append(escapeChar).append('0');
+                break;
+            case '\n':
+                builder.append(escapeChar).append('n');
+                break;
+            case '\r':
+                builder.append(escapeChar).append('r');
+                break;
+            default:
+                if (value == separator) {
+                    builder.append(escapeChar).append(separator);
+                } else if (value == escapeChar) {
+                    builder.append(escapeChar).append(escapeChar);
+                } else {
+                    builder.append(value);
+                }
+                break;
+        }
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
new file mode 100644
index 0000000000..3fcc31107e
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * KvSinkEncoder
+ */
+public class KvSinkEncoder implements SinkEncoder {
+
+    protected KvSinkInfo sinkInfo;
+    protected Charset sinkCharset = Charset.defaultCharset();
+    private List<FieldInfo> fields;
+    private StringBuilder builder = new StringBuilder();
+
+    public KvSinkEncoder(KvSinkInfo sinkInfo) {
+        this.sinkInfo = sinkInfo;
+        if (!StringUtils.isBlank(sinkInfo.getCharset())) {
+            this.sinkCharset = Charset.forName(sinkInfo.getCharset());
+        }
+        this.fields = sinkInfo.getFields();
+    }
+
+    /**
+     * encode
+     * @param sinkData
+     * @return
+     */
+    @Override
+    public String encode(SinkData sinkData) {
+        if (fields == null || fields.size() == 0) {
+            return "";
+        }
+        builder.delete(0, builder.length());
+        for (FieldInfo field : fields) {
+            String fieldName = field.getName();
+            String fieldValue = sinkData.getField(fieldName);
+            
builder.append(fieldName).append('=').append(fieldValue).append('&');
+        }
+        return builder.substring(0, builder.length() - 1);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
new file mode 100644
index 0000000000..037df2dfcf
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import java.util.Set;
+
+/**
+ * SinkData
+ * 
+ */
+public interface SinkData {
+
+    void putField(String fieldName, String fieldValue);
+
+    String getField(String fieldName);
+
+    Set<String> keySet();
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
new file mode 100644
index 0000000000..ab83d21a0b
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+/**
+ * SinkEncoder
+ */
+public interface SinkEncoder {
+
+    String encode(SinkData sinkData);
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
new file mode 100644
index 0000000000..88dd5bf36c
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * CsvSinkInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CsvSinkInfo extends SinkInfo {
+
+    private String delimiter;
+    private String escapeChar;
+    private List<FieldInfo> fields;
+
+    @JsonCreator
+    public CsvSinkInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("delimiter") String delimiter,
+            @JsonProperty("escapeChar") String escapeChar,
+            @JsonProperty("fields") List<FieldInfo> fields) {
+        super(SourceInfo.CSV, charset);
+        this.delimiter = delimiter;
+        this.escapeChar = escapeChar;
+        this.fields = fields;
+    }
+
+    /**
+     * get delimiter
+     * @return the delimiter
+     */
+    @JsonProperty("delimiter")
+    public String getDelimiter() {
+        return delimiter;
+    }
+
+    /**
+     * set delimiter
+     * @param delimiter the delimiter to set
+     */
+    public void setDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    /**
+     * get escapeChar
+     * @return the escapeChar
+     */
+    @JsonProperty("escapeChar")
+    public String getEscapeChar() {
+        return escapeChar;
+    }
+
+    /**
+     * set escapeChar
+     * @param escapeChar the escapeChar to set
+     */
+    public void setEscapeChar(String escapeChar) {
+        this.escapeChar = escapeChar;
+    }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    @JsonProperty("fields")
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
+
+    /**
+     * set fields
+     * @param fields the fields to set
+     */
+    public void setFields(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
new file mode 100644
index 0000000000..27a46bcb19
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * CsvSourceInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CsvSourceInfo extends SourceInfo {
+
+    private String delimiter;
+    private String escapeChar;
+    private List<FieldInfo> fields;
+
+    @JsonCreator
+    public CsvSourceInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("delimiter") String delimiter,
+            @JsonProperty("escapeChar") String escapeChar,
+            @JsonProperty("fields") List<FieldInfo> fields) {
+        super(charset);
+        this.delimiter = delimiter;
+        this.escapeChar = escapeChar;
+        this.fields = fields;
+    }
+
+    /**
+     * get delimiter
+     * @return the delimiter
+     */
+    @JsonProperty("delimiter")
+    public String getDelimiter() {
+        return delimiter;
+    }
+
+    /**
+     * set delimiter
+     * @param delimiter the delimiter to set
+     */
+    public void setDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    /**
+     * get escapeChar
+     * @return the escapeChar
+     */
+    @JsonProperty("escapeChar")
+    public String getEscapeChar() {
+        return escapeChar;
+    }
+
+    /**
+     * set escapeChar
+     * @param escapeChar the escapeChar to set
+     */
+    public void setEscapeChar(String escapeChar) {
+        this.escapeChar = escapeChar;
+    }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    @JsonProperty("fields")
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
+
+    /**
+     * set fields
+     * @param fields the fields to set
+     */
+    public void setFields(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
new file mode 100644
index 0000000000..46106e534f
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import lombok.Data;
+
+/**
+ * FieldInfo
+ */
+@Data
+public class FieldInfo {
+
+    private String name;
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java
new file mode 100644
index 0000000000..9935623d06
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * JsonSourceInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JsonSourceInfo extends SourceInfo {
+
+    private String rowsNodePath;
+
+    @JsonCreator
+    public JsonSourceInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("rowsNodePath") String rowsNodePath) {
+        super(charset);
+        this.rowsNodePath = rowsNodePath;
+    }
+
+    /**
+     * get rowsNodePath
+     * @return the rowsNodePath
+     */
+    @JsonProperty("rowsNodePath")
+    public String getRowsNodePath() {
+        return rowsNodePath;
+    }
+
+    /**
+     * set rowsNodePath
+     * @param rowsNodePath the rowsNodePath to set
+     */
+    public void setRowsNodePath(String rowsNodePath) {
+        this.rowsNodePath = rowsNodePath;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
new file mode 100644
index 0000000000..11c3550f42
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * KvSinkInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KvSinkInfo extends SinkInfo {
+
+    private List<FieldInfo> fields;
+
+    @JsonCreator
+    public KvSinkInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("fields") List<FieldInfo> fields) {
+        super(SourceInfo.KV, charset);
+        this.fields = fields;
+    }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    @JsonProperty("fields")
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
+
+    /**
+     * set fields
+     * @param fields the fields to set
+     */
+    public void setFields(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
new file mode 100644
index 0000000000..6d92d44920
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * KvSourceInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KvSourceInfo extends SourceInfo {
+
+    private List<FieldInfo> fields;
+
+    @JsonCreator
+    public KvSourceInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("fields") List<FieldInfo> fields) {
+        super(charset);
+        this.fields = fields;
+    }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    @JsonProperty("fields")
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
+
+    /**
+     * set fields
+     * @param fields the fields to set
+     */
+    public void setFields(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
new file mode 100644
index 0000000000..ba226bd807
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * PbSourceInfo
+ * 
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PbSourceInfo extends SourceInfo {
+
+    private String protoDefine;
+    private String rowsNodePath;
+
+    @JsonCreator
+    public PbSourceInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("protoDefine") String protoDefine,
+            @JsonProperty("rowsNodePath") String rowsNodePath) {
+        super(charset);
+        this.protoDefine = protoDefine;
+        this.rowsNodePath = rowsNodePath;
+    }
+
+    /**
+     * get protoDefine
+     * @return the protoDefine
+     */
+    @JsonProperty("protoDefine")
+    public String getProtoDefine() {
+        return protoDefine;
+    }
+
+    /**
+     * set protoDefine
+     * @param protoDefine the protoDefine to set
+     */
+    public void setProtoDefine(String protoDefine) {
+        this.protoDefine = protoDefine;
+    }
+
+    /**
+     * get rowsNodePath
+     * @return the rowsNodePath
+     */
+    @JsonProperty("rowsNodePath")
+    public String getRowsNodePath() {
+        return rowsNodePath;
+    }
+
+    /**
+     * set rowsNodePath
+     * @param rowsNodePath the rowsNodePath to set
+     */
+    public void setRowsNodePath(String rowsNodePath) {
+        this.rowsNodePath = rowsNodePath;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java
new file mode 100644
index 0000000000..13e9e2a179
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import java.util.Locale;
+
+/**
+ * ProtocolType
+ */
+public enum ProtocolType {
+
+    CSV("csv"), KV("kv"), PB("pb"), JSON("json"), UNKNOWN("n");
+
+    private final String type;
+
+    ProtocolType(String type) {
+        this.type = type;
+    }
+
+    public static ProtocolType forType(String type) {
+        for (ProtocolType dataType : values()) {
+            if (dataType.getType().equals(type.toLowerCase(Locale.ROOT))) {
+                return dataType;
+            }
+        }
+        throw new IllegalArgumentException("Unsupported protocol type for " + 
type);
+    }
+
+    public static ProtocolType convert(String value) {
+        for (ProtocolType v : values()) {
+            if (v.getType().equals(value.toLowerCase(Locale.ROOT))) {
+                return v;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    @Override
+    public String toString() {
+        return this.name() + ":" + this.type;
+    }
+
+    public String getType() {
+        return type;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
new file mode 100644
index 0000000000..9bd4647479
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * SinkInfo
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
+@JsonSubTypes({
+        @Type(value = CsvSinkInfo.class, name = SourceInfo.CSV),
+        @Type(value = KvSinkInfo.class, name = SourceInfo.KV),
+})
+public abstract class SinkInfo {
+
+    @JsonIgnore
+    private String type;
+
+    @JsonProperty("charset")
+    private String charset;
+
+    public SinkInfo(
+            String type,
+            @JsonProperty("charset") String charset) {
+        this.type = checkNotNull(type);
+        this.charset = Optional.ofNullable(charset).orElse("UTF-8");
+    }
+
+    /**
+     * get type
+     * @return the type
+     */
+    @JsonIgnore
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * set type
+     * @param type the type to set
+     */
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * get charset
+     * @return the charset
+     */
+    @JsonProperty("charset")
+    public String getCharset() {
+        return charset;
+    }
+
+    /**
+     * set charset
+     * @param charset the charset to set
+     */
+    public void setCharset(String charset) {
+        this.charset = charset;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java
new file mode 100644
index 0000000000..a02b17f78a
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Optional;
+
+/**
+ * SourceInfo
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
+@JsonSubTypes({
+        @Type(value = CsvSourceInfo.class, name = SourceInfo.CSV),
+        @Type(value = KvSourceInfo.class, name = SourceInfo.KV),
+        @Type(value = PbSourceInfo.class, name = SourceInfo.PB),
+        @Type(value = JsonSourceInfo.class, name = SourceInfo.JSON),
+})
+public abstract class SourceInfo {
+
+    public static final String NODE_PATH_SEPARTOR = ".";
+
+    public static final String CSV = "csv";
+    public static final String KV = "kv";
+    public static final String PB = "pb";
+    public static final String JSON = "json";
+
+    private String charset;
+
+    public SourceInfo(
+            @JsonProperty("charset") String charset) {
+        this.charset = Optional.ofNullable(charset).orElse("UTF-8");
+    }
+
+    /**
+     * get charset
+     * @return the charset
+     */
+    public String getCharset() {
+        return charset;
+    }
+
+    /**
+     * set charset
+     * @param charset the charset to set
+     */
+    public void setCharset(String charset) {
+        this.charset = charset;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
new file mode 100644
index 0000000000..ff1ac958fc
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * TransformConfig
+ */
+public class TransformConfig {
+
+    @JsonProperty("sourceInfo")
+    private SourceInfo sourceInfo;
+    @JsonProperty("sinkInfo")
+    private SinkInfo sinkInfo;
+    @JsonProperty("transformSql")
+    private String transformSql;
+
+    @JsonCreator
+    public TransformConfig(
+            @JsonProperty("sourceInfo") SourceInfo sourceInfo,
+            @JsonProperty("sinkInfo") SinkInfo sinkInfo,
+            @JsonProperty("transformSql") String transformSql) {
+        this.sourceInfo = sourceInfo;
+        this.sinkInfo = sinkInfo;
+        this.transformSql = transformSql;
+    }
+
+    /**
+     * get sourceInfo
+     * @return the sourceInfo
+     */
+    @JsonProperty("sourceInfo")
+    public SourceInfo getSourceInfo() {
+        return sourceInfo;
+    }
+
+    /**
+     * set sourceInfo
+     * @param sourceInfo the sourceInfo to set
+     */
+    public void setSourceInfo(SourceInfo sourceInfo) {
+        this.sourceInfo = sourceInfo;
+    }
+
+    /**
+     * get sinkInfo
+     * @return the sinkInfo
+     */
+    @JsonProperty("sinkInfo")
+    public SinkInfo getSinkInfo() {
+        return sinkInfo;
+    }
+
+    /**
+     * set sinkInfo
+     * @param sinkInfo the sinkInfo to set
+     */
+    public void setSinkInfo(SinkInfo sinkInfo) {
+        this.sinkInfo = sinkInfo;
+    }
+
+    /**
+     * get transformSql
+     * @return the transformSql
+     */
+    @JsonProperty("transformSql")
+    public String getTransformSql() {
+        return transformSql;
+    }
+
+    /**
+     * set transformSql
+     * @param transformSql the transformSql to set
+     */
+    public void setTransformSql(String transformSql) {
+        this.transformSql = transformSql;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
new file mode 100644
index 0000000000..c08a6b3bc5
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder;
+import org.apache.inlong.sdk.transform.decode.KvSourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.encode.CsvSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.DefaultSinkData;
+import org.apache.inlong.sdk.transform.encode.KvSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkData;
+import org.apache.inlong.sdk.transform.encode.SinkEncoder;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.SinkInfo;
+import org.apache.inlong.sdk.transform.pojo.SourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+import net.sf.jsqlparser.statement.select.Select;
+import net.sf.jsqlparser.statement.select.SelectExpressionItem;
+import net.sf.jsqlparser.statement.select.SelectItem;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * TransformProcessor
+ * 
+ */
+public class TransformProcessor {
+
+    private TransformConfig config;
+    private SourceDecoder decoder;
+    private SinkEncoder encoder;
+    private Charset srcCharset = Charset.defaultCharset();
+    protected Charset sinkCharset = Charset.defaultCharset();
+
+    private PlainSelect transformSelect;
+    private ExpressionOperator where;
+    private Map<String, ValueParser> selectItemMap;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public TransformProcessor(String configString)
+            throws JsonMappingException, JsonProcessingException, 
JSQLParserException {
+        TransformConfig config = this.objectMapper.readValue(configString, 
TransformConfig.class);
+        this.init(config);
+    }
+
+    public TransformProcessor(TransformConfig config) throws 
JSQLParserException {
+        this.init(config);
+    }
+
+    private void init(TransformConfig config) throws JSQLParserException {
+        this.config = config;
+        if (!StringUtils.isBlank(config.getSourceInfo().getCharset())) {
+            this.srcCharset = 
Charset.forName(config.getSourceInfo().getCharset());
+        }
+        if (!StringUtils.isBlank(config.getSinkInfo().getCharset())) {
+            this.sinkCharset = 
Charset.forName(config.getSinkInfo().getCharset());
+        }
+        this.initDecoder(config);
+        this.initEncoder(config);
+        this.initTransformSql();
+    }
+
+    private void initDecoder(TransformConfig config) {
+        SourceInfo sourceInfo = config.getSourceInfo();
+        if (sourceInfo instanceof CsvSourceInfo) {
+            this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo);
+        } else if (sourceInfo instanceof KvSourceInfo) {
+            this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo);
+        }
+    }
+
+    private void initEncoder(TransformConfig config) {
+        SinkInfo sinkInfo = config.getSinkInfo();
+        if (sinkInfo instanceof CsvSinkInfo) {
+            this.encoder = new CsvSinkEncoder((CsvSinkInfo) sinkInfo);
+        } else if (sinkInfo instanceof KvSinkInfo) {
+            this.encoder = new KvSinkEncoder((KvSinkInfo) sinkInfo);
+        }
+    }
+
+    private void initTransformSql() throws JSQLParserException {
+        CCJSqlParserManager parserManager = new CCJSqlParserManager();
+        Select select = (Select) parserManager.parse(new 
StringReader(config.getTransformSql()));
+        this.transformSelect = (PlainSelect) select.getSelectBody();
+        this.where = 
OperatorTools.buildOperator(this.transformSelect.getWhere());
+        List<SelectItem> items = this.transformSelect.getSelectItems();
+        this.selectItemMap = new HashMap<>(items.size());
+        for (SelectItem item : items) {
+            if (item instanceof SelectExpressionItem) {
+                SelectExpressionItem exprItem = (SelectExpressionItem) item;
+                if (exprItem.getAlias() == null) {
+                    this.selectItemMap.put(exprItem.toString(),
+                            
OperatorTools.buildParser(exprItem.getExpression()));
+                } else {
+                    this.selectItemMap.put(exprItem.getAlias().getName(),
+                            
OperatorTools.buildParser(exprItem.getExpression()));
+                }
+            }
+        }
+    }
+
+    public List<String> transform(byte[] srcBytes, Map<String, Object> 
extParams) {
+        SourceData sourceData = this.decoder.decode(srcBytes, extParams);
+        List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount());
+        for (int i = 1; i <= sourceData.getRowCount(); i++) {
+            if (this.where != null && !this.where.check(sourceData, i)) {
+                continue;
+            }
+            SinkData sinkData = new DefaultSinkData();
+            for (Entry<String, ValueParser> entry : 
this.selectItemMap.entrySet()) {
+                String fieldName = entry.getKey();
+                Object fieldValue = entry.getValue().parse(sourceData, i);
+                sinkData.putField(fieldName, String.valueOf(fieldValue));
+            }
+            sinkDatas.add(this.encoder.encode(sinkData));
+        }
+        return sinkDatas;
+    }
+
+    public List<String> transform(String srcString, Map<String, Object> 
extParams) {
+        return this.transform(srcString.getBytes(this.srcCharset), extParams);
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
new file mode 100644
index 0000000000..c6464f850d
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
+
+/**
+ * AndOperator
+ * 
+ */
+public class AndOperator implements ExpressionOperator {
+
+    private ExpressionOperator left;
+    private ExpressionOperator right;
+
+    public AndOperator(AndExpression expr) {
+        this.left = OperatorTools.buildOperator(expr.getLeftExpression());
+        this.right = OperatorTools.buildOperator(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return left.check(sourceData, rowIndex) && right.check(sourceData, 
rowIndex);
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
new file mode 100644
index 0000000000..3172626000
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * EqualsToOperator
+ * 
+ */
+public class EqualsToOperator implements ExpressionOperator {
+
+    private ValueParser left;
+    private ValueParser right;
+
+    public EqualsToOperator(EqualsTo expr) {
+        this.left = OperatorTools.buildParser(expr.getLeftExpression());
+        this.right = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
new file mode 100644
index 0000000000..b055e841e2
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+/**
+ * ExpressionOperator
+ */
+public interface ExpressionOperator {
+
+    boolean check(SourceData sourceData, int rowIndex);
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
new file mode 100644
index 0000000000..07da9d79c2
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * GreaterThanEqualsOperator
+ * 
+ */
+public class GreaterThanEqualsOperator implements ExpressionOperator {
+
+    private ValueParser left;
+    private ValueParser right;
+
+    public GreaterThanEqualsOperator(GreaterThanEquals expr) {
+        this.left = OperatorTools.buildParser(expr.getLeftExpression());
+        this.right = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+                (Comparable) this.right.parse(sourceData, rowIndex)) >= 0;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
new file mode 100644
index 0000000000..3b2158d96b
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * GreaterThanOperator
+ * 
+ */
+public class GreaterThanOperator implements ExpressionOperator {
+
+    private ValueParser left;
+    private ValueParser right;
+
+    public GreaterThanOperator(GreaterThan expr) {
+        this.left = OperatorTools.buildParser(expr.getLeftExpression());
+        this.right = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+                (Comparable) this.right.parse(sourceData, rowIndex)) > 0;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
new file mode 100644
index 0000000000..fec4ed8019
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * MinorThanEqualsOperator
+ * 
+ */
+public class MinorThanEqualsOperator implements ExpressionOperator {
+
+    private ValueParser left;
+    private ValueParser right;
+
+    public MinorThanEqualsOperator(MinorThanEquals expr) {
+        this.left = OperatorTools.buildParser(expr.getLeftExpression());
+        this.right = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+                (Comparable) this.right.parse(sourceData, rowIndex)) <= 0;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
new file mode 100644
index 0000000000..5d9db7dd9c
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.MinorThan;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * MinorThanOperator
+ * 
+ */
+public class MinorThanOperator implements ExpressionOperator {
+
+    private ValueParser left;
+    private ValueParser right;
+
+    public MinorThanOperator(MinorThan expr) {
+        this.left = OperatorTools.buildParser(expr.getLeftExpression());
+        this.right = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return ObjectUtils.compare((Comparable) this.left.parse(sourceData, 
rowIndex),
+                (Comparable) this.right.parse(sourceData, rowIndex)) < 0;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
new file mode 100644
index 0000000000..9c58e70476
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * NotEqualsToOperator
+ * 
+ */
+public class NotEqualsToOperator implements ExpressionOperator {
+
+    private ValueParser left;
+    private ValueParser right;
+
+    public NotEqualsToOperator(NotEqualsTo expr) {
+        this.left = OperatorTools.buildParser(expr.getLeftExpression());
+        this.right = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), 
this.right.parse(sourceData, rowIndex));
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
new file mode 100644
index 0000000000..f648d426e7
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.NotExpression;
+
+/**
+ * NotOperator
+ * 
+ */
+public class NotOperator implements ExpressionOperator {
+
+    private ExpressionOperator node;
+
+    public NotOperator(NotExpression expr) {
+        this.node = OperatorTools.buildOperator(expr.getExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return !this.node.check(sourceData, rowIndex);
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
new file mode 100644
index 0000000000..c0e059f266
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.LongParser;
+import org.apache.inlong.sdk.transform.process.parser.StringParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.NotExpression;
+import net.sf.jsqlparser.expression.Parenthesis;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
+import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.MinorThan;
+import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import net.sf.jsqlparser.schema.Column;
+
+/**
+ * OperatorTools
+ * 
+ */
+public class OperatorTools {
+
+    public static ExpressionOperator buildOperator(Expression expr) {
+        if (expr instanceof AndExpression) {
+            return new AndOperator((AndExpression) expr);
+        } else if (expr instanceof OrExpression) {
+            return new OrOperator((OrExpression) expr);
+        } else if (expr instanceof Parenthesis) {
+            return new ParenthesisOperator((Parenthesis) expr);
+        } else if (expr instanceof NotExpression) {
+            return new NotOperator((NotExpression) expr);
+        } else if (expr instanceof EqualsTo) {
+            return new EqualsToOperator((EqualsTo) expr);
+        } else if (expr instanceof NotEqualsTo) {
+            return new NotEqualsToOperator((NotEqualsTo) expr);
+        } else if (expr instanceof GreaterThan) {
+            return new GreaterThanOperator((GreaterThan) expr);
+        } else if (expr instanceof GreaterThanEquals) {
+            return new GreaterThanEqualsOperator((GreaterThanEquals) expr);
+        } else if (expr instanceof MinorThan) {
+            return new MinorThanOperator((MinorThan) expr);
+        } else if (expr instanceof MinorThanEquals) {
+            return new MinorThanEqualsOperator((MinorThanEquals) expr);
+        }
+        return null;
+    }
+
+    public static ValueParser buildParser(Expression expr) {
+        if (expr instanceof Column) {
+            return new ColumnParser((Column) expr);
+        } else if (expr instanceof StringValue) {
+            return new StringParser((StringValue) expr);
+        } else if (expr instanceof LongValue) {
+            return new LongParser((LongValue) expr);
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
new file mode 100644
index 0000000000..33b9f82bdc
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+
+/**
+ * OrOperator
+ * 
+ */
+public class OrOperator implements ExpressionOperator {
+
+    private ExpressionOperator left;
+    private ExpressionOperator right;
+
+    public OrOperator(OrExpression expr) {
+        this.left = OperatorTools.buildOperator(expr.getLeftExpression());
+        this.right = OperatorTools.buildOperator(expr.getRightExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return left.check(sourceData, rowIndex) || right.check(sourceData, 
rowIndex);
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
new file mode 100644
index 0000000000..111f6bbb21
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.Parenthesis;
+
+/**
+ * ParenthesisOperator
+ * 
+ */
+public class ParenthesisOperator implements ExpressionOperator {
+
+    private ExpressionOperator node;
+
+    public ParenthesisOperator(Parenthesis expr) {
+        this.node = OperatorTools.buildOperator(expr.getExpression());
+    }
+
+    /**
+     * check
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        return this.node.check(sourceData, rowIndex);
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
new file mode 100644
index 0000000000..b6dd579d2a
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.schema.Column;
+
+/**
+ * ColumnParser
+ * 
+ */
+public class ColumnParser implements ValueParser {
+
+    private String fieldName;
+
+    public ColumnParser(Column expr) {
+        this.fieldName = expr.getColumnName();
+    }
+
+    /**
+     * parse
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        return sourceData.getField(rowIndex, fieldName);
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
new file mode 100644
index 0000000000..efd61cc2cb
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.LongValue;
+
+/**
+ * LongParser
+ * 
+ */
+public class LongParser implements ValueParser {
+
+    private Long value;
+
+    public LongParser(LongValue expr) {
+        this.value = expr.getValue();
+    }
+
+    /**
+     * parse
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        return value;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
new file mode 100644
index 0000000000..9cb431c1fa
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.StringValue;
+
+/**
+ * StringParser
+ * 
+ */
+public class StringParser implements ValueParser {
+
+    private String stringValue;
+
+    public StringParser(StringValue expr) {
+        this.stringValue = expr.getValue();
+    }
+
+    /**
+     * parse
+     * @param sourceData
+     * @param rowIndex
+     * @return
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        return stringValue;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
new file mode 100644
index 0000000000..bafafe276c
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+/**
+ * ValueParser
+ * 
+ */
+public interface ValueParser {
+
+    Object parse(SourceData sourceData, int rowIndex);
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
new file mode 100644
index 0000000000..3e36bfda26
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TestTransformConfig
+ * 
+ */
+public class TestTransformConfig {
+
+    @Test
+    public void testCsv() {
+        try {
+            FieldInfo ftime = new FieldInfo();
+            ftime.setName("ftime");
+            List<FieldInfo> fields = new ArrayList<>();
+            fields.add(ftime);
+            SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", 
fields);
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select ftime from source";
+            TransformConfig config = new TransformConfig(csvSource, csvSink, 
transformSql);
+            ObjectMapper objectMapper = new ObjectMapper();
+            String configString = objectMapper.writeValueAsString(config);
+            System.out.println(configString);
+            Assert.assertEquals(configString, 
"{\"sourceInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
+                    + 
"\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
+                    + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
+                    + 
"\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
+                    + "\"transformSql\":\"select ftime from source\"}");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testKv() {
+        try {
+            FieldInfo ftime = new FieldInfo();
+            ftime.setName("ftime");
+            List<FieldInfo> fields = new ArrayList<>();
+            fields.add(ftime);
+            SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+            SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
+            String transformSql = "select ftime from source";
+            TransformConfig config = new TransformConfig(kvSource, kvSink, 
transformSql);
+            ObjectMapper objectMapper = new ObjectMapper();
+            String configString = objectMapper.writeValueAsString(config);
+            System.out.println(configString);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testPb() {
+        try {
+            FieldInfo ftime = new FieldInfo();
+            ftime.setName("ftime");
+            List<FieldInfo> fields = new ArrayList<>();
+            fields.add(ftime);
+            SourceInfo pbSource = new PbSourceInfo("UTF-8", "syntax = 
\"proto3\";", "root");
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select ftime from source";
+            TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+            ObjectMapper objectMapper = new ObjectMapper();
+            String configString = objectMapper.writeValueAsString(config);
+            System.out.println(configString);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testJson() {
+        try {
+            FieldInfo ftime = new FieldInfo();
+            ftime.setName("ftime");
+            List<FieldInfo> fields = new ArrayList<>();
+            fields.add(ftime);
+            SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "root");
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select ftime from source";
+            TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
+            ObjectMapper objectMapper = new ObjectMapper();
+            String configString = objectMapper.writeValueAsString(config);
+            System.out.println(configString);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
new file mode 100644
index 0000000000..9c75456df6
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.SinkInfo;
+import org.apache.inlong.sdk.transform.pojo.SourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * TestTransformProcessor
+ * 
+ */
+public class TestTransformProcessor {
+
+    @Test
+    public void testCsv2Kv() {
+        try {
+            List<FieldInfo> fields = new ArrayList<>();
+            FieldInfo ftime = new FieldInfo();
+            ftime.setName("ftime");
+            fields.add(ftime);
+            FieldInfo extinfo = new FieldInfo();
+            extinfo.setName("extinfo");
+            fields.add(extinfo);
+            SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", 
fields);
+            SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
+            String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
+            TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+            // case1
+            TransformProcessor processor1 = new TransformProcessor(config);
+            List<String> output1 = processor1.transform("2024-04-28 
00:00:00|ok", new HashMap<>());
+            Assert.assertTrue(output1.size() == 1);
+            Assert.assertEquals(output1.get(0), "ftime=2024-04-28 
00:00:00&extinfo=ok");
+            // case2
+            config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
+            TransformProcessor processor2 = new TransformProcessor(config);
+            List<String> output2 = processor2.transform("2024-04-28 
00:00:00|ok", new HashMap<>());
+            Assert.assertTrue(output2.size() == 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testKvCsv() {
+        try {
+            List<FieldInfo> fields = new ArrayList<>();
+            FieldInfo ftime = new FieldInfo();
+            ftime.setName("ftime");
+            fields.add(ftime);
+            FieldInfo extinfo = new FieldInfo();
+            extinfo.setName("extinfo");
+            fields.add(extinfo);
+            SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+            String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
+            TransformConfig config = new TransformConfig(kvSource, csvSink, 
transformSql);
+            // case1
+            TransformProcessor processor1 = new TransformProcessor(config);
+            List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+            Assert.assertTrue(output1.size() == 1);
+            Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+            // case2
+            config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
+            TransformProcessor processor2 = new TransformProcessor(config);
+            List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+            Assert.assertTrue(output2.size() == 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testKvCsvByJsonConfig() {
+        try {
+            String configString1 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
+                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                    + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
+                    + "\"escapeChar\":\"\\\\\","
+                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                    + "\"transformSql\":\"select ftime,extinfo from source 
where extinfo='ok'\"}";
+            // case1
+            TransformProcessor processor1 = new 
TransformProcessor(configString1);
+            List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+            Assert.assertTrue(output1.size() == 1);
+            Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+            // case2
+            String configString2 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
+                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                    + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
+                    + "\"escapeChar\":\"\\\\\","
+                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                    + "\"transformSql\":\"select ftime,extinfo from source 
where extinfo!='ok'\"}";
+            TransformProcessor processor2 = new 
TransformProcessor(configString2);
+            List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+            Assert.assertTrue(output2.size() == 0);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

Reply via email to