This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ec1740e3ca [INLONG-10093][SDK] Support to transform from CSV protocol to CSV/KV protocol by single SQL (#10097) ec1740e3ca is described below commit ec1740e3ca269956b60024e098cba7a81c603be2 Author: 卢春亮 <luchunli...@apache.org> AuthorDate: Mon Apr 29 18:21:01 2024 +0800 [INLONG-10093][SDK] Support to transform from CSV protocol to CSV/KV protocol by single SQL (#10097) * [INLONG-10093][SDK] Support to transform from CSV protocol to CSV/KV protocol by single SQL --- inlong-sdk/pom.xml | 1 + inlong-sdk/transform-sdk/pom.xml | 102 ++++ .../inlong/sdk/transform/decode/CsvSourceData.java | 60 +++ .../sdk/transform/decode/CsvSourceDecoder.java | 88 ++++ .../inlong/sdk/transform/decode/KvSourceData.java | 59 +++ .../sdk/transform/decode/KvSourceDecoder.java | 74 +++ .../inlong/sdk/transform/decode/KvUtils.java | 579 +++++++++++++++++++++ .../inlong/sdk/transform/decode/SourceData.java | 30 ++ .../inlong/sdk/transform/decode/SourceDecoder.java | 30 ++ .../inlong/sdk/transform/decode/SplitUtils.java | 164 ++++++ .../sdk/transform/encode/CsvSinkEncoder.java | 77 +++ .../sdk/transform/encode/DefaultSinkData.java | 60 +++ .../inlong/sdk/transform/encode/EscapeUtils.java | 83 +++ .../inlong/sdk/transform/encode/KvSinkEncoder.java | 64 +++ .../inlong/sdk/transform/encode/SinkData.java | 33 ++ .../inlong/sdk/transform/encode/SinkEncoder.java | 26 + .../inlong/sdk/transform/pojo/CsvSinkInfo.java | 99 ++++ .../inlong/sdk/transform/pojo/CsvSourceInfo.java | 99 ++++ .../inlong/sdk/transform/pojo/FieldInfo.java | 29 ++ .../inlong/sdk/transform/pojo/JsonSourceInfo.java | 56 ++ .../inlong/sdk/transform/pojo/KvSinkInfo.java | 58 +++ .../inlong/sdk/transform/pojo/KvSourceInfo.java | 59 +++ .../inlong/sdk/transform/pojo/PbSourceInfo.java | 78 +++ .../inlong/sdk/transform/pojo/ProtocolType.java | 61 +++ .../apache/inlong/sdk/transform/pojo/SinkInfo.java | 87 ++++ .../inlong/sdk/transform/pojo/SourceInfo.java | 69 +++ 
.../inlong/sdk/transform/pojo/TransformConfig.java | 96 ++++ .../sdk/transform/process/TransformProcessor.java | 160 ++++++ .../transform/process/operator/AndOperator.java | 49 ++ .../process/operator/EqualsToOperator.java | 51 ++ .../process/operator/ExpressionOperator.java | 28 + .../operator/GreaterThanEqualsOperator.java | 53 ++ .../process/operator/GreaterThanOperator.java | 53 ++ .../process/operator/MinorThanEqualsOperator.java | 53 ++ .../process/operator/MinorThanOperator.java | 53 ++ .../process/operator/NotEqualsToOperator.java | 51 ++ .../transform/process/operator/NotOperator.java | 47 ++ .../transform/process/operator/OperatorTools.java | 81 +++ .../sdk/transform/process/operator/OrOperator.java | 49 ++ .../process/operator/ParenthesisOperator.java | 47 ++ .../sdk/transform/process/parser/ColumnParser.java | 47 ++ .../sdk/transform/process/parser/LongParser.java | 46 ++ .../sdk/transform/process/parser/StringParser.java | 46 ++ .../sdk/transform/process/parser/ValueParser.java | 29 ++ .../sdk/transform/pojo/TestTransformConfig.java | 113 ++++ .../transform/process/TestTransformProcessor.java | 128 +++++ 46 files changed, 3505 insertions(+) diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml index f9dd979680..11c51d1aec 100644 --- a/inlong-sdk/pom.xml +++ b/inlong-sdk/pom.xml @@ -33,6 +33,7 @@ <module>sdk-common</module> <module>sort-sdk</module> <module>dataproxy-sdk</module> + <module>transform-sdk</module> </modules> <properties> diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml new file mode 100644 index 0000000000..aeb3e100b0 --- /dev/null +++ b/inlong-sdk/transform-sdk/pom.xml @@ -0,0 +1,102 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>inlong-sdk</artifactId> + <version>1.13.0-SNAPSHOT</version> + </parent> + <artifactId>transform-sdk</artifactId> + <version>1.13.0-SNAPSHOT</version> + + <name>Apache InLong - Transform SDK</name> + + <properties> + <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>inlong-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sdk-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <version>3.1.0</version> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.2.0</version> + </plugin> + <plugin> + 
<artifactId>maven-compiler-plugin</artifactId> + <version>${plugin.compile.version}</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${plugin.surefire.version}</version> + </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>3.0.2</version> + </plugin> + <plugin> + <artifactId>maven-install-plugin</artifactId> + <version>2.5.2</version> + </plugin> + <plugin> + <artifactId>maven-deploy-plugin</artifactId> + <version>${plugin.deploy.version}</version> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.7.1</version> + </plugin> + <plugin> + <artifactId>maven-project-info-reports-plugin</artifactId> + <version>3.0.0</version> + </plugin> + </plugins> + </pluginManagement> + </build> + +</project> diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java new file mode 100644 index 0000000000..77f173f1a6 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * CsvSourceData + * + */ +public class CsvSourceData implements SourceData { + + private List<Map<String, String>> rows = new ArrayList<>(); + + private Map<String, String> currentRow; + + public CsvSourceData() { + } + + public void putField(String fieldName, String fieldValue) { + this.currentRow.put(fieldName, fieldValue); + } + + public void addRow() { + this.currentRow = new HashMap<>(); + rows.add(currentRow); + } + + @Override + public int getRowCount() { + return this.rows.size(); + } + + @Override + public String getField(int rowNum, String fieldName) { + if (rowNum > this.rows.size()) { + return null; + } + Map<String, String> targetRow = this.rows.get(rowNum - 1); + return targetRow.get(fieldName); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java new file mode 100644 index 0000000000..22e3b9ba13 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * CsvSourceDecoder + * + */ +public class CsvSourceDecoder implements SourceDecoder { + + protected CsvSourceInfo sourceInfo; + private Charset srcCharset = Charset.defaultCharset(); + private Character delimiter = '|'; + private Character escapeChar = null; + private List<FieldInfo> fields; + + public CsvSourceDecoder(CsvSourceInfo sourceInfo) { + this.sourceInfo = sourceInfo; + if (!StringUtils.isBlank(sourceInfo.getDelimiter())) { + this.delimiter = sourceInfo.getDelimiter().charAt(0); + } + if (!StringUtils.isBlank(sourceInfo.getEscapeChar())) { + this.escapeChar = sourceInfo.getEscapeChar().charAt(0); + } + if (!StringUtils.isBlank(sourceInfo.getCharset())) { + this.srcCharset = Charset.forName(sourceInfo.getCharset()); + } + this.fields = sourceInfo.getFields(); + } + + @Override + public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) { + String srcString = new String(srcBytes, srcCharset); + return this.decode(srcString, extParams); + } + + @Override + public SourceData decode(String srcString, Map<String, Object> extParams) { + String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, escapeChar, '\"', '\n', true); + CsvSourceData sourceData = new CsvSourceData(); + for (int i = 0; i < rowValues.length; i++) { 
+ String[] fieldValues = rowValues[i]; + sourceData.addRow(); + if (fields == null || fields.size() == 0) { + for (int j = 0; j < fieldValues.length; j++) { + String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 1); + sourceData.putField(fieldName, fieldValues[i]); + } + continue; + } + int fieldIndex = 0; + for (FieldInfo field : fields) { + String fieldName = field.getName(); + String fieldValue = null; + if (fieldIndex < fieldValues.length) { + fieldValue = fieldValues[fieldIndex]; + } + sourceData.putField(fieldName, fieldValue); + fieldIndex++; + } + } + return sourceData; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java new file mode 100644 index 0000000000..3e3f600197 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.decode; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * KvSourceData + */ +public class KvSourceData implements SourceData { + + private List<Map<String, String>> rows = new ArrayList<>(); + + private Map<String, String> currentRow; + + public KvSourceData() { + } + + public void putField(String fieldName, String fieldValue) { + this.currentRow.put(fieldName, fieldValue); + } + + public void addRow() { + this.currentRow = new HashMap<>(); + rows.add(currentRow); + } + + @Override + public int getRowCount() { + return this.rows.size(); + } + + @Override + public String getField(int rowNum, String fieldName) { + if (rowNum > this.rows.size()) { + return null; + } + Map<String, String> targetRow = this.rows.get(rowNum - 1); + return targetRow.get(fieldName); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java new file mode 100644 index 0000000000..03b40c9f1c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * KvSourceDecoder + * + */ +public class KvSourceDecoder implements SourceDecoder { + + protected KvSourceInfo sourceInfo; + private Charset srcCharset = Charset.defaultCharset(); + private List<FieldInfo> fields; + + public KvSourceDecoder(KvSourceInfo sourceInfo) { + this.sourceInfo = sourceInfo; + if (!StringUtils.isBlank(sourceInfo.getCharset())) { + this.srcCharset = Charset.forName(sourceInfo.getCharset()); + } + this.fields = sourceInfo.getFields(); + } + + @Override + public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) { + String srcString = new String(srcBytes, srcCharset); + return this.decode(srcString, extParams); + } + + @Override + public SourceData decode(String srcString, Map<String, Object> extParams) { + List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, '&', '=', '\\', '\"', '\n'); + KvSourceData sourceData = new KvSourceData(); + if (fields == null || fields.size() == 0) { + for (Map<String, String> row : rowValues) { + sourceData.addRow(); + row.forEach((k, v) -> sourceData.putField(k, v)); + } + return sourceData; + } + for (Map<String, String> row : rowValues) { + sourceData.addRow(); + for (FieldInfo field : fields) { + String fieldName = field.getName(); + String fieldValue = row.get(fieldName); + sourceData.putField(fieldName, fieldValue); + } + } + return sourceData; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java new file mode 100644 
index 0000000000..60c804c48f --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.decode; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * The utilities for strings. + */ +public class KvUtils { + + private static final int STATE_NORMAL = 0; + private static final int STATE_KEY = 2; + private static final int STATE_VALUE = 4; + private static final int STATE_ESCAPING = 8; + private static final int STATE_QUOTING = 16; + + /** + * Splits the kv text. + * + * <p>Both escaping and quoting is supported. When the escape character is + * not '\0', then the next character to the escape character will be + * escaped. When the quote character is not '\0', then all characters + * between consecutive quote characters will be escaped.</p> + * + * @param text The text to be split. + * @param entryDelimiter The delimiter of entries. 
+ * @param kvDelimiter The delimiter between key and value. + * @param escapeChar The escaping character. Only valid if not '\0'. + * @param quoteChar The quoting character. + * @return The fields split from the text. + */ + public static Map<String, String> splitKv( + @Nonnull String text, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar) { + List<Map<String, String>> lines = splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar, null); + if (lines.size() == 0) { + return new HashMap<>(); + } + return lines.get(0); + } + + /** + * Splits the kv text. + * + * <p>Both escaping and quoting is supported. When the escape character is + * not '\0', then the next character to the escape character will be + * escaped. When the quote character is not '\0', then all characters + * between consecutive quote characters will be escaped.</p> + * + * @param text The text to be split. + * @param entryDelimiter The delimiter of entries. + * @param kvDelimiter The delimiter between key and value. + * @param escapeChar The escaping character. Only valid if not '\0'. + * @param quoteChar The quoting character. + * @param lineDelimiter The line delimiter character. + * @return The fields split from the text. + */ + public static List<Map<String, String>> splitKv( + @Nonnull String text, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable Character lineDelimiter) { + Map<String, String> fields = new HashMap<>(); + List<Map<String, String>> lines = new ArrayList<>(); + + StringBuilder stringBuilder = new StringBuilder(); + + String key = ""; + String value; + + int state = STATE_KEY; + + /* + * The state when entering escaping and quoting. When we exit escaping or quoting, we should restore this state. 
+ */ + int kvState = STATE_KEY; + + for (int i = 0; i < text.length(); ++i) { + char ch = text.charAt(i); + + if (ch == kvDelimiter) { + switch (state) { + case STATE_KEY: + key = stringBuilder.toString(); + stringBuilder.setLength(0); + state = STATE_VALUE; + break; + case STATE_VALUE: + // throw new IllegalArgumentException("Unexpected token " + + // ch + " at position " + i + "."); + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + default: + break; + } + } else if (ch == entryDelimiter) { + switch (state) { + case STATE_KEY: + // throw new IllegalArgumentException("Unexpected token " + + // ch + " at position " + i + "."); + key = stringBuilder.toString(); + stringBuilder.setLength(0); + fields.put(key, ""); + state = STATE_KEY; + break; + case STATE_VALUE: + value = stringBuilder.toString(); + fields.put(key, value); + + stringBuilder.setLength(0); + state = STATE_KEY; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + default: + break; + } + } else if (escapeChar != null && ch == escapeChar) { + switch (state) { + case STATE_KEY: + case STATE_VALUE: + kvState = state; + state = STATE_ESCAPING; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + default: + break; + } + } else if (quoteChar != null && ch == quoteChar) { + switch (state) { + case STATE_KEY: + case STATE_VALUE: + kvState = state; + state = STATE_QUOTING; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + state = kvState; + break; + default: + break; + } + } else if (lineDelimiter != null && ch == lineDelimiter) { + switch (state) { + case STATE_VALUE: + value = stringBuilder.toString(); + fields.put(key, value); + Map<String, String> copyFields = new 
HashMap<>(); + copyFields.putAll(fields); + lines.add(copyFields); + stringBuilder.setLength(0); + fields.clear(); + state = STATE_KEY; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = STATE_NORMAL; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + default: + break; + } + } else { + stringBuilder.append(ch); + } + } + + switch (state) { + case STATE_KEY: + // throw new IllegalArgumentException("Dangling key."); + key = stringBuilder.toString(); + stringBuilder.setLength(0); + fields.put(key, ""); + lines.add(fields); + return lines; + case STATE_VALUE: + value = stringBuilder.toString(); + fields.put(key, value); + lines.add(fields); + return lines; + case STATE_ESCAPING: + // throw new IllegalArgumentException("Not closed escaping."); + case STATE_QUOTING: + // throw new IllegalArgumentException("Not closed quoting."); + default: + // throw new IllegalStateException(); + if (kvState == STATE_VALUE) { + key = stringBuilder.toString(); + stringBuilder.setLength(0); + fields.put(key, ""); + lines.add(fields); + return lines; + // } else if (kvState == STATE_KEY) { + } else { + value = stringBuilder.toString(); + fields.put(key, value); + lines.add(fields); + return lines; + } + } + } + + /** + * Concat the given fields' keys and values. + * + * <p>Special characters in the text will be escaped or quoted if + * corresponding character is given. Otherwise, an exception will be + * thrown.</p> + * + * @param fieldKeys The keys to be concat. + * @param fieldValues The values to be concat. + * @param entryDelimiter The delimiter of entries. + * @param kvDelimiter The delimiter between key and value. + * @param escapeChar The escape character. + * @param quoteChar The quote character. + * @return The concated text of given fields. 
+ */ + public static String concatKv( + @Nonnull String[] fieldKeys, + @Nonnull String[] fieldValues, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar) { + if (fieldKeys.length != fieldValues.length) { + throw new IllegalArgumentException("The keys' number " + fieldKeys.length + + " doesn't match values' number " + fieldValues.length); + } + + Collection<Character> delimiters = Arrays.asList(entryDelimiter, kvDelimiter); + + StringBuilder stringBuilder = new StringBuilder(); + + for (int index = 0; index < fieldKeys.length; ++index) { + + encodeText( + stringBuilder, + fieldKeys[index], + delimiters, + escapeChar, + quoteChar); + + stringBuilder.append(kvDelimiter); + + encodeText( + stringBuilder, + fieldValues[index], + delimiters, + escapeChar, + quoteChar); + + if (index < fieldKeys.length - 1) { + stringBuilder.append(entryDelimiter); + } + } + + return stringBuilder.toString(); + } + + private static void encodeText( + StringBuilder stringBuilder, + String text, + Collection<Character> delimiters, + Character escapeChar, + Character quoteChar) { + for (int i = 0; i < text.length(); ++i) { + char ch = text.charAt(i); + + if (delimiters.contains(ch)) { + if (escapeChar != null) { + stringBuilder.append(escapeChar); + stringBuilder.append(ch); + } else if (quoteChar != null) { + stringBuilder.append(quoteChar); + stringBuilder.append(ch); + stringBuilder.append(quoteChar); + } else { + throw new IllegalArgumentException("There is a delimiter in the text, " + + "but neither escape nor quote character is specified."); + } + } else if (escapeChar != null && ch == escapeChar) { + stringBuilder.append(escapeChar); + stringBuilder.append(ch); + } else if (quoteChar != null && ch == quoteChar) { + if (escapeChar != null) { + stringBuilder.append(escapeChar); + stringBuilder.append(ch); + } else { + throw new IllegalArgumentException("There is a quote character in the text, " + + 
"but escape character is not specified."); + } + } else { + stringBuilder.append(ch); + } + } + } + + /** + * Splits a single line of csv text. + * + * @see KvUtils#splitCsv(String, Character, Character, Character, Character, boolean) + * + * @param text The text to be split. + * @param delimiter The delimiter of fields. + * @param escapeChar The escaping character. Only valid if not '\0'. + * @param quoteChar The quoting character. + * @return The split array content. + */ + public static String[] splitCsv( + @Nonnull String text, + @Nonnull Character delimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar) { + String[][] splitResult = splitCsv(text, delimiter, escapeChar, quoteChar, null); + if (splitResult.length == 0) { + return new String[0]; + } + return splitResult[0]; + } + + /** + * @see KvUtils#splitCsv(String, Character, Character, Character, Character, boolean) + * + * @param text The text to be split. + * @param delimiter The delimiter of fields. + * @param escapeChar The escaping character. Only valid if not '\0'. + * @param quoteChar The quoting character. + * @param lineDelimiter The delimiter between lines, e.g. '\n'. + * @return The split value. + */ + public static String[][] splitCsv( + @Nonnull String text, + @Nonnull Character delimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable Character lineDelimiter) { + return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, false); + } + + /** + * Splits the csv text, which may contains multiple lines of data. + * + * <p>Both escaping and quoting is supported. When the escape character is + * not '\0', then the next character to the escape character will be + * escaped. When the quote character is not '\0', then all characters + * between consecutive quote characters will be escaped.</p> + * + * @param text The text to be split. + * @param delimiter The delimiter of fields. + * @param escapeChar The escaping character. 
Only valid if not '\0'. + * @param quoteChar The quoting character. + * @param lineDelimiter The delimiter between lines, e.g. '\n'. + * @param deleteHeadDelimiter If true and the leading character of a line + * is a delimiter, it will be ignored. + * @return A 2-D String array representing the parsed data, where the 1st + * dimension is row and the 2nd dimension is column. + */ + public static String[][] splitCsv( + @Nonnull String text, + @Nonnull Character delimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable Character lineDelimiter, + boolean deleteHeadDelimiter) { + List<String[]> lines = new ArrayList<>(); + List<String> fields = new ArrayList<>(); + + StringBuilder stringBuilder = new StringBuilder(); + int state = STATE_NORMAL; + + for (int i = 0; i < text.length(); ++i) { + char ch = text.charAt(i); + + if (ch == delimiter) { + switch (state) { + case STATE_NORMAL: + if (deleteHeadDelimiter && fields.isEmpty() + && stringBuilder.length() == 0) { + break; + } + String field = stringBuilder.toString(); + fields.add(field); + stringBuilder.setLength(0); + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = STATE_NORMAL; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + default: + break; + } + } else if (escapeChar != null && ch == escapeChar) { + switch (state) { + case STATE_NORMAL: + state = STATE_ESCAPING; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = STATE_NORMAL; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + default: + break; + } + } else if (quoteChar != null && ch == quoteChar) { + switch (state) { + case STATE_NORMAL: + state = STATE_QUOTING; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = STATE_NORMAL; + break; + case STATE_QUOTING: + state = STATE_NORMAL; + break; + default: + break; + } + } else if (lineDelimiter != null && ch == lineDelimiter) { + switch (state) { + case STATE_NORMAL: + String field = 
stringBuilder.toString(); + fields.add(field); + lines.add(fields.toArray(new String[0])); + + stringBuilder.setLength(0); + fields.clear(); + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = STATE_NORMAL; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + default: + break; + } + } else { + if (state == STATE_ESCAPING) { + state = STATE_NORMAL; + } + stringBuilder.append(ch); + } + } + + switch (state) { + case STATE_NORMAL: + String field = stringBuilder.toString(); + fields.add(field); + lines.add(fields.toArray(new String[0])); + + String[][] result = new String[lines.size()][]; + for (int i = 0; i < lines.size(); ++i) { + result[i] = lines.get(i); + } + return result; + + case STATE_ESCAPING: + throw new IllegalArgumentException(String.format("Not closed escaping. Text=[%s].", text)); + case STATE_QUOTING: + throw new IllegalArgumentException(String.format("Not closed quoting. Text=[%s].", text)); + default: + throw new IllegalStateException(String.format("Text=[%s].", text)); + } + } + + /** + * Concat the given fields. + * + * <p>Special characters in the text will be escaped or quoted if + * corresponding character is given. Otherwise, an exception will be + * thrown.</p> + * + * @param fields The fields to be concat. + * @param delimiter The delimiter of fields. + * @param escapeChar The escape character. + * @param quoteChar The quote character. + * @return The concated text of given fields. 
+ */ + public static String concatCsv( + @Nonnull String[] fields, + @Nonnull Character delimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar) { + StringBuilder stringBuilder = new StringBuilder(); + + for (int index = 0; index < fields.length; ++index) { + + String field = fields[index]; + + for (int i = 0; i < field.length(); ++i) { + char ch = field.charAt(i); + + if (ch == delimiter + || (escapeChar != null && ch == escapeChar) + || (quoteChar != null && ch == quoteChar)) { + + if (escapeChar != null) { + stringBuilder.append(escapeChar); + stringBuilder.append(ch); + } else if (quoteChar != null && ch != quoteChar) { + stringBuilder.append(quoteChar); + stringBuilder.append(ch); + stringBuilder.append(quoteChar); + } else { + throw new IllegalArgumentException("There exist special characters in the text, " + + "but neither escape character nor quote character is configured."); + } + } else { + stringBuilder.append(ch); + } + } + + if (index < fields.length - 1) { + stringBuilder.append(delimiter); + } + } + + return stringBuilder.toString(); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java new file mode 100644 index 0000000000..2c39948f2d --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Accessor for decoded source rows: reports how many rows are available and
 * returns a field of a given row by name.
 */
public interface SourceData {

    /**
     * The literal string "$".
     * NOTE(review): presumably the prefix of generated default field names;
     * confirm against the decoder implementations.
     */
    String FIELD_DEFAULT_PREFIX = "$";

    /**
     * @return the number of rows held by this source data
     */
    int getRowCount();

    /**
     * Looks up one field of one row.
     *
     * @param rowNum the row to read (index base is defined by the implementation)
     * @param fieldName the name of the field to read
     * @return the field value as text
     */
    String getField(int rowNum, String fieldName);
}
+ */ + +package org.apache.inlong.sdk.transform.decode; + +import java.util.Map; + +/** + * SourceDecoder + */ +public interface SourceDecoder { + + SourceData decode(byte[] srcBytes, Map<String, Object> extParams); + + SourceData decode(String srcString, Map<String, Object> extParams); +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java new file mode 100644 index 0000000000..f29af8f93a --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.decode; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * SplitUtils + * + */ +public class SplitUtils { + + public static final int STATE_NORMAL = 0; + public static final int STATE_KEY = 2; + public static final int STATE_VALUE = 4; + public static final int STATE_ESCAPING = 8; + public static final int STATE_QUOTING = 16; + + public static String[] splitCsv(@Nonnull String text, @Nonnull Character delimiter, @Nullable Character escapeChar, + @Nullable Character quoteChar, boolean hasEscapeProcess) { + String[][] splitResult = splitCsv(text, delimiter, escapeChar, quoteChar, null, hasEscapeProcess); + if (splitResult.length == 0) { + return new String[0]; + } + return splitResult[0]; + } + + public static String[][] splitCsv(@Nonnull String text, @Nonnull Character delimiter, + @Nullable Character escapeChar, @Nullable Character quoteChar, @Nullable Character lineDelimiter, + boolean hasEscapeProcess) { + return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, false, hasEscapeProcess); + } + + public static String[][] splitCsv(@Nonnull String text, @Nonnull Character delimiter, + @Nullable Character escapeChar, @Nullable Character quoteChar, @Nullable Character lineDelimiter, + boolean deleteHeadDelimiter, boolean hasEscapeProcess) { + char deli = delimiter.charValue(); + char escape = (escapeChar == null) ? '\\' : escapeChar.charValue(); + char quote = (quoteChar == null) ? '\"' : quoteChar.charValue(); + char line = (lineDelimiter == null) ? 
'\n' : lineDelimiter.charValue(); + List<String[]> lines = new ArrayList<>(); + List<String> fields = new ArrayList<>(); + + int state = STATE_NORMAL; + + char[] srcValue = text.toCharArray(); + char[] fieldValue = new char[srcValue.length]; + int fieldIndex = 0; + for (int i = 0; i < text.length(); i++) { + char ch = srcValue[i]; + + if (ch == deli) { + switch (state) { + case STATE_NORMAL: + if (fieldIndex == 0 && deleteHeadDelimiter && fields.isEmpty()) { + break; + } + fields.add(new String(fieldValue, 0, fieldIndex)); + fieldIndex = 0; + break; + case STATE_ESCAPING: + fieldValue[fieldIndex++] = ch; + state = STATE_NORMAL; + break; + case STATE_QUOTING: + fieldValue[fieldIndex++] = ch; + break; + default: + break; + } + } else if (escapeChar != null && ch == escape) { + switch (state) { + case STATE_NORMAL: + state = STATE_ESCAPING; + break; + case STATE_ESCAPING: + if (!hasEscapeProcess) { + fieldValue[fieldIndex++] = escapeChar; + } + fieldValue[fieldIndex++] = ch; + state = STATE_NORMAL; + break; + case STATE_QUOTING: + fieldValue[fieldIndex++] = ch; + break; + default: + break; + } + } else if (quoteChar != null && ch == quote) { + switch (state) { + case STATE_NORMAL: + state = STATE_QUOTING; + break; + case STATE_ESCAPING: + if (!hasEscapeProcess) { + fieldValue[fieldIndex++] = escapeChar; + } + fieldValue[fieldIndex++] = ch; + state = STATE_NORMAL; + break; + case STATE_QUOTING: + state = STATE_NORMAL; + break; + default: + break; + } + } else if (lineDelimiter != null && ch == line) { + switch (state) { + case STATE_NORMAL: + fields.add(new String(fieldValue, 0, fieldIndex)); + fieldIndex = 0; + lines.add(fields.toArray(new String[0])); + fields.clear(); + break; + case STATE_ESCAPING: + fieldValue[fieldIndex++] = ch; + state = STATE_NORMAL; + break; + case STATE_QUOTING: + fieldValue[fieldIndex++] = ch; + break; + default: + break; + } + } else { + if (state == STATE_ESCAPING) { + if (!hasEscapeProcess) { + fieldValue[fieldIndex++] = escapeChar; + } + 
state = STATE_NORMAL; + } + fieldValue[fieldIndex++] = ch; + } + } + fields.add(new String(fieldValue, 0, fieldIndex)); + fieldIndex = 0; + lines.add(fields.toArray(new String[0])); + + String[][] result = new String[lines.size()][]; + for (int i = 0; i < lines.size(); i++) { + result[i] = lines.get(i); + } + return result; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java new file mode 100644 index 0000000000..cb3c9405a0 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.encode; + +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * CsvSinkEncoder + */ +public class CsvSinkEncoder implements SinkEncoder { + + protected CsvSinkInfo sinkInfo; + protected Charset sinkCharset = Charset.defaultCharset(); + private Character delimiter = '|'; + private Character escapeChar = null; + private List<FieldInfo> fields; + private StringBuilder builder = new StringBuilder(); + + public CsvSinkEncoder(CsvSinkInfo sinkInfo) { + this.sinkInfo = sinkInfo; + if (!StringUtils.isBlank(sinkInfo.getDelimiter())) { + this.delimiter = sinkInfo.getDelimiter().charAt(0); + } + if (!StringUtils.isBlank(sinkInfo.getEscapeChar())) { + this.escapeChar = sinkInfo.getEscapeChar().charAt(0); + } + if (!StringUtils.isBlank(sinkInfo.getCharset())) { + this.sinkCharset = Charset.forName(sinkInfo.getCharset()); + } + this.fields = sinkInfo.getFields(); + } + + /** + * encode + * @param sinkData + * @return + */ + @Override + public String encode(SinkData sinkData) { + if (fields == null || fields.size() == 0) { + return ""; + } + builder.delete(0, builder.length()); + if (escapeChar == null) { + fields.forEach(v -> builder.append(sinkData.getField(v.getName())).append(delimiter)); + } else { + for (FieldInfo field : fields) { + String fieldName = field.getName(); + String fieldValue = sinkData.getField(fieldName); + EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); + builder.append(delimiter); + } + } + return builder.substring(0, builder.length() - 1); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java new file mode 100644 index 0000000000..2e1c3bea4b --- 
/dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.encode; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * DefaultSinkData + * + */ +public class DefaultSinkData implements SinkData { + + private Map<String, String> currentRow = new HashMap<>(); + + /** + * putField + * @param fieldName + * @param fieldValue + */ + @Override + public void putField(String fieldName, String fieldValue) { + this.currentRow.put(fieldName, fieldValue); + } + + /** + * getField + * @param fieldName + * @return + */ + @Override + public String getField(String fieldName) { + return this.currentRow.getOrDefault(fieldName, ""); + } + + /** + * keySet + * @return + */ + @Override + public Set<String> keySet() { + return this.currentRow.keySet(); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java new file mode 100644 index 0000000000..2ba3c429d1 --- /dev/null +++ 
/**
 * Helpers that escape field content before it is written to
 * separator-delimited text.
 *
 * <p>NUL, line feed and carriage return are always written as the escape
 * character followed by '0', 'n' and 'r'. The separator and the escape
 * character themselves are written with the escape character in front.</p>
 */
public class EscapeUtils {

    /**
     * Escapes one field value using a backslash as escape character.
     *
     * @param fieldValue the value to escape; null is treated as empty
     * @param separator the separator that must be protected
     * @return the escaped value
     */
    public static String escapeStringSeparator(String fieldValue, char separator) {
        StringBuilder escaped = new StringBuilder();
        escapeContent(escaped, separator, '\\', fieldValue);
        return escaped.toString();
    }

    /**
     * Escapes every field with a backslash and joins them with the separator.
     *
     * @param fields the field values
     * @param separator the separator placed between the fields
     * @return the joined text, or an empty string for an empty list
     */
    public static String escapeFields(List<String> fields, char separator) {
        if (fields.isEmpty()) {
            return "";
        }
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (String field : fields) {
            if (!first) {
                joined.append(separator);
            }
            first = false;
            escapeContent(joined, separator, '\\', field);
        }
        return joined.toString();
    }

    /**
     * Appends the escaped text form of a field to the builder.
     *
     * @param builder the target builder
     * @param separator the separator that must be protected
     * @param escapeChar the escape character to use
     * @param field the value to write; null appends nothing
     */
    public static void escapeContent(StringBuilder builder, char separator, char escapeChar, Object field) {
        if (field == null) {
            return;
        }
        String text = String.valueOf(field);
        for (int pos = 0, end = text.length(); pos < end; pos++) {
            putValueIntoStringBuilder(builder, separator, escapeChar, text.charAt(pos));
        }
    }

    /**
     * Appends a single character, escaping it when required.
     *
     * @param builder the target builder
     * @param separator the separator that must be protected
     * @param escapeChar the escape character to use
     * @param value the character to write
     */
    public static void putValueIntoStringBuilder(StringBuilder builder, char separator, final char escapeChar,
            char value) {
        // The control characters are checked first, so they win even when
        // one of them is also configured as separator or escape character.
        if (value == 0) {
            builder.append(escapeChar).append('0');
        } else if (value == '\n') {
            builder.append(escapeChar).append('n');
        } else if (value == '\r') {
            builder.append(escapeChar).append('r');
        } else if (value == separator) {
            builder.append(escapeChar).append(separator);
        } else if (value == escapeChar) {
            builder.append(escapeChar).append(escapeChar);
        } else {
            builder.append(value);
        }
    }
}
+ */ + +package org.apache.inlong.sdk.transform.encode; + +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * KvSinkEncoder + */ +public class KvSinkEncoder implements SinkEncoder { + + protected KvSinkInfo sinkInfo; + protected Charset sinkCharset = Charset.defaultCharset(); + private List<FieldInfo> fields; + private StringBuilder builder = new StringBuilder(); + + public KvSinkEncoder(KvSinkInfo sinkInfo) { + this.sinkInfo = sinkInfo; + if (!StringUtils.isBlank(sinkInfo.getCharset())) { + this.sinkCharset = Charset.forName(sinkInfo.getCharset()); + } + this.fields = sinkInfo.getFields(); + } + + /** + * encode + * @param sinkData + * @return + */ + @Override + public String encode(SinkData sinkData) { + if (fields == null || fields.size() == 0) { + return ""; + } + builder.delete(0, builder.length()); + for (FieldInfo field : fields) { + String fieldName = field.getName(); + String fieldValue = sinkData.getField(fieldName); + builder.append(fieldName).append('=').append(fieldValue).append('&'); + } + return builder.substring(0, builder.length() - 1); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java new file mode 100644 index 0000000000..037df2dfcf --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * A row of output fields addressed by field name.
 */
public interface SinkData {

    /**
     * Stores the value of a field.
     *
     * @param fieldName the field name
     * @param fieldValue the field value
     */
    void putField(String fieldName, String fieldValue);

    /**
     * Reads the value of a field.
     *
     * @param fieldName the field name
     * @return the field value
     */
    String getField(String fieldName);

    /**
     * @return the names of the fields held by this row
     */
    Set<String> keySet();
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.encode; + +/** + * SinkEncoder + */ +public interface SinkEncoder { + + String encode(SinkData sinkData); +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java new file mode 100644 index 0000000000..88dd5bf36c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * CsvSinkInfo + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CsvSinkInfo extends SinkInfo { + + private String delimiter; + private String escapeChar; + private List<FieldInfo> fields; + + @JsonCreator + public CsvSinkInfo( + @JsonProperty("charset") String charset, + @JsonProperty("delimiter") String delimiter, + @JsonProperty("escapeChar") String escapeChar, + @JsonProperty("fields") List<FieldInfo> fields) { + super(SourceInfo.CSV, charset); + this.delimiter = delimiter; + this.escapeChar = escapeChar; + this.fields = fields; + } + + /** + * get delimiter + * @return the delimiter + */ + @JsonProperty("delimiter") + public String getDelimiter() { + return delimiter; + } + + /** + * set delimiter + * @param delimiter the delimiter to set + */ + public void setDelimiter(String delimiter) { + this.delimiter = delimiter; + } + + /** + * get escapeChar + * @return the escapeChar + */ + @JsonProperty("escapeChar") + public String getEscapeChar() { + return escapeChar; + } + + /** + * set escapeChar + * @param escapeChar the escapeChar to set + */ + public void setEscapeChar(String escapeChar) { + this.escapeChar = escapeChar; + } + + /** + * get fields + * @return the fields + */ + @JsonProperty("fields") + public List<FieldInfo> getFields() { + return fields; + } + + /** + * set fields + * @param fields the fields to set + */ + public void setFields(List<FieldInfo> fields) { + this.fields = fields; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java new file mode 100644 index 0000000000..27a46bcb19 --- /dev/null +++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * CsvSourceInfo + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CsvSourceInfo extends SourceInfo { + + private String delimiter; + private String escapeChar; + private List<FieldInfo> fields; + + @JsonCreator + public CsvSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("delimiter") String delimiter, + @JsonProperty("escapeChar") String escapeChar, + @JsonProperty("fields") List<FieldInfo> fields) { + super(charset); + this.delimiter = delimiter; + this.escapeChar = escapeChar; + this.fields = fields; + } + + /** + * get delimiter + * @return the delimiter + */ + @JsonProperty("delimiter") + public String getDelimiter() { + return delimiter; + } + + /** + * set delimiter + * @param delimiter the delimiter to set + */ + public void setDelimiter(String delimiter) { + 
this.delimiter = delimiter; + } + + /** + * get escapeChar + * @return the escapeChar + */ + @JsonProperty("escapeChar") + public String getEscapeChar() { + return escapeChar; + } + + /** + * set escapeChar + * @param escapeChar the escapeChar to set + */ + public void setEscapeChar(String escapeChar) { + this.escapeChar = escapeChar; + } + + /** + * get fields + * @return the fields + */ + @JsonProperty("fields") + public List<FieldInfo> getFields() { + return fields; + } + + /** + * set fields + * @param fields the fields to set + */ + public void setFields(List<FieldInfo> fields) { + this.fields = fields; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java new file mode 100644 index 0000000000..46106e534f --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.pojo; + +import lombok.Data; + +/** + * FieldInfo + */ +@Data +public class FieldInfo { + + private String name; +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java new file mode 100644 index 0000000000..9935623d06 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * JsonSourceInfo + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class JsonSourceInfo extends SourceInfo { + + private String rowsNodePath; + + @JsonCreator + public JsonSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("rowsNodePath") String rowsNodePath) { + super(charset); + this.rowsNodePath = rowsNodePath; + } + + /** + * get rowsNodePath + * @return the rowsNodePath + */ + @JsonProperty("rowsNodePath") + public String getRowsNodePath() { + return rowsNodePath; + } + + /** + * set rowsNodePath + * @param rowsNodePath the rowsNodePath to set + */ + public void setRowsNodePath(String rowsNodePath) { + this.rowsNodePath = rowsNodePath; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java new file mode 100644 index 0000000000..11c3550f42 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * KvSinkInfo + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class KvSinkInfo extends SinkInfo { + + private List<FieldInfo> fields; + + @JsonCreator + public KvSinkInfo( + @JsonProperty("charset") String charset, + @JsonProperty("fields") List<FieldInfo> fields) { + super(SourceInfo.KV, charset); + this.fields = fields; + } + + /** + * get fields + * @return the fields + */ + @JsonProperty("fields") + public List<FieldInfo> getFields() { + return fields; + } + + /** + * set fields + * @param fields the fields to set + */ + public void setFields(List<FieldInfo> fields) { + this.fields = fields; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java new file mode 100644 index 0000000000..6d92d44920 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * KvSourceInfo + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class KvSourceInfo extends SourceInfo { + + private List<FieldInfo> fields; + + @JsonCreator + public KvSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("fields") List<FieldInfo> fields) { + super(charset); + this.fields = fields; + } + + /** + * get fields + * @return the fields + */ + @JsonProperty("fields") + public List<FieldInfo> getFields() { + return fields; + } + + /** + * set fields + * @param fields the fields to set + */ + public void setFields(List<FieldInfo> fields) { + this.fields = fields; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java new file mode 100644 index 0000000000..ba226bd807 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * PbSourceInfo + * + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class PbSourceInfo extends SourceInfo { + + private String protoDefine; + private String rowsNodePath; + + @JsonCreator + public PbSourceInfo( + @JsonProperty("charset") String charset, + @JsonProperty("protoDefine") String protoDefine, + @JsonProperty("rowsNodePath") String rowsNodePath) { + super(charset); + this.protoDefine = protoDefine; + this.rowsNodePath = rowsNodePath; + } + + /** + * get protoDefine + * @return the protoDefine + */ + @JsonProperty("protoDefine") + public String getProtoDefine() { + return protoDefine; + } + + /** + * set protoDefine + * @param protoDefine the protoDefine to set + */ + public void setProtoDefine(String protoDefine) { + this.protoDefine = protoDefine; + } + + /** + * get rowsNodePath + * @return the rowsNodePath + */ + @JsonProperty("rowsNodePath") + public String getRowsNodePath() { + return rowsNodePath; + } + + /** + * set rowsNodePath + * @param rowsNodePath the rowsNodePath to set + */ + public void setRowsNodePath(String rowsNodePath) { + this.rowsNodePath = rowsNodePath; + } + +} diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java new file mode 100644 index 0000000000..13e9e2a179 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Protocols known to the transform SDK, each identified by a lower-case type string.
 *
 * <p>Lookups are case-insensitive and use {@link Locale#ROOT} for case folding, so the result
 * does not depend on the default locale.
 */
public enum ProtocolType {

    CSV("csv"), KV("kv"), PB("pb"), JSON("json"), UNKNOWN("n");

    private final String type;

    ProtocolType(String type) {
        this.type = type;
    }

    /**
     * Resolves a type string to its constant, rejecting anything that does not match.
     *
     * <p>Note that {@code "n"} resolves to {@link #UNKNOWN}, because that is the type string
     * of that constant.
     *
     * @param type the type string, compared case-insensitively
     * @return the matching constant
     * @throws IllegalArgumentException if {@code type} is {@code null} or matches no constant
     */
    public static ProtocolType forType(String type) {
        ProtocolType match = lookup(type);
        if (match == null) {
            throw new IllegalArgumentException("Unsupported protocol type for " + type);
        }
        return match;
    }

    /**
     * Resolves a type string to its constant, falling back to {@link #UNKNOWN}.
     *
     * @param value the type string, compared case-insensitively; may be {@code null}
     * @return the matching constant, or {@link #UNKNOWN} if {@code value} is {@code null} or
     *         matches no constant
     */
    public static ProtocolType convert(String value) {
        ProtocolType match = lookup(value);
        return match == null ? UNKNOWN : match;
    }

    /**
     * Shared case-insensitive search used by {@link #forType} and {@link #convert}.
     *
     * @param name the type string to search for; may be {@code null}
     * @return the matching constant, or {@code null} when there is none
     */
    private static ProtocolType lookup(String name) {
        if (name == null) {
            return null;
        }
        // Fold the case once instead of once per candidate.
        String normalized = name.toLowerCase(Locale.ROOT);
        for (ProtocolType candidate : values()) {
            if (candidate.type.equals(normalized)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Renders the constant as {@code NAME:type}, for example {@code CSV:csv}.
     */
    @Override
    public String toString() {
        return this.name() + ":" + this.type;
    }

    /**
     * Returns the lower-case type string of this constant.
     *
     * @return the type string
     */
    public String getType() {
        return type;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * SinkInfo + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes({ + @Type(value = CsvSinkInfo.class, name = SourceInfo.CSV), + @Type(value = KvSinkInfo.class, name = SourceInfo.KV), +}) +public abstract class SinkInfo { + + @JsonIgnore + private String type; + + @JsonProperty("charset") + private String charset; + + public SinkInfo( + String type, + @JsonProperty("charset") String charset) { + this.type = checkNotNull(type); + this.charset = Optional.ofNullable(charset).orElse("UTF-8"); + } + + /** + * get type + * @return the type + */ + @JsonIgnore + public String getType() { + return type; + } + + /** + * set type + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * get charset + * @return the charset + */ + @JsonProperty("charset") + public String getCharset() { + return charset; + } + + /** + * set charset + * @param charset the charset to set + */ + public void setCharset(String charset) { + this.charset = charset; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java new file mode 100644 index 0000000000..a02b17f78a --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java @@ -0,0 +1,69 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.util.Optional; + +/** + * SourceInfo + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes({ + @Type(value = CsvSourceInfo.class, name = SourceInfo.CSV), + @Type(value = KvSourceInfo.class, name = SourceInfo.KV), + @Type(value = PbSourceInfo.class, name = SourceInfo.PB), + @Type(value = JsonSourceInfo.class, name = SourceInfo.JSON), +}) +public abstract class SourceInfo { + + public static final String NODE_PATH_SEPARTOR = "."; + + public static final String CSV = "csv"; + public static final String KV = "kv"; + public static final String PB = "pb"; + public static final String JSON = "json"; + + private String charset; + + public SourceInfo( + @JsonProperty("charset") String charset) { + this.charset = Optional.ofNullable(charset).orElse("UTF-8"); + } + + /** + * get charset + * @return the charset + */ + public String 
getCharset() { + return charset; + } + + /** + * set charset + * @param charset the charset to set + */ + public void setCharset(String charset) { + this.charset = charset; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java new file mode 100644 index 0000000000..ff1ac958fc --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * TransformConfig + */ +public class TransformConfig { + + @JsonProperty("sourceInfo") + private SourceInfo sourceInfo; + @JsonProperty("sinkInfo") + private SinkInfo sinkInfo; + @JsonProperty("transformSql") + private String transformSql; + + @JsonCreator + public TransformConfig( + @JsonProperty("sourceInfo") SourceInfo sourceInfo, + @JsonProperty("sinkInfo") SinkInfo sinkInfo, + @JsonProperty("transformSql") String transformSql) { + this.sourceInfo = sourceInfo; + this.sinkInfo = sinkInfo; + this.transformSql = transformSql; + } + + /** + * get sourceInfo + * @return the sourceInfo + */ + @JsonProperty("sourceInfo") + public SourceInfo getSourceInfo() { + return sourceInfo; + } + + /** + * set sourceInfo + * @param sourceInfo the sourceInfo to set + */ + public void setSourceInfo(SourceInfo sourceInfo) { + this.sourceInfo = sourceInfo; + } + + /** + * get sinkInfo + * @return the sinkInfo + */ + @JsonProperty("sinkInfo") + public SinkInfo getSinkInfo() { + return sinkInfo; + } + + /** + * set sinkInfo + * @param sinkInfo the sinkInfo to set + */ + public void setSinkInfo(SinkInfo sinkInfo) { + this.sinkInfo = sinkInfo; + } + + /** + * get transformSql + * @return the transformSql + */ + @JsonProperty("transformSql") + public String getTransformSql() { + return transformSql; + } + + /** + * set transformSql + * @param transformSql the transformSql to set + */ + public void setTransformSql(String transformSql) { + this.transformSql = transformSql; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java new file mode 100644 index 0000000000..c08a6b3bc5 --- /dev/null +++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder; +import org.apache.inlong.sdk.transform.decode.KvSourceDecoder; +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.decode.SourceDecoder; +import org.apache.inlong.sdk.transform.encode.CsvSinkEncoder; +import org.apache.inlong.sdk.transform.encode.DefaultSinkData; +import org.apache.inlong.sdk.transform.encode.KvSinkEncoder; +import org.apache.inlong.sdk.transform.encode.SinkData; +import org.apache.inlong.sdk.transform.encode.SinkEncoder; +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.SinkInfo; +import org.apache.inlong.sdk.transform.pojo.SourceInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import 
org.apache.inlong.sdk.transform.process.operator.ExpressionOperator; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.parser.CCJSqlParserManager; +import net.sf.jsqlparser.statement.select.PlainSelect; +import net.sf.jsqlparser.statement.select.Select; +import net.sf.jsqlparser.statement.select.SelectExpressionItem; +import net.sf.jsqlparser.statement.select.SelectItem; +import org.apache.commons.lang3.StringUtils; + +import java.io.StringReader; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * TransformProcessor + * + */ +public class TransformProcessor { + + private TransformConfig config; + private SourceDecoder decoder; + private SinkEncoder encoder; + private Charset srcCharset = Charset.defaultCharset(); + protected Charset sinkCharset = Charset.defaultCharset(); + + private PlainSelect transformSelect; + private ExpressionOperator where; + private Map<String, ValueParser> selectItemMap; + + private ObjectMapper objectMapper = new ObjectMapper(); + + public TransformProcessor(String configString) + throws JsonMappingException, JsonProcessingException, JSQLParserException { + TransformConfig config = this.objectMapper.readValue(configString, TransformConfig.class); + this.init(config); + } + + public TransformProcessor(TransformConfig config) throws JSQLParserException { + this.init(config); + } + + private void init(TransformConfig config) throws JSQLParserException { + this.config = config; + if (!StringUtils.isBlank(config.getSourceInfo().getCharset())) { + this.srcCharset = 
Charset.forName(config.getSourceInfo().getCharset()); + } + if (!StringUtils.isBlank(config.getSinkInfo().getCharset())) { + this.sinkCharset = Charset.forName(config.getSinkInfo().getCharset()); + } + this.initDecoder(config); + this.initEncoder(config); + this.initTransformSql(); + } + + private void initDecoder(TransformConfig config) { + SourceInfo sourceInfo = config.getSourceInfo(); + if (sourceInfo instanceof CsvSourceInfo) { + this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo); + } else if (sourceInfo instanceof KvSourceInfo) { + this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo); + } + } + + private void initEncoder(TransformConfig config) { + SinkInfo sinkInfo = config.getSinkInfo(); + if (sinkInfo instanceof CsvSinkInfo) { + this.encoder = new CsvSinkEncoder((CsvSinkInfo) sinkInfo); + } else if (sinkInfo instanceof KvSinkInfo) { + this.encoder = new KvSinkEncoder((KvSinkInfo) sinkInfo); + } + } + + private void initTransformSql() throws JSQLParserException { + CCJSqlParserManager parserManager = new CCJSqlParserManager(); + Select select = (Select) parserManager.parse(new StringReader(config.getTransformSql())); + this.transformSelect = (PlainSelect) select.getSelectBody(); + this.where = OperatorTools.buildOperator(this.transformSelect.getWhere()); + List<SelectItem> items = this.transformSelect.getSelectItems(); + this.selectItemMap = new HashMap<>(items.size()); + for (SelectItem item : items) { + if (item instanceof SelectExpressionItem) { + SelectExpressionItem exprItem = (SelectExpressionItem) item; + if (exprItem.getAlias() == null) { + this.selectItemMap.put(exprItem.toString(), + OperatorTools.buildParser(exprItem.getExpression())); + } else { + this.selectItemMap.put(exprItem.getAlias().getName(), + OperatorTools.buildParser(exprItem.getExpression())); + } + } + } + } + + public List<String> transform(byte[] srcBytes, Map<String, Object> extParams) { + SourceData sourceData = this.decoder.decode(srcBytes, extParams); + 
List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount()); + for (int i = 1; i <= sourceData.getRowCount(); i++) { + if (this.where != null && !this.where.check(sourceData, i)) { + continue; + } + SinkData sinkData = new DefaultSinkData(); + for (Entry<String, ValueParser> entry : this.selectItemMap.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue().parse(sourceData, i); + sinkData.putField(fieldName, String.valueOf(fieldValue)); + } + sinkDatas.add(this.encoder.encode(sinkData)); + } + return sinkDatas; + } + + public List<String> transform(String srcString, Map<String, Object> extParams) { + return this.transform(srcString.getBytes(this.srcCharset), extParams); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java new file mode 100644 index 0000000000..c6464f850d --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +import net.sf.jsqlparser.expression.operators.conditional.AndExpression; + +/** + * AndOperator + * + */ +public class AndOperator implements ExpressionOperator { + + private ExpressionOperator left; + private ExpressionOperator right; + + public AndOperator(AndExpression expr) { + this.left = OperatorTools.buildOperator(expr.getLeftExpression()); + this.right = OperatorTools.buildOperator(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return left.check(sourceData, rowIndex) && right.check(sourceData, rowIndex); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java new file mode 100644 index 0000000000..3172626000 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.operators.relational.EqualsTo; +import org.apache.commons.lang.ObjectUtils; + +/** + * EqualsToOperator + * + */ +public class EqualsToOperator implements ExpressionOperator { + + private ValueParser left; + private ValueParser right; + + public EqualsToOperator(EqualsTo expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java new file mode 100644 index 0000000000..b055e841e2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +/** + * ExpressionOperator + */ +public interface ExpressionOperator { + + boolean check(SourceData sourceData, int rowIndex); +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java new file mode 100644 index 0000000000..07da9d79c2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; +import org.apache.commons.lang.ObjectUtils; + +/** + * GreaterThanEqualsOperator + * + */ +public class GreaterThanEqualsOperator implements ExpressionOperator { + + private ValueParser left; + private ValueParser right; + + public GreaterThanEqualsOperator(GreaterThanEquals expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @SuppressWarnings("rawtypes") + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) >= 0; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java new file mode 100644 index 0000000000..3b2158d96b --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.operators.relational.GreaterThan; +import org.apache.commons.lang.ObjectUtils; + +/** + * GreaterThanOperator + * + */ +public class GreaterThanOperator implements ExpressionOperator { + + private ValueParser left; + private ValueParser right; + + public GreaterThanOperator(GreaterThan expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @SuppressWarnings("rawtypes") + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) > 0; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java new file mode 100644 index 0000000000..fec4ed8019 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; +import org.apache.commons.lang.ObjectUtils; + +/** + * MinorThanEqualsOperator + * + */ +public class MinorThanEqualsOperator implements ExpressionOperator { + + private ValueParser left; + private ValueParser right; + + public MinorThanEqualsOperator(MinorThanEquals expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @SuppressWarnings("rawtypes") + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) <= 0; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java new file mode 100644 index 
0000000000..5d9db7dd9c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.operators.relational.MinorThan; +import org.apache.commons.lang.ObjectUtils; + +/** + * MinorThanOperator + * + */ +public class MinorThanOperator implements ExpressionOperator { + + private ValueParser left; + private ValueParser right; + + public MinorThanOperator(MinorThan expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @SuppressWarnings("rawtypes") + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) < 0; + } + +} diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java new file mode 100644 index 0000000000..9c58e70476 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; +import org.apache.commons.lang.ObjectUtils; + +/** + * NotEqualsToOperator + * + */ +public class NotEqualsToOperator implements ExpressionOperator { + + private ValueParser left; + private ValueParser right; + + public NotEqualsToOperator(NotEqualsTo expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java new file mode 100644 index 0000000000..f648d426e7 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +import net.sf.jsqlparser.expression.NotExpression; + +/** + * NotOperator + * + */ +public class NotOperator implements ExpressionOperator { + + private ExpressionOperator node; + + public NotOperator(NotExpression expr) { + this.node = OperatorTools.buildOperator(expr.getExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return !this.node.check(sourceData, rowIndex); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java new file mode 100644 index 0000000000..c0e059f266 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.process.parser.ColumnParser; +import org.apache.inlong.sdk.transform.process.parser.LongParser; +import org.apache.inlong.sdk.transform.process.parser.StringParser; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.LongValue; +import net.sf.jsqlparser.expression.NotExpression; +import net.sf.jsqlparser.expression.Parenthesis; +import net.sf.jsqlparser.expression.StringValue; +import net.sf.jsqlparser.expression.operators.conditional.AndExpression; +import net.sf.jsqlparser.expression.operators.conditional.OrExpression; +import net.sf.jsqlparser.expression.operators.relational.EqualsTo; +import net.sf.jsqlparser.expression.operators.relational.GreaterThan; +import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; +import net.sf.jsqlparser.expression.operators.relational.MinorThan; +import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; +import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; +import net.sf.jsqlparser.schema.Column; + +/** + * OperatorTools + * + */ +public class OperatorTools { + + public static ExpressionOperator buildOperator(Expression expr) { + if (expr instanceof AndExpression) { + return new AndOperator((AndExpression) expr); + } else if (expr instanceof OrExpression) { + return new OrOperator((OrExpression) expr); + } else if (expr instanceof Parenthesis) { + return 
new ParenthesisOperator((Parenthesis) expr); + } else if (expr instanceof NotExpression) { + return new NotOperator((NotExpression) expr); + } else if (expr instanceof EqualsTo) { + return new EqualsToOperator((EqualsTo) expr); + } else if (expr instanceof NotEqualsTo) { + return new NotEqualsToOperator((NotEqualsTo) expr); + } else if (expr instanceof GreaterThan) { + return new GreaterThanOperator((GreaterThan) expr); + } else if (expr instanceof GreaterThanEquals) { + return new GreaterThanEqualsOperator((GreaterThanEquals) expr); + } else if (expr instanceof MinorThan) { + return new MinorThanOperator((MinorThan) expr); + } else if (expr instanceof MinorThanEquals) { + return new MinorThanEqualsOperator((MinorThanEquals) expr); + } + return null; + } + + public static ValueParser buildParser(Expression expr) { + if (expr instanceof Column) { + return new ColumnParser((Column) expr); + } else if (expr instanceof StringValue) { + return new StringParser((StringValue) expr); + } else if (expr instanceof LongValue) { + return new LongParser((LongValue) expr); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java new file mode 100644 index 0000000000..33b9f82bdc --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +import net.sf.jsqlparser.expression.operators.conditional.OrExpression; + +/** + * OrOperator + * + */ +public class OrOperator implements ExpressionOperator { + + private ExpressionOperator left; + private ExpressionOperator right; + + public OrOperator(OrExpression expr) { + this.left = OperatorTools.buildOperator(expr.getLeftExpression()); + this.right = OperatorTools.buildOperator(expr.getRightExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return left.check(sourceData, rowIndex) || right.check(sourceData, rowIndex); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java new file mode 100644 index 0000000000..111f6bbb21 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.operator; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +import net.sf.jsqlparser.expression.Parenthesis; + +/** + * ParenthesisOperator + * + */ +public class ParenthesisOperator implements ExpressionOperator { + + private ExpressionOperator node; + + public ParenthesisOperator(Parenthesis expr) { + this.node = OperatorTools.buildOperator(expr.getExpression()); + } + + /** + * check + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public boolean check(SourceData sourceData, int rowIndex) { + return this.node.check(sourceData, rowIndex); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java new file mode 100644 index 0000000000..b6dd579d2a --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +import net.sf.jsqlparser.schema.Column; + +/** + * ColumnParser + * + */ +public class ColumnParser implements ValueParser { + + private String fieldName; + + public ColumnParser(Column expr) { + this.fieldName = expr.getColumnName(); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + return sourceData.getField(rowIndex, fieldName); + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java new file mode 100644 index 0000000000..efd61cc2cb --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +import net.sf.jsqlparser.expression.LongValue; + +/** + * LongParser + * + */ +public class LongParser implements ValueParser { + + private Long value; + + public LongParser(LongValue expr) { + this.value = expr.getValue(); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + return value; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java new file mode 100644 index 0000000000..9cb431c1fa --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; + +import net.sf.jsqlparser.expression.StringValue; + +/** + * StringParser + * + */ +public class StringParser implements ValueParser { + + private String stringValue; + + public StringParser(StringValue expr) { + this.stringValue = expr.getValue(); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + return stringValue; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java new file mode 100644 index 0000000000..bafafe276c --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Resolves the value of one operand of a condition for a given source row.
 *
 * <p>Implementations in this package either read a field from the source data (column
 * references) or return a constant captured from the SQL text (string and long literals).
 */
public interface ValueParser {

    /**
     * Produces the operand value for one row.
     *
     * @param sourceData decoded source rows
     * @param rowIndex index of the row being evaluated
     * @return the operand value; the concrete type depends on the implementation
     */
    Object parse(SourceData sourceData, int rowIndex);
}
+ */ + +package org.apache.inlong.sdk.transform.pojo; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * TestTransformConfig + * + */ +public class TestTransformConfig { + + @Test + public void testCsv() { + try { + FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + List<FieldInfo> fields = new ArrayList<>(); + fields.add(ftime); + SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select ftime from source"; + TransformConfig config = new TransformConfig(csvSource, csvSink, transformSql); + ObjectMapper objectMapper = new ObjectMapper(); + String configString = objectMapper.writeValueAsString(config); + System.out.println(configString); + Assert.assertEquals(configString, "{\"sourceInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\"," + + "\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]}," + + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\"," + + "\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]}," + + "\"transformSql\":\"select ftime from source\"}"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testKv() { + try { + FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + List<FieldInfo> fields = new ArrayList<>(); + fields.add(ftime); + SourceInfo kvSource = new KvSourceInfo("UTF-8", fields); + SinkInfo kvSink = new KvSinkInfo("UTF-8", fields); + String transformSql = "select ftime from source"; + TransformConfig config = new TransformConfig(kvSource, kvSink, transformSql); + ObjectMapper objectMapper = new ObjectMapper(); + String configString = objectMapper.writeValueAsString(config); + System.out.println(configString); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testPb() { + try { + 
FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + List<FieldInfo> fields = new ArrayList<>(); + fields.add(ftime); + SourceInfo pbSource = new PbSourceInfo("UTF-8", "syntax = \"proto3\";", "root"); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select ftime from source"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + ObjectMapper objectMapper = new ObjectMapper(); + String configString = objectMapper.writeValueAsString(config); + System.out.println(configString); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testJson() { + try { + FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + List<FieldInfo> fields = new ArrayList<>(); + fields.add(ftime); + SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "root"); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select ftime from source"; + TransformConfig config = new TransformConfig(jsonSource, csvSink, transformSql); + ObjectMapper objectMapper = new ObjectMapper(); + String configString = objectMapper.writeValueAsString(config); + System.out.println(configString); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java new file mode 100644 index 0000000000..9c75456df6 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.KvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.SinkInfo; +import org.apache.inlong.sdk.transform.pojo.SourceInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * TestTransformProcessor + * + */ +public class TestTransformProcessor { + + @Test + public void testCsv2Kv() { + try { + List<FieldInfo> fields = new ArrayList<>(); + FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + fields.add(ftime); + FieldInfo extinfo = new FieldInfo(); + extinfo.setName("extinfo"); + fields.add(extinfo); + SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields); + SinkInfo kvSink = new KvSinkInfo("UTF-8", fields); + String transformSql = "select ftime,extinfo from source where extinfo='ok'"; + TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + // case1 + TransformProcessor processor1 = new TransformProcessor(config); + List<String> output1 = 
processor1.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "ftime=2024-04-28 00:00:00&extinfo=ok"); + // case2 + config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); + TransformProcessor processor2 = new TransformProcessor(config); + List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testKvCsv() { + try { + List<FieldInfo> fields = new ArrayList<>(); + FieldInfo ftime = new FieldInfo(); + ftime.setName("ftime"); + fields.add(ftime); + FieldInfo extinfo = new FieldInfo(); + extinfo.setName("extinfo"); + fields.add(extinfo); + SourceInfo kvSource = new KvSourceInfo("UTF-8", fields); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select ftime,extinfo from source where extinfo='ok'"; + TransformConfig config = new TransformConfig(kvSource, csvSink, transformSql); + // case1 + TransformProcessor processor1 = new TransformProcessor(config); + List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + config.setTransformSql("select ftime,extinfo from source where extinfo!='ok'"); + TransformProcessor processor2 = new TransformProcessor(config); + List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testKvCsvByJsonConfig() { + try { + String configString1 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + 
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," + + "\"escapeChar\":\"\\\\\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + "\"transformSql\":\"select ftime,extinfo from source where extinfo='ok'\"}"; + // case1 + TransformProcessor processor1 = new TransformProcessor(configString1); + List<String> output1 = processor1.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok"); + // case2 + String configString2 = "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\"," + + "\"escapeChar\":\"\\\\\"," + + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]}," + + "\"transformSql\":\"select ftime,extinfo from source where extinfo!='ok'\"}"; + TransformProcessor processor2 = new TransformProcessor(configString2); + List<String> output2 = processor2.transform("ftime=2024-04-28 00:00:00&extinfo=ok", new HashMap<>()); + Assert.assertTrue(output2.size() == 0); + } catch (Exception e) { + e.printStackTrace(); + } + } +}