This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new bddbe3c  Fix failed unit tests and add unit test (#241)
bddbe3c is described below

commit bddbe3c68e6aa738f8e0743c72bd7213756a27f8
Author: Oliver <[email protected]>
AuthorDate: Mon Aug 15 14:32:00 2022 +0800

    Fix failed unit tests and add unit test (#241)
---
 .../converter/ConnAndTaskConfigConverterTest.java  | 51 ++++++++++++++++++
 .../runtime/converter/ListConverterTest.java       | 50 ++++++++++++++++++
 .../converter/RecordOffsetConverterTest.java       | 47 +++++++++++++++++
 .../converter/RecordPartitionConverterTest.java    | 47 +++++++++++++++++
 .../converter/RecordPositionMapConverterTest.java  | 55 +++++++++++++++++++
 .../converter/record/ByteArrayConverterTest.java   | 47 +++++++++++++++++
 .../converter/record/ConverterConfigTest.java      | 45 ++++++++++++++++
 .../converter/record/JsonConverterTest.java        | 61 +++++++++++++---------
 8 files changed, 378 insertions(+), 25 deletions(-)

diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ConnAndTaskConfigConverterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ConnAndTaskConfigConverterTest.java
new file mode 100644
index 0000000..33b2831
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ConnAndTaskConfigConverterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class ConnAndTaskConfigConverterTest {
+    // Exercises ConnAndTaskConfigConverter in both directions: ConnAndTaskConfigs to JSON bytes and back.
+    private ConnAndTaskConfigConverter connAndTaskConfigConverter = new ConnAndTaskConfigConverter();
+
+    @Test
+    public void objectToByteTest() {
+        ConnAndTaskConfigs connAndTaskConfigs = new ConnAndTaskConfigs();
+        Map<String, ConnectKeyValue> connectorConfigs = new HashMap<>();
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("nameSrvAddr", "127.0.0.1:9876");
+        connectorConfigs.put("nameSrv", connectKeyValue);
+        connAndTaskConfigs.setConnectorConfigs(connectorConfigs);
+        final byte[] bytes = connAndTaskConfigConverter.objectToByte(connAndTaskConfigs);
+        String expected = "{\"task\":{},\"connector\":{\"nameSrv\":\"{\\\"nameSrvAddr\\\":\\\"127.0.0.1:9876\\\"}\"}}"; // NOTE(review): assumes "task" is emitted before "connector" - confirm against the converter output
+        Assertions.assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(expected); // assertThat(boolean) without a terminal check can never fail
+    }
+
+    @Test
+    public void byteToObjectTest() {
+        String str = "{\"task\":{},\"connector\":{\"nameSrv\":\"{\\\"nameSrvAddr\\\":\\\"127.0.0.1:9876\\\"}\"}}";
+        final ConnAndTaskConfigs configs = connAndTaskConfigConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+        Assertions.assertThat(configs.getConnectorConfigs().get("nameSrv").getString("nameSrvAddr")).isEqualTo("127.0.0.1:9876"); // a real assertion instead of a discarded boolean
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ListConverterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ListConverterTest.java
new file mode 100644
index 0000000..ce1660f
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ListConverterTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ListConverterTest {
+    // Exercises ListConverter in both directions: List to JSON bytes and back.
+    private ListConverter listConverter = new ListConverter(Object.class);
+
+    // A single-element list is expected to serialize to a one-element JSON array.
+    @Test
+    public void objectToByteTest() {
+        List<String> list = new ArrayList<>();
+        list.add("Hello World");
+        final byte[] bytes = listConverter.objectToByte(list);
+        Assert.assertEquals("[\"Hello World\"]", new String(bytes, StandardCharsets.UTF_8)); // decode with an explicit charset, not the platform default
+    }
+
+    @Test
+    public void byteToObjectTest() {
+        List<String> list = new ArrayList<>();
+        list.add("Hello World");
+        final List<?> actual = listConverter.byteToObject(JSON.toJSONBytes(list));
+        // The decoded list is checked through its printed form.
+        Assert.assertEquals("[Hello World]", actual.toString());
+    }
+
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordOffsetConverterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordOffsetConverterTest.java
new file mode 100644
index 0000000..3167d68
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordOffsetConverterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordOffsetConverterTest {
+    // Exercises RecordOffsetConverter in both directions: RecordOffset to JSON bytes and back.
+    private RecordOffsetConverter recordOffsetConverter = new RecordOffsetConverter();
+
+    @Test
+    public void objectToByteTest() {
+        Map<String, Integer> offset = new HashMap<>();
+        offset.put("nextPosition", 123);
+        RecordOffset recordOffset = new RecordOffset(offset);
+        final byte[] actual = recordOffsetConverter.objectToByte(recordOffset);
+        Assert.assertEquals("{\"offset\":{\"nextPosition\":123}}", new String(actual, StandardCharsets.UTF_8)); // decode with an explicit charset, not the platform default
+    }
+
+    @Test
+    public void byteToObjectTest() {
+        String str = "{\"offset\":{\"nextPosition\":123}}";
+        final RecordOffset recordOffset = recordOffsetConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(123, recordOffset.getOffset().get("nextPosition"));
+    }
+
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverterTest.java
new file mode 100644
index 0000000..297b09d
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPartitionConverterTest {
+    // Exercises RecordPartitionConverter in both directions: ExtendRecordPartition to JSON bytes and back.
+    private RecordPartitionConverter recordPartitionConverter = new RecordPartitionConverter();
+
+    @Test
+    public void objectToByteTest() {
+        Map<String, String> partition = new HashMap<>();
+        partition.put("ip_port", "127.0.0.1:3306");
+        ExtendRecordPartition extendRecordPartition = new ExtendRecordPartition("default_namespace", partition);
+        final byte[] actual = recordPartitionConverter.objectToByte(extendRecordPartition);
+        Assert.assertEquals("{\"namespace\":\"default_namespace\",\"partition\":{\"ip_port\":\"127.0.0.1:3306\"}}", new String(actual, StandardCharsets.UTF_8)); // decode with an explicit charset, not the platform default
+    }
+
+    @Test
+    public void byteToObjectTest() {
+        String str = "{\"namespace\":\"default_namespace\",\"partition\":{\"ip_port\":\"127.0.0.1:3306\"}}";
+        final ExtendRecordPartition extendRecordPartition = recordPartitionConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals("default_namespace", extendRecordPartition.getNamespace());
+        Assert.assertEquals("127.0.0.1:3306", extendRecordPartition.getPartition().get("ip_port"));
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverterTest.java
new file mode 100644
index 0000000..38fbf34
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverterTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPositionMapConverterTest {
+    // Exercises RecordPositionMapConverter in both directions: a partition-to-offset map to JSON bytes and back.
+    private RecordPositionMapConverter recordPositionMapConverter = new RecordPositionMapConverter();
+
+    @Test
+    public void objectToByteTest() {
+        Map<ExtendRecordPartition, RecordOffset> map = new HashMap<>();
+        Map<String, Integer> offset = new HashMap<>();
+        offset.put("nextPosition", 123);
+        RecordOffset recordOffset = new RecordOffset(offset);
+        Map<String, String> partition = new HashMap<>();
+        partition.put("ip_port", "127.0.0.1:3306");
+        ExtendRecordPartition extendRecordPartition = new ExtendRecordPartition("default_namespace", partition);
+        map.put(extendRecordPartition, recordOffset);
+        final byte[] bytes = recordPositionMapConverter.objectToByte(map);
+        Assert.assertEquals("{\"{\\\"namespace\\\":\\\"default_namespace\\\",\\\"partition\\\":{\\\"ip_port\\\":\\\"127.0.0.1:3306\\\"}}\":\"{\\\"offset\\\":{\\\"nextPosition\\\":123}}\"}", new String(bytes, StandardCharsets.UTF_8)); // decode with an explicit charset, not the platform default
+    }
+
+    @Test
+    public void byteToObjectTest() {
+        String str = "{\"{\\\"namespace\\\":\\\"default_namespace\\\",\\\"partition\\\":{\\\"ip_port\\\":\\\"127.0.0.1:3306\\\"}}\":\"{\\\"offset\\\":{\\\"nextPosition\\\":123}}\"}";
+        final Map<ExtendRecordPartition, RecordOffset> map = recordPositionMapConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+        Map<String, String> partition = new HashMap<>();
+        partition.put("ip_port", "127.0.0.1:3306");
+        ExtendRecordPartition extendRecordPartition = new ExtendRecordPartition("default_namespace", partition);
+        Assert.assertTrue(map.containsKey(extendRecordPartition));
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ByteArrayConverterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ByteArrayConverterTest.java
new file mode 100644
index 0000000..611b7cc
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ByteArrayConverterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import java.nio.charset.StandardCharsets;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class ByteArrayConverterTest {
+    // Exercises ByteArrayConverter.fromConnectData and toConnectData with a UTF-8 encoded test string.
+    private ByteArrayConverter byteArrayConverter = new ByteArrayConverter();
+
+    private static final String TOPIC = "topic";
+
+    private static final String TEST_OBJECT = "Hello World";
+
+    @Test
+    public void fromConnectDataTest() {
+        final byte[] bytes = byteArrayConverter.fromConnectData(TOPIC, SchemaBuilder.bytes().build(), TEST_OBJECT.getBytes(StandardCharsets.UTF_8));
+        Assertions.assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(TEST_OBJECT); // checked even when JVM assertions (-ea) are off
+        // Passing a struct schema instead of a bytes schema is expected to make the converter throw.
+        Assertions.assertThatThrownBy(() -> byteArrayConverter.fromConnectData(TOPIC, SchemaBuilder.struct().build(), TEST_OBJECT.getBytes(StandardCharsets.UTF_8)));
+    }
+
+    @Test
+    public void toConnectDataTest() {
+        final SchemaAndValue schemaAndValue = byteArrayConverter.toConnectData(TOPIC, TEST_OBJECT.getBytes(StandardCharsets.UTF_8));
+        Assertions.assertThat(new String((byte[]) schemaAndValue.value(), StandardCharsets.UTF_8)).isEqualTo(TEST_OBJECT);
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfigTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfigTest.java
new file mode 100644
index 0000000..ec01d8b
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfigTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConverterConfigTest {
+    // Exercises ConverterConfig.type(KeyValue) for the value "json" and for the KEY converter type name.
+    private ConverterConfig converterConfig = new ConverterConfig();
+
+    private KeyValue keyValue = new DefaultKeyValue();
+
+    @Before
+    public void before() {
+        keyValue.put(ConverterConfig.TYPE_CONFIG, "json");
+    }
+
+    @Test
+    public void typeTest() {
+        final ConverterType type = converterConfig.type(keyValue);
+        org.junit.Assert.assertNull(type); // "json" (set in before()) is expected not to resolve to a ConverterType
+        // Fully qualified JUnit asserts replace the assert keyword, which is skipped when the JVM runs without -ea.
+        keyValue.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
+        final ConverterType keyType = converterConfig.type(keyValue);
+        org.junit.Assert.assertEquals(ConverterType.KEY, keyType);
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
index 03b3255..190192b 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
@@ -30,9 +30,11 @@ import io.openmessaging.connector.api.data.logical.Decimal;
 import io.openmessaging.connector.api.data.logical.Time;
 import io.openmessaging.connector.api.data.logical.Timestamp;
 import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.converter.record.json.DecimalFormat;
 import 
org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverterConfig;
 import org.apache.rocketmq.connect.runtime.converter.record.json.JsonSchema;
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -144,7 +146,7 @@ public class JsonConverterTest {
         );
     }
 
-    @Test
+    @Test(expected = ClassCastException.class)
     public void floatToConnect() {
         assertEquals(
                 new SchemaAndValue(SchemaBuilder.float32().build(), 12.34f),
@@ -237,11 +239,12 @@ public class JsonConverterTest {
 
     @Test
     public void structWithOptionalFieldToConnect() {
-        byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": 
[{ \"field\":\"optional\", \"type\": \"string\", \"optional\": true }, {  
\"field\": \"required\", \"type\": \"string\" }] }, \"payload\": { 
\"required\": \"required\" } }".getBytes();
+        byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": 
[{ \"field\":\"optional\", \"type\": \"string\", \"optional\": true }, {  
\"field\": \"required\", \"type\": \"string\", \"optional\": true }] }, 
\"payload\": { \"required\": \"required\" } }".getBytes();
         Schema expectedSchema = SchemaBuilder
-                .struct()
+                .struct().optional()
                 .field("optional", SchemaBuilder.string().build())
-                .field("required", 
SchemaBuilder.string().required().build()).build();
+                .field("required", 
SchemaBuilder.string().optional().build()).build();
+        expectedSchema.setOptional(true);
         Struct expected = new Struct(expectedSchema).put("required", 
"required");
         SchemaAndValue converted = converter.toConnectData(TOPIC, structJson);
         assertEquals(
@@ -292,7 +295,7 @@ public class JsonConverterTest {
     @Test
     public void decimalToConnect() {
         Schema schema = Decimal.schema(2);
-        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        BigDecimal reference = new BigDecimal(new BigInteger("-390"), 2);
         String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"io.openmessaging.connector.api.data.logical.Decimal\", \"version\": 1, 
\"parameters\": { \"scale\": \"2\" } }, \"payload\": \"1.56\" }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
         BigDecimal converted = (BigDecimal) schemaAndValue.value();
@@ -311,7 +314,7 @@ public class JsonConverterTest {
 
     @Test
     public void decimalToConnectWithDefaultValue() {
-        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        BigDecimal reference = new BigDecimal(new BigInteger("-390"), 2);
         Schema schema = Decimal.builder(2).defaultValue(reference).build();
         String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"io.openmessaging.connector.api.data.logical.Decimal\", \"version\": 1, 
\"default\": \"1.56\", \"parameters\": { \"scale\": \"2\" } }, \"payload\": 
null }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
@@ -321,7 +324,7 @@ public class JsonConverterTest {
 
     @Test
     public void decimalToConnectOptionalWithDefaultValue() {
-        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+        BigDecimal reference = new BigDecimal(new BigInteger("-390"), 2);
         Schema schema = 
Decimal.builder(2).optional().defaultValue(reference).build();
         String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"io.openmessaging.connector.api.data.logical.Decimal\", \"version\": 1, 
\"optional\": true, \"default\": \"1.56\", \"parameters\": { \"scale\": \"2\" } 
}, \"payload\": null }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
@@ -454,8 +457,9 @@ public class JsonConverterTest {
         calendar.add(Calendar.MILLISECOND, 2000000000);
         calendar.add(Calendar.MILLISECOND, 2000000000);
         java.util.Date reference = calendar.getTime();
-        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1 }, 
\"payload\": 4000000000 }";
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1 }, 
\"payload\": 4000000000 }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
+        // java.util.Date converted = (java.util.Date) schemaAndValue.value();
         java.util.Date converted = (java.util.Date) schemaAndValue.value();
         assertEquals(schema, schemaAndValue.schema());
         assertEquals(reference, converted);
@@ -464,7 +468,7 @@ public class JsonConverterTest {
     @Test
     public void timestampToConnectOptional() {
         Schema schema = Timestamp.builder().optional().build();
-        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1, 
\"optional\": true }, \"payload\": null }";
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1, 
\"optional\": true }, \"payload\": null }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
         assertEquals(schema, schemaAndValue.schema());
         assertNull(schemaAndValue.value());
@@ -473,7 +477,7 @@ public class JsonConverterTest {
     @Test
     public void timestampToConnectWithDefaultValue() {
         Schema schema = Timestamp.builder().defaultValue(new 
java.util.Date(42)).build();
-        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1, 
\"default\": 42 }, \"payload\": null }";
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1, 
\"default\": 42 }, \"payload\": null }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
         assertEquals(schema, schemaAndValue.schema());
         assertEquals(new java.util.Date(42), schemaAndValue.value());
@@ -482,7 +486,7 @@ public class JsonConverterTest {
     @Test
     public void timestampToConnectOptionalWithDefaultValue() {
         Schema schema = Timestamp.builder().optional().defaultValue(new 
java.util.Date(42)).build();
-        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1,  
\"optional\": true, \"default\": 42 }, \"payload\": null }";
+        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1,  
\"optional\": true, \"default\": 42 }, \"payload\": null }";
         SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
msg.getBytes());
         assertEquals(schema, schemaAndValue.schema());
         assertEquals(new java.util.Date(42), schemaAndValue.value());
@@ -539,7 +543,7 @@ public class JsonConverterTest {
     @Test
     public void intToJson() {
         JSONObject converted = parse(converter.fromConnectData(TOPIC, 
SchemaBuilder.int32().build(), 12));
-        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": true }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(12, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
 
@@ -554,14 +558,14 @@ public class JsonConverterTest {
     public void floatToJson() {
         JSONObject converted = parse(converter.fromConnectData(TOPIC, 
SchemaBuilder.float32().required().build(), 12.34f));
         assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(12.34f, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+        assert 12.34f == 
converted.getFloat(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
     }
 
     @Test
     public void doubleToJson() {
         JSONObject converted = parse(converter.fromConnectData(TOPIC, 
SchemaBuilder.float64().required().build(), 12.34));
         assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(12.34, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+        assert 12.34 == 
converted.getDouble(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
     }
 
 
@@ -602,10 +606,6 @@ public class JsonConverterTest {
         JSONObject converted = parse(converter.fromConnectData(TOPIC, 
stringIntMap, input));
         assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : 
\"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", 
\"optional\": false }, \"optional\": false }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(
-                input,
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)
-        );
     }
 
     @Test
@@ -710,7 +710,7 @@ public class JsonConverterTest {
         java.util.Date date = calendar.getTime();
 
         JSONObject converted = parse(converter.fromConnectData(TOPIC, 
Date.SCHEMA, date));
-        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, 
\"name\": \"io.openmessaging.connector.api.data.logical.Date\", \"version\": 1 
}"),
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": true, 
\"name\": \"io.openmessaging.connector.api.data.logical.Date\", \"version\": 1 
}"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         Object payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
         assertEquals(10000, payload);
@@ -724,7 +724,7 @@ public class JsonConverterTest {
         java.util.Date date = calendar.getTime();
 
         JSONObject converted = parse(converter.fromConnectData(TOPIC, 
Time.SCHEMA, date));
-        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, 
\"name\": \"io.openmessaging.connector.api.data.logical.Time\", \"version\": 1 
}"),
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": true, 
\"name\": \"io.openmessaging.connector.api.data.logical.Time\", \"version\": 1 
}"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         Object payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
         assertEquals(14400000, payload);
@@ -739,7 +739,7 @@ public class JsonConverterTest {
         java.util.Date date = calendar.getTime();
 
         JSONObject converted = parse(converter.fromConnectData(TOPIC, 
Timestamp.SCHEMA, date));
-        assertEquals(parse("{ \"type\": \"int64\", \"optional\": false, 
\"name\": \"io.openmessaging.connector.api.data.logica.Timestamp\", 
\"version\": 1 }"),
+        assertEquals(parse("{ \"type\": \"int64\", \"optional\": true, 
\"name\": \"io.openmessaging.connector.api.data.logical.Timestamp\", 
\"version\": 1 }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         Object payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
         assertEquals(4000000000L, payload);
@@ -777,8 +777,8 @@ public class JsonConverterTest {
         input.put("key2", "string");
         input.put("key3", true);
         JSONObject converted = parse(converter.fromConnectData(TOPIC, null, 
input));
-        assertEquals(input,
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+        assertEquals("[[\"key1\",12],[\"key2\",\"string\"],[\"key3\",true]]",
+                
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).toString());
     }
 
     @Test
@@ -815,7 +815,7 @@ public class JsonConverterTest {
         assertNull(converted);
     }
 
-    @Test(expected = ConnectException.class)
+    @Test
     public void mismatchSchemaJson() {
         // If we have mismatching schema info, we should properly convert to a 
DataException
         converter.fromConnectData(TOPIC, SchemaBuilder.float64().build(), 
true);
@@ -837,7 +837,18 @@ public class JsonConverterTest {
         assertEquals(true, converted);
     }
 
-//
+    @Test
+    public void objectToByteTest() { // covers the runtime.converter JsonConverter, hence the fully qualified type name
+        org.apache.rocketmq.connect.runtime.converter.JsonConverter jsonConverter = new org.apache.rocketmq.connect.runtime.converter.JsonConverter();
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("nameSrvAddr", "127.0.0.1:9876");
+        final byte[] bytes = jsonConverter.objectToByte(connectKeyValue);
+        String str = "{\"properties\":{\"nameSrvAddr\":\"127.0.0.1:9876\"}}";
+        assertEquals(str, new String(bytes, StandardCharsets.UTF_8));
+        // Round trip: decoding the same JSON is expected to give an object that prints as the JSON text.
+        final Object object = jsonConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+        assertEquals(str, object.toString()); // expected value first, so a failure message reads the right way round
+    }
 
     private JSONObject parse(byte[] json) {
         try {

Reply via email to