This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 4e4e8a36e [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809) 4e4e8a36e is described below commit 4e4e8a36ea24285a9e2e5e11e759e4dca0cf0d95 Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Sat Sep 10 12:30:13 2022 +0800 [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809) --- .../sdk/sort/impl/decode/MessageDeserializer.java | 96 +++++++++++- .../apache/inlong/sdk/sort/util/StringUtil.java | 169 +++++++++++++++++++++ .../sort/impl/decode/MessageDeserializerTest.java | 43 +++++- 3 files changed, 297 insertions(+), 11 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java index 145b7ad77..b0c54663e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java @@ -23,8 +23,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs; @@ -32,12 +37,14 @@ import org.apache.inlong.sdk.sort.api.ClientContext; import org.apache.inlong.sdk.sort.api.Deserializer; import org.apache.inlong.sdk.sort.entity.InLongMessage; import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.util.StringUtil; import org.apache.inlong.sdk.sort.util.Utils; public class MessageDeserializer implements Deserializer { private 
static final int MESSAGE_VERSION_NONE = 0; private static final int MESSAGE_VERSION_PB = 1; + private static final int MESSAGE_VERSION_INLONG_MSG = 2; private static final int COMPRESS_TYPE_NONE = 0; private static final int COMPRESS_TYPE_GZIP = 1; private static final int COMPRESS_TYPE_SNAPPY = 2; @@ -48,15 +55,29 @@ public class MessageDeserializer implements Deserializer { private static final String INLONG_GROUPID_KEY = "inlongGroupId"; private static final String INLONG_STREAMID_KEY = "inlongStreamId"; + private static final String INLONGMSG_ATTR_STREAM_ID = "streamId"; + private static final String INLONGMSG_ATTR_GROUP_ID = "groupId"; + private static final String INLONGMSG_ATTR_TIME_T = "t"; + private static final String INLONGMSG_ATTR_TIME_DT = "dt"; + private static final String INLONGMSG_ATTR_NODE_IP = "NodeIP"; + private static final char INLONGMSG_ATTR_ENTRY_DELIMITER = '&'; + private static final char INLONGMSG_ATTR_KV_DELIMITER = '='; + private static final String DEFAULT_IP = "127.0.0.1"; + + private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!"; + public MessageDeserializer() { } @Override - public List<InLongMessage> deserialize(ClientContext context, InLongTopic inLongTopic, Map<String, String> headers, + public List<InLongMessage> deserialize( + ClientContext context, + InLongTopic inLongTopic, + Map<String, String> headers, byte[] data) throws Exception { //1. 
version - int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, "0")); + int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, Integer.toString(MESSAGE_VERSION_INLONG_MSG))); switch (version) { case MESSAGE_VERSION_NONE: { return decode(context, inLongTopic, data, headers); @@ -64,12 +85,18 @@ public class MessageDeserializer implements Deserializer { case MESSAGE_VERSION_PB: { return decodePB(context, inLongTopic, data, headers); } + case MESSAGE_VERSION_INLONG_MSG: { + return decodeInlongMsg(context, inLongTopic, data, headers); + } default: throw new IllegalArgumentException("Unknown version type:" + version); } } - private List<InLongMessage> decode(ClientContext context, InLongTopic inLongTopic, byte[] msgBytes, + private List<InLongMessage> decode( + ClientContext context, + InLongTopic inLongTopic, + byte[] msgBytes, Map<String, String> headers) { long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0")); String sourceIp = headers.getOrDefault(SOURCE_IP_KEY, ""); @@ -89,7 +116,10 @@ public class MessageDeserializer implements Deserializer { * @param msgBytes byte[] * @return {@link MessageObjs} */ - private List<InLongMessage> decodePB(ClientContext context, InLongTopic inLongTopic, byte[] msgBytes, + private List<InLongMessage> decodePB( + ClientContext context, + InLongTopic inLongTopic, + byte[] msgBytes, Map<String, String> headers) throws IOException { int compressType = Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0")); String inlongGroupId = headers.getOrDefault(INLONG_GROUPID_KEY, ""); @@ -120,7 +150,8 @@ public class MessageDeserializer implements Deserializer { * @param messageObjs {@link MessageObjs} * @return {@link List} */ - private List<InLongMessage> transformMessageObjs(ClientContext context, InLongTopic inLongTopic, + private List<InLongMessage> transformMessageObjs( + ClientContext context, InLongTopic inLongTopic, MessageObjs messageObjs, String inlongGroupId, String inlongStreamId) { if 
(null == messageObjs) { @@ -144,4 +175,59 @@ public class MessageDeserializer implements Deserializer { } return inLongMessages; } + + private List<InLongMessage> decodeInlongMsg( + ClientContext context, + InLongTopic inLongTopic, + byte[] msgBytes, + Map<String, String> headers) { + List<InLongMessage> messageList = new ArrayList<>(); + + InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes); + for (String attr : inLongMsg.getAttrs()) { + Map<String, String> attributes = StringUtil.splitKv(attr, INLONGMSG_ATTR_ENTRY_DELIMITER, + INLONGMSG_ATTR_KV_DELIMITER, null, null); + + String groupId = Optional.ofNullable(attributes.get(INLONGMSG_ATTR_GROUP_ID)) + .orElseThrow(() -> new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING, + INLONGMSG_ATTR_GROUP_ID))); + + String streamId = Optional.ofNullable(attributes.get(INLONGMSG_ATTR_STREAM_ID)) + .orElseThrow(() -> new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING, + INLONGMSG_ATTR_STREAM_ID))); + + // Extracts time from the attributes + long msgTime; + if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) { + String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim(); + msgTime = StringUtil.parseDateTime(date); + } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) { + String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim(); + msgTime = Long.parseLong(epoch); + } else { + throw new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING, + INLONGMSG_ATTR_TIME_T + " or " + INLONGMSG_ATTR_TIME_DT)); + } + + String srcIp = Optional.ofNullable(attributes.get(INLONGMSG_ATTR_NODE_IP)) + .orElse(DEFAULT_IP); + + Iterator<byte[]> iterator = inLongMsg.getIterator(attr); + while (iterator.hasNext()) { + byte[] bodyBytes = iterator.next(); + if (Objects.isNull(bodyBytes)) { + continue; + } + InLongMessage inLongMessage = new InLongMessage(groupId, streamId, msgTime, + srcIp, bodyBytes, attributes); + messageList.add(inLongMessage); + context.getStatManager() + 
.getStatistics(context.getConfig().getSortTaskId(), + inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) + .addDecompressionConsumeSize(inLongMessage.getBody().length); + } + } + return messageList; + } + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java index 6bacc8c22..2a8774255 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java @@ -17,12 +17,151 @@ package org.apache.inlong.sdk.sort.util; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.HashMap; +import java.util.Map; public class StringUtil { + private static final int STATE_NORMAL = 0; + private static final int STATE_KEY = 2; + private static final int STATE_VALUE = 4; + private static final int STATE_ESCAPING = 8; + private static final int STATE_QUOTING = 16; + + /** + * Splits the kv text. + * + * <p>Both escaping and quoting are supported. When the escape character is + * not null, then the next character to the escape character will be + * escaped. When the quote character is not null, then all characters + * between consecutive quote characters will be escaped.</p> + * + * @param text The text to be split. + * @param entryDelimiter The delimiter of entries. + * @param kvDelimiter The delimiter between key and value. + * @param escapeChar The escaping character, or null to disable escaping. + * @param quoteChar The quoting character, or null to disable quoting. + * @return The fields split from the text.
+ */ + @SuppressWarnings("checkstyle:MissingSwitchDefault") + public static Map<String, String> splitKv( + @Nonnull String text, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar + ) { + Map<String, String> fields = new HashMap<>(); + + StringBuilder stringBuilder = new StringBuilder(); + + String key = ""; + String value; + + int state = STATE_KEY; + + /* + * The state when entering escaping and quoting. When we exit escaping + * or quoting, we should restore this state. + */ + int kvState = STATE_KEY; + + for (int i = 0; i < text.length(); ++i) { + char ch = text.charAt(i); + + if (ch == kvDelimiter) { + switch (state) { + case STATE_KEY: + key = stringBuilder.toString(); + stringBuilder.setLength(0); + state = STATE_VALUE; + break; + case STATE_VALUE: + throw new IllegalArgumentException("Unexpected token " + ch + " at position " + i + "."); + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + } + } else if (ch == entryDelimiter) { + switch (state) { + case STATE_KEY: + throw new IllegalArgumentException("Unexpected token " + ch + " at position " + i + "."); + case STATE_VALUE: + value = stringBuilder.toString(); + fields.put(key, value); + + stringBuilder.setLength(0); + state = STATE_KEY; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + } + } else if (escapeChar != null && ch == escapeChar) { + switch (state) { + case STATE_KEY: + case STATE_VALUE: + kvState = state; + state = STATE_ESCAPING; + break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + stringBuilder.append(ch); + break; + } + } else if (quoteChar != null && ch == quoteChar) { + switch (state) { + case STATE_KEY: + case STATE_VALUE: + kvState = state; + state = STATE_QUOTING; 
+ break; + case STATE_ESCAPING: + stringBuilder.append(ch); + state = kvState; + break; + case STATE_QUOTING: + state = kvState; + break; + } + } else { + stringBuilder.append(ch); + } + } + + switch (state) { + case STATE_KEY: + throw new IllegalArgumentException("Dangling key."); + case STATE_VALUE: + value = stringBuilder.toString(); + fields.put(key, value); + return fields; + case STATE_ESCAPING: + throw new IllegalArgumentException("Not closed escaping."); + case STATE_QUOTING: + throw new IllegalArgumentException("Not closed quoting."); + default: + throw new IllegalStateException(); + } + } + /** * formatDate * @@ -106,4 +245,34 @@ public class StringUtil { return sdf.format(date); } + /** + * Parses a date time from string format to Unix epoch milliseconds. + * Only supports <b>yyyyMMdd</b>, <b>yyyyMMddHH</b> and <b>yyyyMMddHHmm</b> precision + * whose length is 8, 10 and 12 respectively; longer values are truncated. + * + * @param value Date time in string format. + * @return Unix epoch milliseconds, or -1 if the value is shorter than 8 characters. + */ + public static long parseDateTime(String value) { + try { + if (value.length() < 8) { + return -1; + } else if (value.length() <= 9) { + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd"); + Date date = simpleDateFormat.parse(value.substring(0, 8)); + return new Timestamp(date.getTime()).getTime(); + } else if (value.length() <= 11) { + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHH"); + Date date = simpleDateFormat.parse(value.substring(0, 10)); + return new Timestamp(date.getTime()).getTime(); + } else { + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmm"); + Date date = simpleDateFormat.parse(value.substring(0, 12)); + return new Timestamp(date.getTime()).getTime(); + } + } catch (ParseException e) { + throw new IllegalArgumentException("Unexpected time format : " + value); + } + } + } diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java index 04307336a..815f1047c 100644 --- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java +++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java @@ -24,6 +24,7 @@ import com.google.protobuf.ByteString; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs; @@ -79,24 +80,27 @@ public class MessageDeserializerTest { @Test public void testDeserialize() { - //1. setUp + // 1. setUp try { setUp(); } catch (Exception e) { e.printStackTrace(); } - //2. testDeserializeVersion0 + // 2. testDeserializeVersion0 testDeserializeVersion0(); - //3. testDeserializeVersion1CompressionType0 + // 3. testDeserializeVersion1CompressionType0 testDeserializeVersion1CompressionType0(); - //4. testDeserializeVersion1CompressionType1 + // 4. testDeserializeVersion1CompressionType1 testDeserializeVersion1CompressionType1(); - //5. testDeserializeVersion1CompressionType2 + // 5. testDeserializeVersion1CompressionType2 testDeserializeVersion1CompressionType2(); + + // 6. 
DeserializeVersion2NoCompress + testDeserializeVersion2NoCompress(); } private void testDeserializeVersion0() { @@ -165,6 +169,33 @@ public class MessageDeserializerTest { } } + private void testDeserializeVersion2NoCompress() { + try { + String groupId = "sort_sdk_test_group_id"; + String streamId = "sort_sdk_test_stream_id"; + String attr = "m=0"; + String ip = "1.2.3.4"; + long dt = System.currentTimeMillis(); + StringBuilder newAttrBuilder = new StringBuilder(attr); + newAttrBuilder.append("&groupId=").append(groupId).append("&streamId=").append(streamId) + .append("&dt=").append(dt).append("&NodeIP=").append(ip); + InLongMsg inlongMsg = InLongMsg.newInLongMsg(false); + String msg = "sort sdk inlong msg test"; + for (int i = 0; i < 10; i++) { + byte[] bytes = msg.getBytes(); + inlongMsg.addMsg(newAttrBuilder.toString(), bytes); + } + Map<String, String> header = new HashMap<>(); + header.put("version", "2"); + List<InLongMessage> deserialize = messageDeserializer + .deserialize(context, inLongTopic, header, inlongMsg.buildArray()); + Assert.assertEquals(10, deserialize.size()); + Assert.assertEquals(msg, new String(deserialize.get(0).getBody())); + } catch (Throwable t) { + t.printStackTrace(); + } + } + private void prepareTestMessageObjs() { headers.put("version", "1"); testData = "test data"; @@ -183,4 +214,4 @@ public class MessageDeserializerTest { messageObjs = MessageObjs.newBuilder().addMsgs(messageObj1).addMsgs(messageObj2).build(); } -} \ No newline at end of file +}