This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e40550c30a [INLONG-10154][SDK] Support to transform CSV/KV data to CSV/KV data without field list configuration (#10155)
e40550c30a is described below

commit e40550c30a27939b55894b1b59b223b5e28842dd
Author: 卢春亮 <luchunli...@apache.org>
AuthorDate: Wed May 8 21:31:54 2024 +0800

    [INLONG-10154][SDK] Support to transform CSV/KV data to CSV/KV data without field list configuration (#10155)
---
 .../sdk/transform/decode/CsvSourceDecoder.java     |   2 +-
 .../sdk/transform/encode/CsvSinkEncoder.java       |  29 +-
 .../sdk/transform/encode/DefaultSinkData.java      |  15 +-
 .../inlong/sdk/transform/encode/KvSinkEncoder.java |  18 +-
 .../inlong/sdk/transform/encode/SinkData.java      |   6 +-
 .../inlong/sdk/transform/pojo/CsvSinkInfo.java     |   7 +-
 .../inlong/sdk/transform/pojo/CsvSourceInfo.java   |   7 +-
 .../inlong/sdk/transform/pojo/KvSinkInfo.java      |   7 +-
 .../inlong/sdk/transform/pojo/KvSourceInfo.java    |   7 +-
 .../sdk/transform/process/TransformProcessor.java  |   4 +-
 .../transform/process/operator/OperatorTools.java  |   6 +
 .../transform/process/TestTransformProcessor.java  | 434 ++++++++++-----------
 12 files changed, 289 insertions(+), 253 deletions(-)

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index 22e3b9ba13..14a505d422 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -68,7 +68,7 @@ public class CsvSourceDecoder implements SourceDecoder {
             if (fields == null || fields.size() == 0) {
                 for (int j = 0; j < fieldValues.length; j++) {
                     String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 1);
-                    sourceData.putField(fieldName, fieldValues[i]);
+                    sourceData.putField(fieldName, fieldValues[j]);
                 }
                 continue;
             }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index a991d137b9..4b963c10fb 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -58,18 +58,27 @@ public class CsvSinkEncoder implements SinkEncoder {
      */
     @Override
     public String encode(SinkData sinkData) {
-        if (fields == null || fields.size() == 0) {
-            return "";
-        }
         builder.delete(0, builder.length());
-        if (escapeChar == null) {
-            fields.forEach(v -> builder.append(sinkData.getField(v.getName())).append(delimiter));
+        if (fields == null || fields.size() == 0) {
+            if (escapeChar == null) {
+                sinkData.keyList().forEach(k -> builder.append(sinkData.getField(k)).append(delimiter));
+            } else {
+                for (String fieldName : sinkData.keyList()) {
+                    String fieldValue = sinkData.getField(fieldName);
+                    EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
+                    builder.append(delimiter);
+                }
+            }
         } else {
-            for (FieldInfo field : fields) {
-                String fieldName = field.getName();
-                String fieldValue = sinkData.getField(fieldName);
-                EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
-                builder.append(delimiter);
+            if (escapeChar == null) {
+                fields.forEach(v -> builder.append(sinkData.getField(v.getName())).append(delimiter));
+            } else {
+                for (FieldInfo field : fields) {
+                    String fieldName = field.getName();
+                    String fieldValue = sinkData.getField(fieldName);
+                    EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
+                    builder.append(delimiter);
+                }
             }
         }
         return builder.substring(0, builder.length() - 1);
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
index 2e1c3bea4b..0ffefeea22 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.sdk.transform.encode;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * DefaultSinkData
@@ -27,15 +28,17 @@ import java.util.Set;
  */
 public class DefaultSinkData implements SinkData {
 
+    private List<String> keyList = new ArrayList<>();
     private Map<String, String> currentRow = new HashMap<>();
 
     /**
-     * putField
+     * addField
      * @param fieldName
      * @param fieldValue
      */
     @Override
-    public void putField(String fieldName, String fieldValue) {
+    public void addField(String fieldName, String fieldValue) {
+        this.keyList.add(fieldName);
         this.currentRow.put(fieldName, fieldValue);
     }
 
@@ -50,11 +53,11 @@ public class DefaultSinkData implements SinkData {
     }
 
     /**
-     * keySet
+     * keyList
      * @return
      */
     @Override
-    public Set<String> keySet() {
-        return this.currentRow.keySet();
+    public List<String> keyList() {
+        return this.keyList;
     }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 56f35ec319..503914e80a 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -50,14 +50,18 @@ public class KvSinkEncoder implements SinkEncoder {
      */
     @Override
     public String encode(SinkData sinkData) {
-        if (fields == null || fields.size() == 0) {
-            return "";
-        }
         builder.delete(0, builder.length());
-        for (FieldInfo field : fields) {
-            String fieldName = field.getName();
-            String fieldValue = sinkData.getField(fieldName);
-            builder.append(fieldName).append('=').append(fieldValue).append('&');
+        if (fields == null || fields.size() == 0) {
+            for (String fieldName : sinkData.keyList()) {
+                String fieldValue = sinkData.getField(fieldName);
+                builder.append(fieldName).append('=').append(fieldValue).append('&');
+            }
+        } else {
+            for (FieldInfo field : fields) {
+                String fieldName = field.getName();
+                String fieldValue = sinkData.getField(fieldName);
+                builder.append(fieldName).append('=').append(fieldValue).append('&');
+            }
         }
         return builder.substring(0, builder.length() - 1);
     }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
index 037df2dfcf..1ad0c38c68 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sdk.transform.encode;
 
-import java.util.Set;
+import java.util.List;
 
 /**
  * SinkData
@@ -25,9 +25,9 @@ import java.util.Set;
  */
 public interface SinkData {
 
-    void putField(String fieldName, String fieldValue);
+    void addField(String fieldName, String fieldValue);
 
     String getField(String fieldName);
 
-    Set<String> keySet();
+    List<String> keyList();
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
index 88dd5bf36c..555f6261f9 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -42,7 +43,11 @@ public class CsvSinkInfo extends SinkInfo {
         super(SourceInfo.CSV, charset);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
-        this.fields = fields;
+        if (fields != null) {
+            this.fields = fields;
+        } else {
+            this.fields = new ArrayList<>();
+        }
     }
 
     /**
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
index 27a46bcb19..29d2bd8f2e 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -42,7 +43,11 @@ public class CsvSourceInfo extends SourceInfo {
         super(charset);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
-        this.fields = fields;
+        if (fields != null) {
+            this.fields = fields;
+        } else {
+            this.fields = new ArrayList<>();
+        }
     }
 
     /**
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
index 11c3550f42..8a63e0c66c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -36,7 +37,11 @@ public class KvSinkInfo extends SinkInfo {
             @JsonProperty("charset") String charset,
             @JsonProperty("fields") List<FieldInfo> fields) {
         super(SourceInfo.KV, charset);
-        this.fields = fields;
+        if (fields != null) {
+            this.fields = fields;
+        } else {
+            this.fields = new ArrayList<>();
+        }
     }
 
     /**
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
index 6d92d44920..e627367548 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -36,7 +37,11 @@ public class KvSourceInfo extends SourceInfo {
             @JsonProperty("charset") String charset,
             @JsonProperty("fields") List<FieldInfo> fields) {
         super(charset);
-        this.fields = fields;
+        if (fields != null) {
+            this.fields = fields;
+        } else {
+            this.fields = new ArrayList<>();
+        }
     }
 
     /**
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index c979ef71c4..c3ee9270cc 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -172,10 +172,10 @@ public class TransformProcessor {
                 String fieldName = entry.getKey();
                 try {
                     Object fieldValue = entry.getValue().parse(sourceData, i);
-                    sinkData.putField(fieldName, String.valueOf(fieldValue));
+                    sinkData.addField(fieldName, String.valueOf(fieldValue));
                 } catch (Throwable t) {
                     LOG.error(t.getMessage(), t);
-                    sinkData.putField(fieldName, "");
+                    sinkData.addField(fieldName, "");
                 }
             }
             sinkDatas.add(this.encoder.encode(sinkData));
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 7f73e11d06..14e7c20d4b 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -144,6 +144,12 @@ public class OperatorTools {
      */
     @SuppressWarnings("rawtypes")
     public static int compareValue(Comparable left, Comparable right) {
+        if (left == null) {
+            return right == null ? 0 : -1;
+        }
+        if (right == null) {
+            return 1;
+        }
         if (left instanceof String) {
             if (right instanceof String) {
                 return ObjectUtils.compare(left, right);
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 7e46e28105..b76dde6f7f 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -43,165 +43,176 @@ import java.util.List;
 public class TestTransformProcessor {
 
     @Test
-    public void testCsv2Kv() {
-        try {
-            List<FieldInfo> fields = new ArrayList<>();
-            FieldInfo ftime = new FieldInfo();
-            ftime.setName("ftime");
-            fields.add(ftime);
-            FieldInfo extinfo = new FieldInfo();
-            extinfo.setName("extinfo");
-            fields.add(extinfo);
-            SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", 
fields);
-            SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
-            String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
-            TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
-            // case1
-            TransformProcessor processor1 = new TransformProcessor(config);
-            List<String> output1 = processor1.transform("2024-04-28 
00:00:00|ok", new HashMap<>());
-            Assert.assertTrue(output1.size() == 1);
-            Assert.assertEquals(output1.get(0), "ftime=2024-04-28 
00:00:00&extinfo=ok");
-            // case2
-            config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
-            TransformProcessor processor2 = new TransformProcessor(config);
-            List<String> output2 = processor2.transform("2024-04-28 
00:00:00|ok", new HashMap<>());
-            Assert.assertTrue(output2.size() == 0);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testCsv2Kv() throws Exception {
+        List<FieldInfo> fields = new ArrayList<>();
+        FieldInfo ftime = new FieldInfo();
+        ftime.setName("ftime");
+        fields.add(ftime);
+        FieldInfo extinfo = new FieldInfo();
+        extinfo.setName("extinfo");
+        fields.add(extinfo);
+        SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields);
+        SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
+        String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
+        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        // case1
+        TransformProcessor processor1 = new TransformProcessor(config);
+        List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
+        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(output1.get(0), "ftime=2024-04-28 
00:00:00&extinfo=ok");
+        // case2
+        config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
+        TransformProcessor processor2 = new TransformProcessor(config);
+        List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
+        Assert.assertTrue(output2.size() == 0);
+    }
+
+    @Test
+    public void testCsv2KvNoField() throws Exception {
+        SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null);
+        SinkInfo kvSink = new KvSinkInfo("UTF-8", null);
+        String transformSql = "select $1 ftime,$2 extinfo from source where 
$2='ok'";
+        TransformConfig config = new TransformConfig(csvSource, kvSink, 
transformSql);
+        // case1
+        TransformProcessor processor1 = new TransformProcessor(config);
+        List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
+        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(output1.get(0), "ftime=2024-04-28 
00:00:00&extinfo=ok");
+        // case2
+        config.setTransformSql("select $1 ftime,$2 extinfo from source where 
$2!='ok'");
+        TransformProcessor processor2 = new TransformProcessor(config);
+        List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", 
new HashMap<>());
+        Assert.assertTrue(output2.size() == 0);
+    }
+
+    @Test
+    public void testKv2Csv() throws Exception {
+        List<FieldInfo> fields = new ArrayList<>();
+        FieldInfo ftime = new FieldInfo();
+        ftime.setName("ftime");
+        fields.add(ftime);
+        FieldInfo extinfo = new FieldInfo();
+        extinfo.setName("extinfo");
+        fields.add(extinfo);
+        SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
+        TransformConfig config = new TransformConfig(kvSource, csvSink, 
transformSql);
+        // case1
+        TransformProcessor processor1 = new TransformProcessor(config);
+        List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+        // case2
+        config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
+        TransformProcessor processor2 = new TransformProcessor(config);
+        List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertTrue(output2.size() == 0);
     }
 
     @Test
-    public void testKv2Csv() {
-        try {
-            List<FieldInfo> fields = new ArrayList<>();
-            FieldInfo ftime = new FieldInfo();
-            ftime.setName("ftime");
-            fields.add(ftime);
-            FieldInfo extinfo = new FieldInfo();
-            extinfo.setName("extinfo");
-            fields.add(extinfo);
-            SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
-            TransformConfig config = new TransformConfig(kvSource, csvSink, 
transformSql);
-            // case1
-            TransformProcessor processor1 = new TransformProcessor(config);
-            List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-            Assert.assertTrue(output1.size() == 1);
-            Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
-            // case2
-            config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
-            TransformProcessor processor2 = new TransformProcessor(config);
-            List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-            Assert.assertTrue(output2.size() == 0);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testKv2CsvNoField() throws Exception {
+        SourceInfo kvSource = new KvSourceInfo("UTF-8", null);
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null);
+        String transformSql = "select ftime,extinfo from source where 
extinfo='ok'";
+        TransformConfig config = new TransformConfig(kvSource, csvSink, 
transformSql);
+        // case1
+        TransformProcessor processor1 = new TransformProcessor(config);
+        List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+        // case2
+        config.setTransformSql("select ftime,extinfo from source where 
extinfo!='ok'");
+        TransformProcessor processor2 = new TransformProcessor(config);
+        List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertTrue(output2.size() == 0);
     }
 
     @Test
-    public void testJson2Csv() {
-        try {
-            List<FieldInfo> fields = this.getTestFieldList();
-            SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
-            TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
-            // case1
-            TransformProcessor processor = new TransformProcessor(config);
-            String srcString = "{\n"
-                    + "  \"sid\":\"value1\",\n"
-                    + "  \"packageID\":\"value2\",\n"
-                    + "  \"msgs\":[\n"
-                    + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
-                    + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
-                    + "  ]\n"
-                    + "}";
-            List<String> output = processor.transform(srcString, new 
HashMap<>());
-            Assert.assertTrue(output.size() == 2);
-            Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
-            Assert.assertEquals(output.get(1), 
"value1|value2|1713243918000|v4");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testJson2Csv() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+        TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        String srcString = "{\n"
+                + "  \"sid\":\"value1\",\n"
+                + "  \"packageID\":\"value2\",\n"
+                + "  \"msgs\":[\n"
+                + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+                + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+                + "  ]\n"
+                + "}";
+        List<String> output = processor.transform(srcString, new HashMap<>());
+        Assert.assertTrue(output.size() == 2);
+        Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
+        Assert.assertEquals(output.get(1), "value1|value2|1713243918000|v4");
     }
 
     @Test
-    public void testJson2CsvForOne() {
-        try {
-            List<FieldInfo> fields = this.getTestFieldList();
-            SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql =
-                    "select 
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
-            TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
-            // case1
-            TransformProcessor processor = new TransformProcessor(config);
-            String srcString = "{\n"
-                    + "  \"sid\":\"value1\",\n"
-                    + "  \"packageID\":\"value2\",\n"
-                    + "  \"msgs\":[\n"
-                    + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
-                    + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
-                    + "  ]\n"
-                    + "}";
-            List<String> output = processor.transform(srcString, new 
HashMap<>());
-            Assert.assertTrue(output.size() == 1);
-            Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testJson2CsvForOne() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select 
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+        TransformConfig config = new TransformConfig(jsonSource, csvSink, 
transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        String srcString = "{\n"
+                + "  \"sid\":\"value1\",\n"
+                + "  \"packageID\":\"value2\",\n"
+                + "  \"msgs\":[\n"
+                + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+                + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+                + "  ]\n"
+                + "}";
+        List<String> output = processor.transform(srcString, new HashMap<>());
+        Assert.assertTrue(output.size() == 1);
+        Assert.assertEquals(output.get(0), 
"value1|value2|1713243918000|value4");
     }
 
     @Test
-    public void testKvCsvByJsonConfig() {
-        try {
-            String configString1 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
-                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                    + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
-                    + "\"escapeChar\":\"\\\\\","
-                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                    + "\"transformSql\":\"select ftime,extinfo from source 
where extinfo='ok'\"}";
-            // case1
-            TransformProcessor processor1 = new 
TransformProcessor(configString1);
-            List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-            Assert.assertTrue(output1.size() == 1);
-            Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
-            // case2
-            String configString2 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
-                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                    + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
-                    + "\"escapeChar\":\"\\\\\","
-                    + 
"\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
-                    + "\"transformSql\":\"select ftime,extinfo from source 
where extinfo!='ok'\"}";
-            TransformProcessor processor2 = new 
TransformProcessor(configString2);
-            List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
-            Assert.assertTrue(output2.size() == 0);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testKvCsvByJsonConfig() throws Exception {
+        String configString1 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
+                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
+                + "\"escapeChar\":\"\\\\\","
+                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                + "\"transformSql\":\"select ftime,extinfo from source where 
extinfo='ok'\"}";
+        // case1
+        TransformProcessor processor1 = new TransformProcessor(configString1);
+        List<String> output1 = processor1.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertTrue(output1.size() == 1);
+        Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+        // case2
+        String configString2 = 
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
+                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
+                + "\"escapeChar\":\"\\\\\","
+                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                + "\"transformSql\":\"select ftime,extinfo from source where 
extinfo!='ok'\"}";
+        TransformProcessor processor2 = new TransformProcessor(configString2);
+        List<String> output2 = processor2.transform("ftime=2024-04-28 
00:00:00&extinfo=ok", new HashMap<>());
+        Assert.assertTrue(output2.size() == 0);
     }
 
     @Test
-    public void testPb2Csv() {
-        try {
-            List<FieldInfo> fields = this.getTestFieldList();
-            String transformBase64 = this.getPbTestDescription();
-            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
-            TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
-            // case1
-            TransformProcessor processor = new TransformProcessor(config);
-            byte[] srcBytes = this.getPbTestData();
-            List<String> output = processor.transform(srcBytes, new 
HashMap<>());
-            Assert.assertTrue(output.size() == 2);
-            Assert.assertEquals(output.get(0), 
"sid|1|1713243918000|msgValue4");
-            Assert.assertEquals(output.get(1), 
"sid|1|1713243918002|msgValue42");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testPb2Csv() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        String transformBase64 = this.getPbTestDescription();
+        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, 
"SdkDataRequest", "msgs");
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+        TransformConfig config = new TransformConfig(pbSource, csvSink, 
transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        byte[] srcBytes = this.getPbTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertTrue(output.size() == 2);
+        Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4");
+        Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42");
     }
 
     private List<FieldInfo> getTestFieldList() {
@@ -250,90 +261,73 @@ public class TestTransformProcessor {
     }
 
     @Test
-    public void testPb2CsvForOne() {
-        try {
-            List<FieldInfo> fields = this.getTestFieldList();
-            String transformBase64 = this.getPbTestDescription();
-            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null);
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql =
-                    "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
-            TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
-            // case1
-            TransformProcessor processor = new TransformProcessor(config);
-            byte[] srcBytes = this.getPbTestData();
-            List<String> output = processor.transform(srcBytes, new HashMap<>());
-            Assert.assertTrue(output.size() == 1);
-            Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testPb2CsvForOne() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        String transformBase64 = this.getPbTestDescription();
+        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null);
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+        TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        byte[] srcBytes = this.getPbTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertTrue(output.size() == 1);
+        Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4");
     }
 
     @Test
-    public void testPb2CsvForAdd() {
-        try {
-            List<FieldInfo> fields = this.getTestFieldList();
-            String transformBase64 = this.getPbTestDescription();
-            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null);
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select $root.sid,"
-                    + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,"
-                    + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)"
-                    + "*$root.packageID field3,"
-                    + "$root.msgs(0).msg field4 from source "
-                    + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
-                    + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
-            TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
-            // case1
-            TransformProcessor processor = new TransformProcessor(config);
-            byte[] srcBytes = this.getPbTestData();
-            List<String> output = processor.transform(srcBytes, new HashMap<>());
-            Assert.assertTrue(output.size() == 1);
-            Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testPb2CsvForAdd() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        String transformBase64 = this.getPbTestDescription();
+        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null);
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select $root.sid,"
+                + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,"
+                + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)"
+                + "*$root.packageID field3,"
+                + "$root.msgs(0).msg field4 from source "
+                + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
+                + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
+        TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        byte[] srcBytes = this.getPbTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertTrue(output.size() == 1);
+        Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4");
     }
 
     @Test
-    public void testPb2CsvForConcat() {
-        try {
-            List<FieldInfo> fields = this.getTestFieldList();
-            String transformBase64 = this.getPbTestDescription();
-            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select $root.sid,$root.packageID,$child.msgTime,"
-                    + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg from source";
-            TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
-            // case1
-            TransformProcessor processor = new TransformProcessor(config);
-            byte[] srcBytes = this.getPbTestData();
-            List<String> output = processor.transform(srcBytes, new HashMap<>());
-            Assert.assertTrue(output.size() == 2);
-            Assert.assertEquals(output.get(0), "sid|1|1713243918000|sid11713243918000msgValue4");
-            Assert.assertEquals(output.get(1), "sid|1|1713243918002|sid11713243918002msgValue42");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testPb2CsvForConcat() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        String transformBase64 = this.getPbTestDescription();
+        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select $root.sid,$root.packageID,$child.msgTime,"
+                + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg from source";
+        TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        byte[] srcBytes = this.getPbTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertTrue(output.size() == 2);
+        Assert.assertEquals(output.get(0), "sid|1|1713243918000|sid11713243918000msgValue4");
+        Assert.assertEquals(output.get(1), "sid|1|1713243918002|sid11713243918002msgValue42");
     }
 
     @Test
-    public void testPb2CsvForNow() {
-        try {
-            List<FieldInfo> fields = this.getTestFieldList();
-            String transformBase64 = this.getPbTestDescription();
-            SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
-            SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
-            String transformSql = "select now() from source";
-            TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
-            // case1
-            TransformProcessor processor = new TransformProcessor(config);
-            byte[] srcBytes = this.getPbTestData();
-            List<String> output = processor.transform(srcBytes, new HashMap<>());
-            Assert.assertTrue(output.size() == 2);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void testPb2CsvForNow() throws Exception {
+        List<FieldInfo> fields = this.getTestFieldList();
+        String transformBase64 = this.getPbTestDescription();
+        SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String transformSql = "select now() from source";
+        TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
+        // case1
+        TransformProcessor processor = new TransformProcessor(config);
+        byte[] srcBytes = this.getPbTestData();
+        List<String> output = processor.transform(srcBytes, new HashMap<>());
+        Assert.assertTrue(output.size() == 2);
     }
 }


Reply via email to