This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e40550c30a [INLONG-10154][SDK] Support to transform CSV/KV data to CSV/KV data without field list configuration (#10155) e40550c30a is described below commit e40550c30a27939b55894b1b59b223b5e28842dd Author: 卢春亮 <luchunli...@apache.org> AuthorDate: Wed May 8 21:31:54 2024 +0800 [INLONG-10154][SDK] Support to transform CSV/KV data to CSV/KV data without field list configuration (#10155) --- .../sdk/transform/decode/CsvSourceDecoder.java | 2 +- .../sdk/transform/encode/CsvSinkEncoder.java | 29 +- .../sdk/transform/encode/DefaultSinkData.java | 15 +- .../inlong/sdk/transform/encode/KvSinkEncoder.java | 18 +- .../inlong/sdk/transform/encode/SinkData.java | 6 +- .../inlong/sdk/transform/pojo/CsvSinkInfo.java | 7 +- .../inlong/sdk/transform/pojo/CsvSourceInfo.java | 7 +- .../inlong/sdk/transform/pojo/KvSinkInfo.java | 7 +- .../inlong/sdk/transform/pojo/KvSourceInfo.java | 7 +- .../sdk/transform/process/TransformProcessor.java | 4 +- .../transform/process/operator/OperatorTools.java | 6 + .../transform/process/TestTransformProcessor.java | 434 ++++++++++----------- 12 files changed, 289 insertions(+), 253 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java index 22e3b9ba13..14a505d422 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java @@ -68,7 +68,7 @@ public class CsvSourceDecoder implements SourceDecoder { if (fields == null || fields.size() == 0) { for (int j = 0; j < fieldValues.length; j++) { String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 1); - sourceData.putField(fieldName, fieldValues[i]); + sourceData.putField(fieldName, fieldValues[j]); } continue; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java index a991d137b9..4b963c10fb 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -58,18 +58,27 @@ public class CsvSinkEncoder implements SinkEncoder { */ @Override public String encode(SinkData sinkData) { - if (fields == null || fields.size() == 0) { - return ""; - } builder.delete(0, builder.length()); - if (escapeChar == null) { - fields.forEach(v -> builder.append(sinkData.getField(v.getName())).append(delimiter)); + if (fields == null || fields.size() == 0) { + if (escapeChar == null) { + sinkData.keyList().forEach(k -> builder.append(sinkData.getField(k)).append(delimiter)); + } else { + for (String fieldName : sinkData.keyList()) { + String fieldValue = sinkData.getField(fieldName); + EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + builder.append(delimiter); + } + } } else { - for (FieldInfo field : fields) { - String fieldName = field.getName(); - String fieldValue = sinkData.getField(fieldName); - EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); - builder.append(delimiter); + if (escapeChar == null) { + fields.forEach(v -> builder.append(sinkData.getField(v.getName())).append(delimiter)); + } else { + for (FieldInfo field : fields) { + String fieldName = field.getName(); + String fieldValue = sinkData.getField(fieldName); + EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + builder.append(delimiter); + } } } return builder.substring(0, builder.length() - 1); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java index 2e1c3bea4b..0ffefeea22 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java @@ -17,9 +17,10 @@ package org.apache.inlong.sdk.transform.encode; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Set; /** * DefaultSinkData @@ -27,15 +28,17 @@ import java.util.Set; */ public class DefaultSinkData implements SinkData { + private List<String> keyList = new ArrayList<>(); private Map<String, String> currentRow = new HashMap<>(); /** - * putField + * addField * @param fieldName * @param fieldValue */ @Override - public void putField(String fieldName, String fieldValue) { + public void addField(String fieldName, String fieldValue) { + this.keyList.add(fieldName); this.currentRow.put(fieldName, fieldValue); } @@ -50,11 +53,11 @@ public class DefaultSinkData implements SinkData { } /** - * keySet + * keyList * @return */ @Override - public Set<String> keySet() { - return this.currentRow.keySet(); + public List<String> keyList() { + return this.keyList; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java index 56f35ec319..503914e80a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java @@ -50,14 +50,18 @@ public class KvSinkEncoder implements SinkEncoder { */ @Override public String encode(SinkData sinkData) { - if (fields == null || fields.size() == 0) { - return ""; - } builder.delete(0, builder.length()); - for (FieldInfo field : fields) { - String fieldName = field.getName(); - String fieldValue = sinkData.getField(fieldName); - builder.append(fieldName).append('=').append(fieldValue).append('&'); + if (fields == null || fields.size() == 0) { + for (String fieldName : sinkData.keyList()) { + String fieldValue = sinkData.getField(fieldName); + builder.append(fieldName).append('=').append(fieldValue).append('&'); + } + } else { + for (FieldInfo field : fields) { + String fieldName = field.getName(); + String fieldValue = sinkData.getField(fieldName); + builder.append(fieldName).append('=').append(fieldValue).append('&'); + } } return builder.substring(0, builder.length() - 1); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java index 037df2dfcf..1ad0c38c68 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java @@ -17,7 +17,7 @@ package org.apache.inlong.sdk.transform.encode; -import java.util.Set; +import java.util.List; /** * SinkData @@ -25,9 +25,9 @@ import java.util.Set; */ public interface SinkData { - void putField(String fieldName, String fieldValue); + void addField(String fieldName, String fieldValue); String getField(String fieldName); - Set<String> keySet(); + List<String> keyList(); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java index 88dd5bf36c..555f6261f9 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.ArrayList; import java.util.List; /** @@ -42,7 +43,11 @@ public class CsvSinkInfo extends SinkInfo { super(SourceInfo.CSV, charset); this.delimiter = delimiter; this.escapeChar = escapeChar; - this.fields = fields; + if (fields != null) { + this.fields = fields; + } else { + this.fields = new ArrayList<>(); + } } /** diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java index 27a46bcb19..29d2bd8f2e 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.ArrayList; import java.util.List; /** @@ -42,7 +43,11 @@ public class CsvSourceInfo extends SourceInfo { super(charset); this.delimiter = delimiter; this.escapeChar = escapeChar; - this.fields = fields; + if (fields != null) { + this.fields = fields; + } else { + this.fields = new ArrayList<>(); + } } /** diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java index 11c3550f42..8a63e0c66c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.ArrayList; import java.util.List; /** @@ -36,7 +37,11 @@ public class KvSinkInfo extends SinkInfo { @JsonProperty("charset") String charset, @JsonProperty("fields") List<FieldInfo> fields) { super(SourceInfo.KV, charset); - this.fields = fields; + if (fields != null) { + this.fields = fields; + } else { + this.fields = new ArrayList<>(); + } } /** diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java index 6d92d44920..e627367548 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.ArrayList; import java.util.List; /** @@ -36,7 +37,11 @@ public class KvSourceInfo extends SourceInfo { @JsonProperty("charset") String charset, @JsonProperty("fields") List<FieldInfo> fields) { super(charset); - this.fields = fields; + if (fields != null) { + this.fields = fields; + } else { + this.fields = new ArrayList<>(); + } } /** diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index c979ef71c4..c3ee9270cc 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -172,10 +172,10 @@ public class TransformProcessor { String fieldName = entry.getKey(); try { Object fieldValue = entry.getValue().parse(sourceData, i); - sinkData.putField(fieldName, String.valueOf(fieldValue)); + sinkData.addField(fieldName, String.valueOf(fieldValue)); } catch (Throwable t) { LOG.error(t.getMessage(), t); - sinkData.putField(fieldName, ""); + sinkData.addField(fieldName, ""); } } sinkDatas.add(this.encoder.encode(sinkData)); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 7f73e11d06..14e7c20d4b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -144,6 +144,12 @@ public class OperatorTools { */ @SuppressWarnings("rawtypes") public static int compareValue(Comparable left, Comparable right) { + if (left == null) { + return right == null ? 0 : -1; + } + if (right == null) { + return 1; + } if (left instanceof String) { if (right instanceof String) { return ObjectUtils.compare(left, right); diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 7e46e28105..b76dde6f7f 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -43,165 +43,176 @@ import java.util.List; public class TestTransformProcessor { @Test - public void testCsv2Kv() { - try { - List<FieldInfo> fields = new ArrayList<>(); - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - fields.add(ftime); - FieldInfo extinfo = new FieldInfo(); - extinfo.setName("extinfo"); - fields.add(extinfo); - SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields); - SinkInfo kvSink = new KvSinkInfo("UTF-8", fields); - String transformSql = "select ftime,extinfo from source where extinfo='ok'"; - TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); - // case1 - TransformProcessor processor1 = new TransformProcessor(config); - List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); - Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); - // case2 - config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); - TransformProcessor processor2 = new TransformProcessor(config); - List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); - } catch (Exception e) { - e.printStackTrace(); - } + public void testCsv2Kv() throws Exception { + List<FieldInfo> fields = new ArrayList<>(); + FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + fields.add(ftime); + FieldInfo extinfo = new FieldInfo(); + extinfo.setName("extinfo"); + fields.add(extinfo); + SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields); + SinkInfo kvSink = new KvSinkInfo("UTF-8", fields); + String transformSql = "select ftime,extinfo from source where extinfo='ok'"; + TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + // case1 + TransformProcessor processor1 = new TransformProcessor(config); + List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); + // case2 + config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); + TransformProcessor processor2 = new TransformProcessor(config); + List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); + } + + @Test + public void testCsv2KvNoField() throws Exception { + SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null); + SinkInfo kvSink = new KvSinkInfo("UTF-8", null); + String transformSql = "select $1 ftime,$2 extinfo from source where $2='ok'"; + TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + // case1 + TransformProcessor processor1 = new TransformProcessor(config); + List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); + // case2 + config.setTransformSql("select $1 ftime,$2 extinfo from source where $2!='ok'"); + TransformProcessor processor2 = new TransformProcessor(config); + List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); + } + + @Test + public void testKv2Csv() throws Exception { + List<FieldInfo> fields = new ArrayList<>(); + FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + fields.add(ftime); + FieldInfo extinfo = new FieldInfo(); + extinfo.setName("extinfo"); + fields.add(extinfo); + SourceInfo kvSource = new KvSourceInfo("UTF-8", fields); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select ftime,extinfo from source where extinfo='ok'"; + TransformConfig config = new TransformConfig(kvSource, csvSink, transformSql); + // case1 + TransformProcessor processor1 = new TransformProcessor(config); + List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); + TransformProcessor processor2 = new TransformProcessor(config); + List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); } @Test - public void testKv2Csv() { - try { - List<FieldInfo> fields = new ArrayList<>(); - FieldInfo ftime = new FieldInfo(); - ftime.setName("ftime"); - fields.add(ftime); - FieldInfo extinfo = new FieldInfo(); - extinfo.setName("extinfo"); - fields.add(extinfo); - SourceInfo kvSource = new KvSourceInfo("UTF-8", fields); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select ftime,extinfo from source where extinfo='ok'"; - TransformConfig config = new TransformConfig(kvSource, csvSink, transformSql); - // case1 - TransformProcessor processor1 = new TransformProcessor(config); - List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); - Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); - // case2 - config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); - TransformProcessor processor2 = new TransformProcessor(config); - List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); - } catch (Exception e) { - e.printStackTrace(); - } + public void testKv2CsvNoField() throws Exception { + SourceInfo kvSource = new KvSourceInfo("UTF-8", null); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null); + String transformSql = "select ftime,extinfo from source where extinfo='ok'"; + TransformConfig config = new TransformConfig(kvSource, csvSink, transformSql); + // case1 + TransformProcessor processor1 = new TransformProcessor(config); + List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); + TransformProcessor processor2 = new TransformProcessor(config); + List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); } @Test - public void testJson2Csv() { - try { - List<FieldInfo> fields = this.getTestFieldList(); - SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; - TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); - // case1 - TransformProcessor processor = new TransformProcessor(config); - String srcString = "{\n" - + " \"sid\":\"value1\",\n" - + " \"packageID\":\"value2\",\n" - + " \"msgs\":[\n" - + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n" - + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n" - + " ]\n" - + "}"; - List<String> output = processor.transform(srcString, new HashMap<>()); - Assert.assertTrue(output.size() == 2); - Assert.assertEquals(output.get(0), "value1|value2|1713243918000|value4"); - Assert.assertEquals(output.get(1), "value1|value2|1713243918000|v4"); - } catch (Exception e) { - e.printStackTrace(); - } + public void testJson2Csv() throws Exception { + List<FieldInfo> fields = this.getTestFieldList(); + SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs"); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; + TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + String srcString = "{\n" + + " \"sid\":\"value1\",\n" + + " \"packageID\":\"value2\",\n" + + " \"msgs\":[\n" + + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n" + + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n" + + " ]\n" + + "}"; + List<String> output = processor.transform(srcString, new HashMap<>()); + Assert.assertTrue(output.size() == 2); + Assert.assertEquals(output.get(0), "value1|value2|1713243918000|value4"); + Assert.assertEquals(output.get(1), "value1|value2|1713243918000|v4"); } @Test - public void testJson2CsvForOne() { - try { - List<FieldInfo> fields = this.getTestFieldList(); - SourceInfo jsonSource = new JsonSourceInfo("UTF-8", ""); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = - "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; - TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); - // case1 - TransformProcessor processor = new TransformProcessor(config); - String srcString = "{\n" - + " \"sid\":\"value1\",\n" - + " \"packageID\":\"value2\",\n" - + " \"msgs\":[\n" - + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n" - + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n" - + " ]\n" - + "}"; - List<String> output = processor.transform(srcString, new HashMap<>()); - Assert.assertTrue(output.size() == 1); - Assert.assertEquals(output.get(0), "value1|value2|1713243918000|value4"); - } catch (Exception e) { - e.printStackTrace(); - } + public void testJson2CsvForOne() throws Exception { + List<FieldInfo> fields = this.getTestFieldList(); + SourceInfo jsonSource = new JsonSourceInfo("UTF-8", ""); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; + TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + String srcString = "{\n" + + " \"sid\":\"value1\",\n" + + " \"packageID\":\"value2\",\n" + + " \"msgs\":[\n" + + " {\"msg\":\"value4\",\"msgTime\":1713243918000},\n" + + " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n" + + " ]\n" + + "}"; + List<String> output = processor.transform(srcString, new HashMap<>()); + Assert.assertTrue(output.size() == 1); + Assert.assertEquals(output.get(0), "value1|value2|1713243918000|value4"); } @Test - public void testKvCsvByJsonConfig() { - try { - String configString1 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," - + "\"escapeChar\":\"\\\\\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"transformSql\":\"select ftime,extinfo from source where extinfo='ok'\"}"; - // case1 - TransformProcessor processor1 = new TransformProcessor(configString1); - List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output1.size() == 1); - Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); - // case2 - String configString2 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," - + "\"escapeChar\":\"\\\\\"," - + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," - + "\"transformSql\":\"select ftime,extinfo from source where extinfo!='ok'\"}"; - TransformProcessor processor2 = new TransformProcessor(configString2); - List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); - Assert.assertTrue(output2.size() == 0); - } catch (Exception e) { - e.printStackTrace(); - } + public void testKvCsvByJsonConfig() throws Exception { + String configString1 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," + + "\"escapeChar\":\"\\\\\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + "\"transformSql\":\"select ftime,extinfo from source where extinfo='ok'\"}"; + // case1 + TransformProcessor processor1 = new TransformProcessor(configString1); + List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + String configString2 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," + + "\"escapeChar\":\"\\\\\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + "\"transformSql\":\"select ftime,extinfo from source where extinfo!='ok'\"}"; + TransformProcessor processor2 = new TransformProcessor(configString2); + List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); } @Test - public void testPb2Csv() { - try { - List<FieldInfo> fields = this.getTestFieldList(); - String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); - // case1 - TransformProcessor processor = new TransformProcessor(config); - byte[] srcBytes = this.getPbTestData(); - List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 2); - Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4"); - Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42"); - } catch (Exception e) { - e.printStackTrace(); - } + public void testPb2Csv() throws Exception { + List<FieldInfo> fields = this.getTestFieldList(); + String transformBase64 = this.getPbTestDescription(); + SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + byte[] srcBytes = this.getPbTestData(); + List<String> output = processor.transform(srcBytes, new HashMap<>()); + Assert.assertTrue(output.size() == 2); + Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4"); + Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42"); } private List<FieldInfo> getTestFieldList() { @@ -250,90 +261,73 @@ public class TestTransformProcessor { } @Test - public void testPb2CsvForOne() { - try { - List<FieldInfo> fields = this.getTestFieldList(); - String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = - "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); - // case1 - TransformProcessor processor = new TransformProcessor(config); - byte[] srcBytes = this.getPbTestData(); - List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 1); - Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4"); - } catch (Exception e) { - e.printStackTrace(); - } + public void testPb2CsvForOne() throws Exception { + List<FieldInfo> fields = this.getTestFieldList(); + String transformBase64 = this.getPbTestDescription(); + SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + byte[] srcBytes = this.getPbTestData(); + List<String> output = processor.transform(srcBytes, new HashMap<>()); + Assert.assertTrue(output.size() == 1); + Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4"); } @Test - public void testPb2CsvForAdd() { - try { - List<FieldInfo> fields = this.getTestFieldList(); - String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select $root.sid," - + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2," - + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)" - + "*$root.packageID field3," - + "$root.msgs(0).msg field4 from source " - + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime" - + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); - // case1 - TransformProcessor processor = new TransformProcessor(config); - byte[] srcBytes = this.getPbTestData(); - List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 1); - Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4"); - } catch (Exception e) { - e.printStackTrace(); - } + public void testPb2CsvForAdd() throws Exception { + List<FieldInfo> fields = this.getTestFieldList(); + String transformBase64 = this.getPbTestDescription(); + SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid," + + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2," + + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)" + + "*$root.packageID field3," + + "$root.msgs(0).msg field4 from source " + + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime" + + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + byte[] srcBytes = this.getPbTestData(); + List<String> output = processor.transform(srcBytes, new HashMap<>()); + Assert.assertTrue(output.size() == 1); + Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4"); } @Test - public void testPb2CsvForConcat() { - try { - List<FieldInfo> fields = this.getTestFieldList(); - String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select $root.sid,$root.packageID,$child.msgTime," - + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); - // case1 - TransformProcessor processor = new TransformProcessor(config); - byte[] srcBytes = this.getPbTestData(); - List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 2); - Assert.assertEquals(output.get(0), "sid|1|1713243918000|sid11713243918000msgValue4"); - Assert.assertEquals(output.get(1), "sid|1|1713243918002|sid11713243918002msgValue42"); - } catch (Exception e) { - e.printStackTrace(); - } + public void testPb2CsvForConcat() throws Exception { + List<FieldInfo> fields = this.getTestFieldList(); + String transformBase64 = this.getPbTestDescription(); + SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid,$root.packageID,$child.msgTime," + + "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg from source"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + byte[] srcBytes = this.getPbTestData(); + List<String> output = processor.transform(srcBytes, new HashMap<>()); + Assert.assertTrue(output.size() == 2); + Assert.assertEquals(output.get(0), "sid|1|1713243918000|sid11713243918000msgValue4"); + Assert.assertEquals(output.get(1), "sid|1|1713243918002|sid11713243918002msgValue42"); } @Test - public void testPb2CsvForNow() { - try { - List<FieldInfo> fields = this.getTestFieldList(); - String transformBase64 = this.getPbTestDescription(); - SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); - SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); - String transformSql = "select now() from source"; - TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); - // case1 - TransformProcessor processor = new TransformProcessor(config); - byte[] srcBytes = this.getPbTestData(); - List<String> output = processor.transform(srcBytes, new HashMap<>()); - Assert.assertTrue(output.size() == 2); - } catch (Exception e) { - e.printStackTrace(); - } + public void testPb2CsvForNow() throws Exception { + List<FieldInfo> fields = this.getTestFieldList(); + String transformBase64 = this.getPbTestDescription(); + SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select now() from source"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + byte[] srcBytes = this.getPbTestData(); + List<String> output = processor.transform(srcBytes, new HashMap<>()); + Assert.assertTrue(output.size() == 2); } }