This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1aa3ea893b Remove usage of deprecated KafkaJSONMessageDecoder (#16150)
1aa3ea893b is described below
commit 1aa3ea893b6864d991066adc5d2b1dc51978269e
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jun 18 19:29:45 2025 -0600
Remove usage of deprecated KafkaJSONMessageDecoder (#16150)
---
.../Homepage/Operations/AddIngestionComponent.tsx | 2 +-
.../Operations/AddRealTimeIngestionComponent.tsx | 2 +-
.../Homepage/Operations/AddRealtimeTableOp.tsx | 2 +-
.../inputformat/json/JSONMessageDecoderTest.java} | 71 ++++++++++++----------
.../src/test/resources/data/test_sample_data.json | 0
.../test_sample_data_schema_no_time_field.json | 0
...sample_data_schema_with_outgoing_time_spec.json | 0
...ple_data_schema_without_outgoing_time_spec.json | 0
.../pinot-kafka-2.0/README.md | 8 +--
.../stream/kafka/KafkaJSONMessageDecoder.java | 3 +-
10 files changed, 46 insertions(+), 42 deletions(-)
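In effect the patch is a single class-name swap: every sample streamConfigs now points at org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder instead of the deprecated Kafka-specific decoder, and the decoder test moves into the pinot-json input-format plugin. Decoding behavior itself is unchanged. A minimal, hypothetical sketch of driving the new decoder directly, mirroring the relocated test (the field names and payload below are invented for illustration):

```java
import java.util.Map;
import java.util.Set;
import org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder;
import org.apache.pinot.spi.data.readers.GenericRow;

public class JsonDecoderSketch {
  public static void main(String[] args) throws Exception {
    // Replacement for the deprecated org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder.
    JSONMessageDecoder decoder = new JSONMessageDecoder();
    // No decoder props needed; read only the listed fields from each message of "testTopic".
    decoder.init(Map.of(), Set.of("name", "age"), "testTopic");
    GenericRow row = new GenericRow();
    decoder.decode("{\"name\":\"alice\",\"age\":30}".getBytes(), row);
    System.out.println(row.getValue("name") + " / " + row.getValue("age"));
  }
}
```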
diff --git a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
index 8621c1c152..1724b7fd32 100644
--- a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
+++ b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddIngestionComponent.tsx
@@ -89,7 +89,7 @@ export default function AddIngestionComponent({
"stream.kafka.broker.list": "",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
- "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.segment.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
diff --git a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
index 0118f05748..196ef01eee 100644
--- a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
+++ b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealTimeIngestionComponent.tsx
@@ -89,7 +89,7 @@ export default function AddRealTimeIngestionComponent({
"stream.kafka.broker.list": "",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
- "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.segment.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
diff --git a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
index 8ddd36910d..e6c8c5c77d 100644
--- a/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
+++ b/pinot-controller/src/main/resources/app/components/Homepage/Operations/AddRealtimeTableOp.tsx
@@ -102,7 +102,7 @@ const defaultTableObj = {
"stream.kafka.broker.list": "",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
- "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.segment.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java
similarity index 56%
rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
rename to pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java
index dc4810d611..5c2bda51b8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONMessageDecoderTest.java
@@ -16,31 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.plugin.stream.kafka;
+package org.apache.pinot.plugin.inputformat.json;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
-public class KafkaJSONMessageDecoderTest {
+
+public class JSONMessageDecoderTest {
@Test
public void testJsonDecoderWithoutOutgoingTimeSpec()
throws Exception {
- Schema schema = Schema.fromFile(new File(
- getClass().getClassLoader().getResource("data/test_sample_data_schema_without_outgoing_time_spec.json")
- .getFile()));
- Map<String, FieldSpec.DataType> sourceFields = new HashMap<>();
+ Schema schema = loadSchema("data/test_sample_data_schema_without_outgoing_time_spec.json");
+ Map<String, DataType> sourceFields = new HashMap<>();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType());
}
@@ -50,66 +53,70 @@ public class KafkaJSONMessageDecoderTest {
@Test
public void testJsonDecoderWithOutgoingTimeSpec()
throws Exception {
- Schema schema = Schema.fromFile(new File(
- getClass().getClassLoader().getResource("data/test_sample_data_schema_with_outgoing_time_spec.json")
- .getFile()));
- Map<String, FieldSpec.DataType> sourceFields = new HashMap<>();
+ Schema schema = loadSchema("data/test_sample_data_schema_with_outgoing_time_spec.json");
+ Map<String, DataType> sourceFields = new HashMap<>();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType());
}
sourceFields.remove("secondsSinceEpoch");
- sourceFields.put("time_day", FieldSpec.DataType.INT);
+ sourceFields.put("time_day", DataType.INT);
testJsonDecoder(sourceFields);
}
@Test
public void testJsonDecoderNoTimeSpec()
throws Exception {
- Schema schema = Schema.fromFile(
- new File(getClass().getClassLoader().getResource("data/test_sample_data_schema_no_time_field.json").getFile()));
- Map<String, FieldSpec.DataType> sourceFields = new HashMap<>();
+ Schema schema = loadSchema("data/test_sample_data_schema_no_time_field.json");
+ Map<String, DataType> sourceFields = new HashMap<>();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
sourceFields.put(fieldSpec.getName(), fieldSpec.getDataType());
}
testJsonDecoder(sourceFields);
}
- private void testJsonDecoder(Map<String, FieldSpec.DataType> sourceFields)
+ private Schema loadSchema(String resourcePath)
+ throws Exception {
+ URL resource = getClass().getClassLoader().getResource(resourcePath);
+ assertNotNull(resource);
+ return Schema.fromFile(new File(resource.getFile()));
+ }
+
+ private void testJsonDecoder(Map<String, DataType> sourceFields)
throws Exception {
- try (BufferedReader reader = new BufferedReader(
- new FileReader(getClass().getClassLoader().getResource("data/test_sample_data.json").getFile()))) {
- KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
- decoder.init(new HashMap<>(), sourceFields.keySet(), "testTopic");
- GenericRow r = new GenericRow();
- String line = reader.readLine();
- while (line != null) {
+ URL resource = getClass().getClassLoader().getResource("data/test_sample_data.json");
+ assertNotNull(resource);
+ try (BufferedReader reader = new BufferedReader(new FileReader(resource.getFile()))) {
+ JSONMessageDecoder decoder = new JSONMessageDecoder();
+ decoder.init(Map.of(), sourceFields.keySet(), "testTopic");
+ GenericRow row = new GenericRow();
+ String line;
+ while ((line = reader.readLine()) != null) {
JsonNode jsonNode = JsonUtils.DEFAULT_READER.readTree(line);
- decoder.decode(line.getBytes(), r);
+ decoder.decode(line.getBytes(), row);
for (String field : sourceFields.keySet()) {
- Object actualValue = r.getValue(field);
+ Object actualValue = row.getValue(field);
JsonNode expectedValue = jsonNode.get(field);
switch (sourceFields.get(field)) {
case STRING:
- Assert.assertEquals(actualValue, expectedValue.asText());
+ assertEquals(actualValue, expectedValue.asText());
break;
case INT:
- Assert.assertEquals(actualValue, expectedValue.asInt());
+ assertEquals(actualValue, expectedValue.asInt());
break;
case LONG:
- Assert.assertEquals(actualValue, expectedValue.asLong());
+ assertEquals(actualValue, expectedValue.asLong());
break;
case FLOAT:
- Assert.assertEquals(actualValue, (float) expectedValue.asDouble());
+ assertEquals(actualValue, (float) expectedValue.asDouble());
break;
case DOUBLE:
- Assert.assertEquals(actualValue, expectedValue.asDouble());
+ assertEquals(actualValue, expectedValue.asDouble());
break;
default:
- Assert.assertTrue(false, "Shouldn't arrive here.");
+ fail("Shouldn't arrive here.");
break;
}
}
- line = reader.readLine();
}
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data.json
similarity index 100%
rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data.json
rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data.json
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_no_time_field.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_no_time_field.json
similarity index 100%
rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_no_time_field.json
rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_no_time_field.json
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json
similarity index 100%
rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json
rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_with_outgoing_time_spec.json
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json b/pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json
similarity index 100%
rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json
rename to pinot-plugins/pinot-input-format/pinot-json/src/test/resources/data/test_sample_data_schema_without_outgoing_time_spec.json
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md
index 9016804697..e32826700f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md
@@ -30,15 +30,13 @@ A stream plugin for another version of kafka, or another stream, can be added in
```
* How to use Kafka 2.x connector
-Below is a sample `streamConfigs` used to create a realtime table with Kafka Stream(High) level consumer:
+Below is a sample `streamConfigs` used to create a real-time table with Kafka consumer:
```$xslt
"streamConfigs": {
"streamType": "kafka",
+ "stream.kafka.broker.list": "localhost:19092",
"stream.kafka.topic.name": "meetupRSVPEvents",
- "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
- "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
- "stream.kafka.zk.broker.url": "localhost:2191/kafka",
- "stream.kafka.hlc.bootstrap.server": "localhost:19092"
+ "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
}
```
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
index cf12fcc9cc..d3d55572cd 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
@@ -22,8 +22,7 @@ import org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder;
/**
- * This class has been kept for backward compatability. Use @see `org.apache.pinot.plugin.inputformat.json
- * .StreamJSONMessageDecoder` for future use cases.
+ * This class has been kept for backward compatibility. Use {@link JSONMessageDecoder} for future use cases.
* This class will be removed in a later release.
*/
@Deprecated
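For context, the retained KafkaJSONMessageDecoder is presumably nothing more than a thin backward-compatibility shim over the new decoder; a minimal sketch of what such a deprecated subclass would look like (not necessarily the exact Pinot source):

```java
package org.apache.pinot.plugin.stream.kafka;

import org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder;

/**
 * Kept only so existing table configs that still reference this class name keep working.
 * New configs should reference JSONMessageDecoder directly; this class will be removed later.
 */
@Deprecated
public class KafkaJSONMessageDecoder extends JSONMessageDecoder {
}
```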
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]