This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3294588 [connector](read) compatible with ip data type (#289)
3294588 is described below
commit 3294588498a54d08ca9ed18f419ad1eeb2b239e5
Author: Petrichor <[email protected]>
AuthorDate: Fri Mar 21 13:58:15 2025 +0800
[connector](read) compatible with ip data type (#289)
---
.../apache/doris/spark/client/read/RowBatch.java | 31 ++-
.../doris/spark/client/read/RowBatchTest.java | 295 ++++++++++-----------
2 files changed, 158 insertions(+), 168 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index 86ad0cc..bf576f1 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -282,18 +282,19 @@ public class RowBatch implements Serializable {
}
break;
case "IPV4":
- Preconditions.checkArgument(mt.equals(MinorType.UINT4)
|| mt.equals(MinorType.INT),
+ Preconditions.checkArgument(mt.equals(MinorType.UINT4)
|| mt.equals(MinorType.INT) || mt.equals(MinorType.VARCHAR),
typeMismatchMessage(colName, currentType, mt));
- BaseIntVector ipv4Vector;
- if (mt.equals(MinorType.INT)) {
- ipv4Vector = (IntVector) curFieldVector;
+
+ if (mt.equals(MinorType.VARCHAR)) {
+ VarCharVector vector = (VarCharVector)
curFieldVector;
+ for (int i = 0; i < rowCountInOneBatch; i++) {
+ addValueToRow(i, vector.isNull(i) ? null : new
String(vector.get(i)));
+ }
} else {
- ipv4Vector = (UInt4Vector) curFieldVector;
- }
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = ipv4Vector.isNull(rowIndex) ?
null :
-
IPUtils.convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
- addValueToRow(rowIndex, fieldValue);
+ BaseIntVector vector = (mt.equals(MinorType.INT))
? (IntVector) curFieldVector : (UInt4Vector) curFieldVector;
+ for (int i = 0; i < rowCountInOneBatch; i++) {
+ addValueToRow(i, vector.isNull(i) ? null :
IPUtils.convertLongToIPv4Address(vector.getValueAsLong(i)));
+ }
}
break;
case "FLOAT":
@@ -464,9 +465,15 @@ public class RowBatch implements Serializable {
addValueToRow(rowIndex, null);
break;
}
+ // Compatible with IPv6 in Doris 2.1.3 and above.
String ipv6Str = new
String(ipv6VarcharVector.get(rowIndex));
- String ipv6Address = IPUtils.fromBigInteger(new
BigInteger(ipv6Str));
- addValueToRow(rowIndex, ipv6Address);
+ if (ipv6Str.contains(":")){
+ addValueToRow(rowIndex, ipv6Str);
+ }else {
+ String ipv6Address =
IPUtils.fromBigInteger(new BigInteger(ipv6Str));
+ addValueToRow(rowIndex, ipv6Address);
+ }
+
}
break;
case "ARRAY":
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index acc7712..3b99c18 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -924,16 +924,17 @@ public class RowBatchTest {
@Test
public void testIPv4() throws DorisException, IOException {
- ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
- childrenBuilder.add(
- new Field("k1", FieldType.nullable(new ArrowType.Int(32,
false)), null),
- new Field("k2", FieldType.nullable(new ArrowType.Int(32,
true)), null));
+ ImmutableList<Field> fields =
+ ImmutableList.of(
+ new Field("k1", FieldType.nullable(new
ArrowType.Int(32, false)), null),
+ new Field("k2", FieldType.nullable(new
ArrowType.Int(32, true)), null),
+ new Field("k3", FieldType.nullable(new
ArrowType.Utf8()), null));
VectorSchemaRoot root =
VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(
- childrenBuilder.build(), null),
+ new org.apache.arrow.vector.types.pojo.Schema(fields,
null),
new RootAllocator(Integer.MAX_VALUE));
+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter =
new ArrowStreamWriter(
@@ -942,38 +943,48 @@ public class RowBatchTest {
arrowStreamWriter.start();
root.setRowCount(5);
- FieldVector vector = root.getVector("k1");
- UInt4Vector uInt4Vector = (UInt4Vector) vector;
+ long[] ipValues = {0, 255, 65535, 16777215, 4294967295L};
+ String[] ipStrings = {
+ "0.0.0.0", "0.0.0.255", "0.0.255.255", "0.255.255.255",
"255.255.255.255"
+ };
+
+ UInt4Vector uInt4Vector = (UInt4Vector) root.getVector("k1");
uInt4Vector.setInitialCapacity(5);
- uInt4Vector.allocateNew();
- uInt4Vector.setIndexDefined(0);
- uInt4Vector.setSafe(0, 0);
- uInt4Vector.setIndexDefined(1);
- uInt4Vector.setSafe(1, 255);
- uInt4Vector.setIndexDefined(2);
- uInt4Vector.setSafe(2, 65535);
- uInt4Vector.setIndexDefined(3);
- uInt4Vector.setSafe(3, 16777215);
- uInt4Vector.setIndexDefined(4);
- uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
-
- FieldVector vector1 = root.getVector("k2");
- IntVector int4Vector = (IntVector) vector1;
- int4Vector.setInitialCapacity(5);
- int4Vector.allocateNew();
- int4Vector.setIndexDefined(0);
- int4Vector.setSafe(0, 0);
- int4Vector.setIndexDefined(1);
- int4Vector.setSafe(1, 255);
- int4Vector.setIndexDefined(2);
- int4Vector.setSafe(2, 65535);
- int4Vector.setIndexDefined(3);
- int4Vector.setSafe(3, 16777215);
- int4Vector.setIndexDefined(4);
- int4Vector.setWithPossibleTruncate(4, 4294967295L);
-
- vector.setValueCount(5);
- vector1.setValueCount(5);
+ uInt4Vector.allocateNew(5); // Fixed from 4 to 5 to match actual row count
+
+ IntVector intVector = (IntVector) root.getVector("k2");
+ intVector.setInitialCapacity(5);
+ intVector.allocateNew(5); // Fixed from 4 to 5 to match actual row count
+
+ VarCharVector varCharVector = (VarCharVector) root.getVector("k3");
+ varCharVector.setInitialCapacity(5);
+ varCharVector.allocateNew(5); // Fixed from 4 to 5 to match actual row count
+
+ for (int i = 0; i < 5; i++) {
+ uInt4Vector.setIndexDefined(i);
+ if (i < 4) {
+ uInt4Vector.setSafe(i, (int) ipValues[i]);
+ } else {
+ uInt4Vector.setWithPossibleTruncate(
+ i, ipValues[i]); // Large value that might be truncated
+ }
+
+ intVector.setIndexDefined(i);
+ if (i < 4) {
+ intVector.setSafe(i, (int) ipValues[i]);
+ } else {
+ intVector.setWithPossibleTruncate(
+ i, ipValues[i]); // Large value that might be truncated
+ }
+
+ varCharVector.setIndexDefined(i);
+ byte[] bytes = ipStrings[i].getBytes(StandardCharsets.UTF_8);
+ varCharVector.setSafe(i, bytes, 0, bytes.length);
+ }
+
+ uInt4Vector.setValueCount(5);
+ intVector.setValueCount(5);
+ varCharVector.setValueCount(5);
arrowStreamWriter.writeBatch();
arrowStreamWriter.end();
@@ -988,31 +999,26 @@ public class RowBatchTest {
String schemaStr =
"{\"properties\":["
- +
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}, "
- +
"{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}"
+ +
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"},"
+ +
"{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"},"
+ +
"{\"type\":\"IPV4\",\"name\":\"k3\",\"comment\":\"\"}"
+ "], \"status\":200}";
Schema schema = RestService.parseSchema(schemaStr, logger);
RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow0 = rowBatch.next();
- assertEquals("0.0.0.0", actualRow0.get(0));
- assertEquals("0.0.0.0", actualRow0.get(1));
- List<Object> actualRow1 = rowBatch.next();
- assertEquals("0.0.0.255", actualRow1.get(0));
- assertEquals("0.0.0.255", actualRow1.get(1));
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow2 = rowBatch.next();
- assertEquals("0.0.255.255", actualRow2.get(0));
- assertEquals("0.0.255.255", actualRow2.get(1));
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow3 = rowBatch.next();
- assertEquals("0.255.255.255", actualRow3.get(0));
- assertEquals("0.255.255.255", actualRow3.get(1));
- List<Object> actualRow4 = rowBatch.next();
- assertEquals("255.255.255.255", actualRow4.get(0));
- assertEquals("255.255.255.255", actualRow4.get(1));
+
+ // Validate each row of data
+ for (int i = 0; i < 5; i++) {
+ Assert.assertTrue(
+ "Expected row " + i + " to exist, but it doesn't",
rowBatch.hasNext());
+ List<Object> actualRow = rowBatch.next();
+ assertEquals(ipStrings[i], actualRow.get(0));
+ assertEquals(ipStrings[i], actualRow.get(1));
+ assertEquals(ipStrings[i], actualRow.get(2));
+ }
+
+ // Ensure no more rows exist
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
thrown.expectMessage(startsWith("Get row offset:"));
@@ -1021,15 +1027,16 @@ public class RowBatchTest {
@Test
public void testIPv6() throws DorisException, IOException {
-
- ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
- childrenBuilder.add(new Field("k1", FieldType.nullable(new
ArrowType.Utf8()), null));
+ ImmutableList<Field> childrenFields =
+ ImmutableList.of(
+ new Field("k1", FieldType.nullable(new
ArrowType.Utf8()), null),
+ new Field("k2", FieldType.nullable(new
ArrowType.Utf8()), null));
VectorSchemaRoot root =
VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(
- childrenBuilder.build(), null),
+ new
org.apache.arrow.vector.types.pojo.Schema(childrenFields, null),
new RootAllocator(Integer.MAX_VALUE));
+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter =
new ArrowStreamWriter(
@@ -1038,52 +1045,55 @@ public class RowBatchTest {
arrowStreamWriter.start();
root.setRowCount(13);
- FieldVector vector = root.getVector("k1");
- VarCharVector ipv6Vector = (VarCharVector) vector;
+ VarCharVector ipv6Vector = (VarCharVector) root.getVector("k1");
+ VarCharVector ipv6Vector1 = (VarCharVector) root.getVector("k2");
ipv6Vector.setInitialCapacity(13);
ipv6Vector.allocateNew();
- ipv6Vector.setIndexDefined(0);
- ipv6Vector.setValueLengthSafe(0, 1);
- ipv6Vector.setSafe(0, "0".getBytes());
-
- ipv6Vector.setIndexDefined(1);
- ipv6Vector.setValueLengthSafe(0, 1);
- ipv6Vector.setSafe(1, "1".getBytes());
-
- ipv6Vector.setIndexDefined(2);
- ipv6Vector.setSafe(2, "65535".getBytes());
-
- ipv6Vector.setIndexDefined(3);
- ipv6Vector.setSafe(3, "65536".getBytes());
-
- ipv6Vector.setIndexDefined(4);
- ipv6Vector.setSafe(4, "4294967295".getBytes());
-
- ipv6Vector.setIndexDefined(5);
- ipv6Vector.setSafe(5, "4294967296".getBytes());
-
- ipv6Vector.setIndexDefined(6);
- ipv6Vector.setSafe(6, "8589934591".getBytes());
-
- ipv6Vector.setIndexDefined(7);
- ipv6Vector.setSafe(7, "281470681743359".getBytes());
-
- ipv6Vector.setIndexDefined(8);
- ipv6Vector.setSafe(8, "281470681743360".getBytes());
-
- ipv6Vector.setIndexDefined(9);
- ipv6Vector.setSafe(9, "281474976710655".getBytes());
-
- ipv6Vector.setIndexDefined(10);
- ipv6Vector.setSafe(10, "281474976710656".getBytes());
-
- ipv6Vector.setIndexDefined(11);
- ipv6Vector.setSafe(11,
"340277174624079928635746639885392347137".getBytes());
-
- ipv6Vector.setIndexDefined(12);
- ipv6Vector.setSafe(12,
"340282366920938463463374607431768211455".getBytes());
+ ipv6Vector1.setInitialCapacity(13);
+ ipv6Vector1.allocateNew(13);
+
+ String[] k1Values = {
+ "0",
+ "1",
+ "65535",
+ "65536",
+ "4294967295",
+ "4294967296",
+ "8589934591",
+ "281470681743359",
+ "281470681743360",
+ "281474976710655",
+ "281474976710656",
+ "340277174624079928635746639885392347137",
+ "340282366920938463463374607431768211455"
+ };
+
+ String[] k2Values = {
+ "::",
+ "::1",
+ "::ffff",
+ "::0.1.0.0",
+ "::255.255.255.255",
+ "::1:0:0",
+ "::1:ffff:ffff",
+ "::fffe:ffff:ffff",
+ "::ffff:0.0.0.0",
+ "::ffff:255.255.255.255",
+ "::1:0:0:0",
+ "ffff::1:ffff:ffff:1",
+ "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
+ };
+
+ for (int i = 0; i < 13; i++) {
+ ipv6Vector.setIndexDefined(i);
+ ipv6Vector.setSafe(i, k1Values[i].getBytes());
+
+ ipv6Vector1.setIndexDefined(i);
+ ipv6Vector1.setSafe(i, k2Values[i].getBytes());
+ }
- vector.setValueCount(13);
+ ipv6Vector.setValueCount(13);
+ ipv6Vector1.setValueCount(13);
arrowStreamWriter.writeBatch();
arrowStreamWriter.end();
@@ -1098,63 +1108,36 @@ public class RowBatchTest {
String schemaStr =
"{\"properties\":["
- +
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}"
+ +
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"},"
+ +
"{\"type\":\"IPV6\",\"name\":\"k2\",\"comment\":\"\"}"
+ "], \"status\":200}";
Schema schema = RestService.parseSchema(schemaStr, logger);
RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow0 = rowBatch.next();
- assertEquals("::", actualRow0.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow1 = rowBatch.next();
- assertEquals("::1", actualRow1.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow2 = rowBatch.next();
- assertEquals("::ffff", actualRow2.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow3 = rowBatch.next();
- assertEquals("::0.1.0.0", actualRow3.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow4 = rowBatch.next();
- assertEquals("::255.255.255.255", actualRow4.get(0));
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow5 = rowBatch.next();
- assertEquals("::1:0:0", actualRow5.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow6 = rowBatch.next();
- assertEquals("::1:ffff:ffff", actualRow6.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow7 = rowBatch.next();
- assertEquals("::fffe:ffff:ffff", actualRow7.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow8 = rowBatch.next();
- assertEquals("::ffff:0.0.0.0", actualRow8.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow9 = rowBatch.next();
- assertEquals("::ffff:255.255.255.255", actualRow9.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow10 = rowBatch.next();
- assertEquals("::1:0:0:0", actualRow10.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow11 = rowBatch.next();
- assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0));
-
- Assert.assertTrue(rowBatch.hasNext());
- List<Object> actualRow12 = rowBatch.next();
- assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
actualRow12.get(0));
+ String[][] expectedResults = {
+ {"::", "::"},
+ {"::1", "::1"},
+ {"::ffff", "::ffff"},
+ {"::0.1.0.0", "::0.1.0.0"},
+ {"::255.255.255.255", "::255.255.255.255"},
+ {"::1:0:0", "::1:0:0"},
+ {"::1:ffff:ffff", "::1:ffff:ffff"},
+ {"::fffe:ffff:ffff", "::fffe:ffff:ffff"},
+ {"::ffff:0.0.0.0", "::ffff:0.0.0.0"},
+ {"::ffff:255.255.255.255", "::ffff:255.255.255.255"},
+ {"::1:0:0:0", "::1:0:0:0"},
+ {"ffff::1:ffff:ffff:1", "ffff::1:ffff:ffff:1"},
+ {"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"}
+ };
+
+ for (String[] expectedResult : expectedResults) {
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow = rowBatch.next();
+ assertEquals(expectedResult[0], actualRow.get(0));
+ assertEquals(expectedResult[1], actualRow.get(1));
+ }
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]