This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 37c618445a Remove JsonUtils dependency due to classpath issues (#11264)
37c618445a is described below
commit 37c618445a5ea6344ec388950ae68eec1a585238
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Aug 4 02:00:56 2023 +0530
Remove JsonUtils dependency due to classpath issues (#11264)
Co-authored-by: Kartik Khare <[email protected]>
---
.../kinesis/KinesisPartitionGroupOffset.java | 23 ++++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java
index 4ee247f365..12af4765fd 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java
@@ -20,12 +20,13 @@ package org.apache.pinot.plugin.stream.kinesis;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.utils.JsonUtils;
-
/**
* A {@link StreamPartitionMsgOffset} implementation for the Kinesis partition
group consumption
@@ -40,6 +41,10 @@ import org.apache.pinot.spi.utils.JsonUtils;
* The longer the time period between write requests, the larger the sequence
numbers become.
*/
public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
+ private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
+ public static final ObjectReader DEFAULT_READER = DEFAULT_MAPPER.reader();
+ public static final ObjectWriter DEFAULT_WRITER = DEFAULT_MAPPER.writer();
+
private final Map<String, String> _shardToStartSequenceMap;
public KinesisPartitionGroupOffset(Map<String, String>
shardToStartSequenceMap) {
@@ -48,7 +53,7 @@ public class KinesisPartitionGroupOffset implements
StreamPartitionMsgOffset {
public KinesisPartitionGroupOffset(String offsetStr)
throws IOException {
- _shardToStartSequenceMap = JsonUtils.stringToObject(offsetStr, new
TypeReference<Map<String, String>>() {
+ _shardToStartSequenceMap = stringToObject(offsetStr, new
TypeReference<Map<String, String>>() {
});
}
@@ -59,7 +64,7 @@ public class KinesisPartitionGroupOffset implements
StreamPartitionMsgOffset {
@Override
public String toString() {
try {
- return JsonUtils.objectToString(_shardToStartSequenceMap);
+ return objectToString(_shardToStartSequenceMap);
} catch (JsonProcessingException e) {
throw new IllegalStateException(
"Caught exception when converting KinesisCheckpoint to string: " +
_shardToStartSequenceMap);
@@ -91,4 +96,14 @@ public class KinesisPartitionGroupOffset implements
StreamPartitionMsgOffset {
return _shardToStartSequenceMap.values().iterator().next()
.compareTo(other._shardToStartSequenceMap.values().iterator().next());
}
+
+ public static <T> T stringToObject(String jsonString, TypeReference<T>
valueTypeRef)
+ throws IOException {
+ return DEFAULT_READER.forType(valueTypeRef).readValue(jsonString);
+ }
+
+ public static String objectToString(Object object)
+ throws JsonProcessingException {
+ return DEFAULT_WRITER.writeValueAsString(object);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]