healchow commented on code in PR #5809:
URL: https://github.com/apache/inlong/pull/5809#discussion_r966591565


##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java:
##########
@@ -144,4 +169,60 @@ private List<InLongMessage> transformMessageObjs(ClientContext context, InLongTo
         }
         return inLongMessages;
     }
+
+    private List<InLongMessage> decodeInlongMsg(
+            ClientContext context,
+            InLongTopic inLongTopic,
+            byte[] msgBytes,
+            Map<String, String> headers) {
+        List<InLongMessage> messageList = new ArrayList<>();
+
+        InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);
+        for (String attr : inLongMsg.getAttrs()) {
+            Map<String, String> attributes = StringUtil.splitKv(attr, INLONGMSG_ATTR_ENTRY_DELIMITER,
+                    INLONGMSG_ATTR_KV_DELIMITER, null, null);
+
+            String groupId = Optional.ofNullable(attributes.get(INLONGMSG_ATTR_GROUP_ID))
+                    .orElseThrow(() -> new IllegalArgumentException("Could not find "
+                            + INLONGMSG_ATTR_GROUP_ID + " in attributes!"));
+
+            String streamId = Optional.ofNullable(attributes.get(INLONGMSG_ATTR_STREAM_ID))
+                    .orElseThrow(() -> new IllegalArgumentException("Could not find "
+                            + INLONGMSG_ATTR_STREAM_ID + " in attributes!"));
+
+            // Extracts time from the attributes
+            long msgTime;
+            if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+                String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+                msgTime = StringUtil.parseDateTime(date);
+            } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+                String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+                msgTime = Long.parseLong(epoch);
+            } else {
+                throw new IllegalArgumentException(
+                        "Could not find " + INLONGMSG_ATTR_TIME_T
+                                + " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
+            }
+
+            String srcIp = Optional.ofNullable(attributes.get(INLONGMSG_ATTR_NODE_IP))
+                    .orElse("127.0.0.1");

Review Comment:
   Suggest extracting `127.0.0.1` into a named default constant instead of using the literal inline.
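
   A minimal sketch of what this could look like. The constant name `DEFAULT_SRC_IP`, the helper `resolveSrcIp`, the class name, and the `"NodeIP"` key value are all assumptions made for illustration; the PR may name or place them differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class DefaultIpSketch {

    // Placeholder key value for illustration; the real constant lives in the PR's code.
    static final String INLONGMSG_ATTR_NODE_IP = "NodeIP";

    // Hypothetical constant name: the fallback source IP used when the attribute is absent.
    static final String DEFAULT_SRC_IP = "127.0.0.1";

    // Hypothetical helper: returns the node IP from the attributes, or the default.
    static String resolveSrcIp(Map<String, String> attributes) {
        return Optional.ofNullable(attributes.get(INLONGMSG_ATTR_NODE_IP))
                .orElse(DEFAULT_SRC_IP);
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        // No node IP attribute present, so the default constant is returned.
        System.out.println(resolveSrcIp(attributes));

        attributes.put(INLONGMSG_ATTR_NODE_IP, "10.0.0.8");
        // Attribute present, so its value is returned unchanged.
        System.out.println(resolveSrcIp(attributes));
    }
}
```

   Naming the fallback keeps the literal in one place, so a later change to the default does not require searching for string occurrences.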



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.