Copilot commented on code in PR #92:
URL:
https://github.com/apache/doris-kafka-connector/pull/92#discussion_r2963726453
##########
src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java:
##########
@@ -212,6 +256,15 @@ private void readSinkRecordNonKeyData(SinkRecord record,
boolean flattened) {
}
}
+ private void readSinkRecordKeyData(SinkRecord record, boolean
flattened) {
+ final Schema keySchema = record.keySchema();
+ if (keySchema != null) {
+ if (!flattened) {
+ applyKeyFields(keySchema.schema());
Review Comment:
`readSinkRecordKeyData` calls `keySchema.schema()`, but `keySchema` is already
an `org.apache.kafka.connect.data.Schema`; `Schema#schema()` only returns the
concrete schema instance, so the extra call is redundant (it compiles, but adds
nothing). Pass `keySchema` directly (or, if you intended the key
*value*’s schema, cast `record.key()` to `Struct` and call `.schema()`).
```suggestion
applyKeyFields(keySchema);
```
##########
src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaCreationManager.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.schema;
+
+import io.debezium.time.MicroTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import
org.apache.doris.kafka.connector.converter.RecordDescriptor.FieldDescriptor;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: Improve table creation configuration properties via doris options
+public class SchemaCreationManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaCreationManager.class);
+ private final DorisOptions dorisOptions;
+ private static final String CREATE_DDL = "CREATE TABLE IF NOT EXISTS %s ";
+
+ public SchemaCreationManager(DorisOptions dorisOptions) {
+ this.dorisOptions = dorisOptions;
+ }
+
+ private void appendList(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames,
+ Function<String, String> transform) {
+ for (Iterator<String> iterator = columnNames.iterator();
iterator.hasNext(); ) {
+ builder.append(transform.apply(iterator.next()));
+ if (iterator.hasNext()) {
+ builder.append(delimiter);
+ }
+ }
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendLists(builder, ", ", columnNames1, columnNames2, transform);
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendList(builder, delimiter, columnNames1, transform);
+ if (!columnNames1.isEmpty() && !columnNames2.isEmpty()) {
+ builder.append(delimiter);
+ }
+ appendList(builder, delimiter, columnNames2, transform);
+ }
+
+ private boolean execute(String tableName, String ddl) {
+ if (StringUtils.isEmpty(ddl)) {
+ return false;
+ }
+ LOG.info("Execute SQL: {}", ddl);
+ RestService.createTable(dorisOptions, dorisOptions.getDatabase(),
tableName, ddl, LOG);
+ return true;
+ }
+
+ public void createTable(String tableName, RecordDescriptor
recordDescriptor) {
+ try {
+ String statementCreateTableDDL = buildCreateTableDDL(tableName,
recordDescriptor);
+ boolean status = execute(tableName, statementCreateTableDDL);
+ LOG.info(
+ "Created missing {} table from {} database, ddl={},
status={}",
+ tableName,
+ dorisOptions.getDatabase(),
+ statementCreateTableDDL,
+ status);
+ } catch (Exception e) {
+ LOG.warn("Failed to create table {}, cause by: {}", tableName, e);
Review Comment:
In the `catch` block, `LOG.warn("... {}", e)` treats the exception as a
formatting argument, so the stack trace isn’t logged. Prefer `LOG.warn("...",
e)` (or include both message args and pass `e` as the final parameter) to
preserve stack traces for troubleshooting create-table failures.
```suggestion
LOG.warn("Failed to create table {}, cause by:", tableName, e);
```
##########
src/test/java/org/apache/doris/kafka/connector/converter/schema/TestSchemaCreationManager.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.schema;
+
+import static org.mockito.Mockito.mockStatic;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import org.apache.doris.kafka.connector.converter.RecordTypeRegister;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.apache.doris.kafka.connector.writer.TestRecordBuffer;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+public class TestSchemaCreationManager {
+
+ private JsonConverter jsonConverter = new JsonConverter();
+ private SchemaCreationManager schemaCreationManager;
+ private RecordTypeRegister recordTypeRegister;
+ private Properties props = new Properties();
+ private MockedStatic<RestService> mockRestService;
+
+ @Before
+ public void init() throws IOException {
+ InputStream stream =
+ this.getClass()
+ .getClassLoader()
+
.getResourceAsStream("doris-connector-sink.properties");
+ props.load(stream);
Review Comment:
Test setup loads `doris-connector-sink.properties` via
`getResourceAsStream(...)` but never closes the returned `InputStream`. Use
try-with-resources (and optionally assert the stream is non-null) to avoid
resource leaks and to get clearer failures when the resource is missing.
```suggestion
try (InputStream stream =
this.getClass()
.getClassLoader()
.getResourceAsStream("doris-connector-sink.properties")) {
Assert.assertNotNull(
"doris-connector-sink.properties not found on
classpath", stream);
props.load(stream);
}
```
##########
src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java:
##########
@@ -178,22 +202,42 @@ public RecordDescriptor build() {
Objects.requireNonNull(sinkRecord, "The sink record must be
provided.");
final boolean flattened = !isTombstone(sinkRecord) &&
isFlattened(sinkRecord);
+ final boolean keyFlattened = !keyIsTombstone(sinkRecord) &&
keyIsFlattened(sinkRecord);
readSinkRecordNonKeyData(sinkRecord, flattened);
-
- return new RecordDescriptor(
- sinkRecord, sinkRecord.topic(), nonKeyFieldNames,
allFields, flattened);
+ readSinkRecordKeyData(sinkRecord, keyFlattened);
+
+ if (keyFieldNames.isEmpty()) {
+ return new RecordDescriptor(
+ sinkRecord, sinkRecord.topic(), nonKeyFieldNames,
allFields, flattened);
+ } else {
+ return new RecordDescriptor(
+ sinkRecord,
+ sinkRecord.topic(),
+ nonKeyFieldNames,
+ keyFieldNames,
+ allFields,
+ flattened);
+ }
}
private boolean isFlattened(SinkRecord record) {
return record.valueSchema().name() == null
|| !record.valueSchema().name().contains("Envelope");
}
+ private boolean keyIsFlattened(SinkRecord record) {
+ return record.keySchema().name() == null ||
!record.keySchema().name().contains("Key");
Review Comment:
`keyIsFlattened()` dereferences `record.keySchema()` without a null-check. A
`SinkRecord` can have a non-null key with a null key schema, which would throw
an NPE when computing `keyFlattened`. Consider handling `record.keySchema() ==
null` explicitly before accessing `.name()`.
```suggestion
final Schema keySchema = record.keySchema();
if (keySchema == null || keySchema.name() == null) {
// Treat records with no key schema or unnamed key schema as
flattened
return true;
}
return !keySchema.name().contains("Key");
```
##########
src/test/java/org/apache/doris/kafka/connector/converter/schema/TestSchemaCreationManager.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.schema;
+
+import static org.mockito.Mockito.mockStatic;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import org.apache.doris.kafka.connector.converter.RecordTypeRegister;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.apache.doris.kafka.connector.writer.TestRecordBuffer;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+public class TestSchemaCreationManager {
+
+ private JsonConverter jsonConverter = new JsonConverter();
+ private SchemaCreationManager schemaCreationManager;
+ private RecordTypeRegister recordTypeRegister;
+ private Properties props = new Properties();
+ private MockedStatic<RestService> mockRestService;
+
+ @Before
+ public void init() throws IOException {
+ InputStream stream =
+ this.getClass()
+ .getClassLoader()
+
.getResourceAsStream("doris-connector-sink.properties");
+ props.load(stream);
+ DorisSinkConnectorConfig.setDefaultValues((Map) props);
+ props.put("task_id", "1");
+ props.put("converter.mode", "debezium_ingestion");
+ props.put("debezium.schema.evolution", "basic");
+ props.put(
+ "doris.topic2table.map",
+
"debezium_postgresql.wdl_test.psql_common_table:psql_common_table,debezium_postgresql.wdl_test.psql_composite_table:psql_composite_table,debezium_mysql.wdl_test.mysql_common_table:mysql_common_table,debezium_mysql.wdl_test.mysql_geo_table:mysql_geo_table");
+
+ schemaCreationManager = new SchemaCreationManager(new
DorisOptions((Map) props));
+ recordTypeRegister = new RecordTypeRegister(new DorisOptions((Map)
props));
+ HashMap<String, String> config = new HashMap<>();
+ jsonConverter.configure(config, false);
+ mockRestService = mockStatic(RestService.class);
+ }
Review Comment:
`mockStatic(RestService.class)` is created in `@Before`, but these tests
only call `buildCreateTableDDL(...)` (no REST calls), so the static mock
appears unused. If it’s not needed, removing it will simplify the test and
avoid accidentally masking unexpected `RestService` calls in the future.
##########
src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java:
##########
@@ -223,5 +276,12 @@ private void applyNonKeyField(String name, Schema schema) {
nonKeyFieldNames.add(fieldDescriptor.getName());
allFields.put(fieldDescriptor.getName(), fieldDescriptor);
}
+
+ private void applyKeyFields(Schema schema) {
+ System.out.println(schema.toString());
Review Comment:
`applyKeyFields` contains a `System.out.println(...)`, which will spam
stdout in production and tests. Please remove this debug print (or route
through the existing logger at an appropriate level).
```suggestion
```
##########
src/main/java/org/apache/doris/kafka/connector/service/RestService.java:
##########
@@ -328,6 +331,42 @@ public static Schema getSchema(
}
}
+ public static void createTable(
+ DorisOptions dorisOptions, String db, String table, String stmt,
Logger logger) {
+ logger.trace("start create {}.{} table to doris.", db, table);
+ Object responseData = null;
+ try {
+ String tableSchemaUri =
+ String.format(QUERY_STATEMENT_API,
dorisOptions.getHttpUrl(), db);
+ HttpPost httpPost = new HttpPost(tableSchemaUri);
+ httpPost.setHeader(HttpHeaders.AUTHORIZATION,
authHeader(dorisOptions));
+
+ Map<String, Object> payloadMap = new HashMap<>();
+
+ payloadMap.put("stmt", stmt);
+
+ String payloadJsonString =
OBJECT_MAPPER.writeValueAsString(payloadMap);
+
+ StringEntity entity = new StringEntity(payloadJsonString, "UTF-8");
+
+ entity.setContentType("application/json");
+ httpPost.setEntity(entity);
+ httpPost.setHeader("Accept", "application/json");
+
+ Map<String, Object> responseMap = handleResponse(httpPost, logger);
+ responseData = responseMap.get("msg");
+ if (!"success".equalsIgnoreCase(responseData.toString())) {
+ String errMsg = "Fail to create table via Doris FE. res: " +
responseMap.toString();
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
+ }
+ } catch (Exception e) {
+ String errMsg = "SchemaCreation request error," + e;
+ logger.warn(errMsg);
+ throw new SchemaChangeException("SchemaCreation request error with
" + e.getMessage());
+ }
Review Comment:
`createTable` assumes `responseMap.get("msg")` is non-null and equals
"success". If the FE response shape differs (or on error responses without
`msg`), this will throw NPE and hide the real server response. Consider
validating a stable field (e.g., `code == 0` like `SchemaChangeManager`) and
include the full response body in the thrown exception, preserving the original
cause (`new SchemaChangeException(..., e)`).
##########
src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaCreationManager.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.schema;
+
+import io.debezium.time.MicroTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import
org.apache.doris.kafka.connector.converter.RecordDescriptor.FieldDescriptor;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: Improve table creation configuration properties via doris options
+public class SchemaCreationManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaCreationManager.class);
+ private final DorisOptions dorisOptions;
+ private static final String CREATE_DDL = "CREATE TABLE IF NOT EXISTS %s ";
+
+ public SchemaCreationManager(DorisOptions dorisOptions) {
+ this.dorisOptions = dorisOptions;
+ }
+
+ private void appendList(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames,
+ Function<String, String> transform) {
+ for (Iterator<String> iterator = columnNames.iterator();
iterator.hasNext(); ) {
+ builder.append(transform.apply(iterator.next()));
+ if (iterator.hasNext()) {
+ builder.append(delimiter);
+ }
+ }
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendLists(builder, ", ", columnNames1, columnNames2, transform);
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendList(builder, delimiter, columnNames1, transform);
+ if (!columnNames1.isEmpty() && !columnNames2.isEmpty()) {
+ builder.append(delimiter);
+ }
+ appendList(builder, delimiter, columnNames2, transform);
+ }
+
+ private boolean execute(String tableName, String ddl) {
+ if (StringUtils.isEmpty(ddl)) {
+ return false;
+ }
+ LOG.info("Execute SQL: {}", ddl);
+ RestService.createTable(dorisOptions, dorisOptions.getDatabase(),
tableName, ddl, LOG);
+ return true;
+ }
+
+ public void createTable(String tableName, RecordDescriptor
recordDescriptor) {
+ try {
+ String statementCreateTableDDL = buildCreateTableDDL(tableName,
recordDescriptor);
+ boolean status = execute(tableName, statementCreateTableDDL);
+ LOG.info(
+ "Created missing {} table from {} database, ddl={},
status={}",
+ tableName,
+ dorisOptions.getDatabase(),
+ statementCreateTableDDL,
+ status);
+ } catch (Exception e) {
+ LOG.warn("Failed to create table {}, cause by: {}", tableName, e);
+ throw new SchemaChangeException(
+ "Failed to create table " + tableName + ", cause by:", e);
+ }
+ }
+
+ public String buildCreateTableDDL(String tableName, RecordDescriptor
recordDescriptor) {
+
+ final StringBuilder dmlBuilder = new StringBuilder();
+
+ dmlBuilder.append(String.format(CREATE_DDL, tableName));
+ dmlBuilder.append("(");
+
+ Map<String, FieldDescriptor> allFields = recordDescriptor.getFields();
+ Set<String> keyFieldNames = recordDescriptor.getKeyFieldNames();
+ List<String> allFieldNames = recordDescriptor.getNonKeyFieldNames();
+ List<String> nonKeyFieldNames = new ArrayList<>(allFieldNames);
+ Function<String, String> transform =
+ (name) -> {
+ final StringBuilder columnSpec = new StringBuilder();
+ final FieldDescriptor field = allFields.get(name);
+ final org.apache.kafka.connect.data.Schema fieldSchema =
field.getSchema();
+ final String columnName = field.getName();
+ final String columnType = field.getTypeName();
+ final String[] regKeys =
field.getType().getRegistrationKeys();
+
+ if (regKeys[0].equals(MicroTime.SCHEMA_NAME)) {
+ columnSpec.append(columnName).append("
").append("VARCHAR(255)");
+ } else if (keyFieldNames.contains(columnName)
+ && columnType.equals(DorisType.STRING)) {
+ columnSpec.append(columnName).append("
").append("VARCHAR(65533)");
+ } else {
+ columnSpec.append(columnName).append("
").append(columnType);
+ }
+ columnSpec.append(fieldSchema.isOptional() ? " NULL" : "
NOT NULL");
+
Review Comment:
`SchemaCreationManager` special-cases Debezium `MicroTime` to
`VARCHAR(255)`, but the registered `MicroTimeType` (via `AbstractTimeType`)
reports the column type as `DATETIME(<precision>)` for schema evolution /
missing-field detection. This creates inconsistent schemas depending on whether
a table was auto-created vs altered. Consider aligning the type mapping (either
change the `MicroTimeType` to emit `VARCHAR` too, or remove the DDL
special-case if `DATETIME` is intended).
##########
src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java:
##########
@@ -161,6 +184,7 @@ public static class Builder {
private Map<String, Type> typeRegistry;
// Internal build state
+ private final Set<String> keyFieldNames = new HashSet<>();
private final List<String> nonKeyFieldNames = new ArrayList<>();
private final Map<String, FieldDescriptor> allFields = new
LinkedHashMap<>();
Review Comment:
`keyFieldNames` is stored in a `HashSet`, and `SchemaCreationManager`
iterates/joins it to generate the UNIQUE KEY and HASH clause. `HashSet` makes
no guarantee about iteration order: it follows hash buckets rather than the
order in which the key fields were added, and may differ between JDK
implementations. The key columns in the generated DDL can therefore come out in
an arbitrary order (and the current unit tests may be brittle). Use an
order-preserving collection (e.g., `LinkedHashSet`) or sort the key names
before emitting DDL.
##########
src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaCreationManager.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.schema;
+
+import io.debezium.time.MicroTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import
org.apache.doris.kafka.connector.converter.RecordDescriptor.FieldDescriptor;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: Improve table creation configuration properties via doris options
+public class SchemaCreationManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaCreationManager.class);
+ private final DorisOptions dorisOptions;
+ private static final String CREATE_DDL = "CREATE TABLE IF NOT EXISTS %s ";
+
+ public SchemaCreationManager(DorisOptions dorisOptions) {
+ this.dorisOptions = dorisOptions;
+ }
+
+ private void appendList(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames,
+ Function<String, String> transform) {
+ for (Iterator<String> iterator = columnNames.iterator();
iterator.hasNext(); ) {
+ builder.append(transform.apply(iterator.next()));
+ if (iterator.hasNext()) {
+ builder.append(delimiter);
+ }
+ }
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendLists(builder, ", ", columnNames1, columnNames2, transform);
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendList(builder, delimiter, columnNames1, transform);
+ if (!columnNames1.isEmpty() && !columnNames2.isEmpty()) {
+ builder.append(delimiter);
+ }
+ appendList(builder, delimiter, columnNames2, transform);
+ }
+
+ private boolean execute(String tableName, String ddl) {
+ if (StringUtils.isEmpty(ddl)) {
+ return false;
+ }
+ LOG.info("Execute SQL: {}", ddl);
+ RestService.createTable(dorisOptions, dorisOptions.getDatabase(),
tableName, ddl, LOG);
+ return true;
+ }
+
+ public void createTable(String tableName, RecordDescriptor
recordDescriptor) {
+ try {
+ String statementCreateTableDDL = buildCreateTableDDL(tableName,
recordDescriptor);
+ boolean status = execute(tableName, statementCreateTableDDL);
+ LOG.info(
+ "Created missing {} table from {} database, ddl={},
status={}",
+ tableName,
+ dorisOptions.getDatabase(),
+ statementCreateTableDDL,
+ status);
+ } catch (Exception e) {
+ LOG.warn("Failed to create table {}, cause by: {}", tableName, e);
+ throw new SchemaChangeException(
+ "Failed to create table " + tableName + ", cause by:", e);
+ }
+ }
+
+ public String buildCreateTableDDL(String tableName, RecordDescriptor
recordDescriptor) {
+
+ final StringBuilder dmlBuilder = new StringBuilder();
+
+ dmlBuilder.append(String.format(CREATE_DDL, tableName));
+ dmlBuilder.append("(");
+
+ Map<String, FieldDescriptor> allFields = recordDescriptor.getFields();
+ Set<String> keyFieldNames = recordDescriptor.getKeyFieldNames();
+ List<String> allFieldNames = recordDescriptor.getNonKeyFieldNames();
+ List<String> nonKeyFieldNames = new ArrayList<>(allFieldNames);
+ Function<String, String> transform =
+ (name) -> {
+ final StringBuilder columnSpec = new StringBuilder();
+ final FieldDescriptor field = allFields.get(name);
+ final org.apache.kafka.connect.data.Schema fieldSchema =
field.getSchema();
+ final String columnName = field.getName();
+ final String columnType = field.getTypeName();
+ final String[] regKeys =
field.getType().getRegistrationKeys();
+
+ if (regKeys[0].equals(MicroTime.SCHEMA_NAME)) {
+ columnSpec.append(columnName).append("
").append("VARCHAR(255)");
+ } else if (keyFieldNames.contains(columnName)
+ && columnType.equals(DorisType.STRING)) {
+ columnSpec.append(columnName).append("
").append("VARCHAR(65533)");
+ } else {
+ columnSpec.append(columnName).append("
").append(columnType);
+ }
Review Comment:
DDL generation uses raw `tableName`/`columnName` without quoting/escaping.
Elsewhere (e.g., `SchemaChangeManager.identifier`) identifiers are wrapped with
backticks to avoid reserved words / special chars. Consider applying the same
quoting here for table + column names so auto-created tables don’t fail on e.g.
reserved keywords and to keep consistent behavior.
##########
src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java:
##########
@@ -145,14 +148,9 @@ private static boolean isSchemaChange(SinkRecord record) {
private void checkAndApplyTableChangesIfNeeded(
String tableName, RecordDescriptor recordDescriptor) {
if (!hasTable(tableName)) {
- // TODO Table does not exist, lets attempt to create it.
- LOG.warn("The {} table does not exist, please create it
manually.", tableName);
- throw new DorisException(
- "The " + tableName + " table does not exist, please create
it manually.");
- } else {
- // Table exists, lets attempt to alter it if necessary.
- alterTableIfNeeded(tableName, recordDescriptor);
+ schemaCreationManager.createTable(tableName, recordDescriptor);
}
+ alterTableIfNeeded(tableName, recordDescriptor);
}
Review Comment:
The new table-auto-creation path in `checkAndApplyTableChangesIfNeeded` is
not covered by existing tests: current `TestRecordService` stubs
`tableExists(...)` to always return `true`, so
`schemaCreationManager.createTable(...)` is never exercised. Add a unit test
for `tableExists == false` that verifies the create call is made (and that
subsequent alter logic still runs as expected).
##########
src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaCreationManager.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.converter.schema;
+
+import io.debezium.time.MicroTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.RecordDescriptor;
+import
org.apache.doris.kafka.connector.converter.RecordDescriptor.FieldDescriptor;
+import org.apache.doris.kafka.connector.converter.type.doris.DorisType;
+import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.service.RestService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: Improve table creation configuration properties via doris options
+public class SchemaCreationManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaCreationManager.class);
+ private final DorisOptions dorisOptions;
+ private static final String CREATE_DDL = "CREATE TABLE IF NOT EXISTS %s ";
+
+ public SchemaCreationManager(DorisOptions dorisOptions) {
+ this.dorisOptions = dorisOptions;
+ }
+
+ private void appendList(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames,
+ Function<String, String> transform) {
+ for (Iterator<String> iterator = columnNames.iterator();
iterator.hasNext(); ) {
+ builder.append(transform.apply(iterator.next()));
+ if (iterator.hasNext()) {
+ builder.append(delimiter);
+ }
+ }
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendLists(builder, ", ", columnNames1, columnNames2, transform);
+ }
+
+ private void appendLists(
+ StringBuilder builder,
+ String delimiter,
+ Collection<String> columnNames1,
+ Collection<String> columnNames2,
+ Function<String, String> transform) {
+ appendList(builder, delimiter, columnNames1, transform);
+ if (!columnNames1.isEmpty() && !columnNames2.isEmpty()) {
+ builder.append(delimiter);
+ }
+ appendList(builder, delimiter, columnNames2, transform);
+ }
+
+ private boolean execute(String tableName, String ddl) {
+ if (StringUtils.isEmpty(ddl)) {
+ return false;
+ }
+ LOG.info("Execute SQL: {}", ddl);
+ RestService.createTable(dorisOptions, dorisOptions.getDatabase(),
tableName, ddl, LOG);
+ return true;
+ }
+
+ public void createTable(String tableName, RecordDescriptor
recordDescriptor) {
+ try {
+ String statementCreateTableDDL = buildCreateTableDDL(tableName,
recordDescriptor);
+ boolean status = execute(tableName, statementCreateTableDDL);
+ LOG.info(
+ "Created missing {} table from {} database, ddl={},
status={}",
+ tableName,
+ dorisOptions.getDatabase(),
+ statementCreateTableDDL,
+ status);
+ } catch (Exception e) {
+ LOG.warn("Failed to create table {}, cause by: {}", tableName, e);
+ throw new SchemaChangeException(
+ "Failed to create table " + tableName + ", cause by:", e);
+ }
+ }
+
+ public String buildCreateTableDDL(String tableName, RecordDescriptor
recordDescriptor) {
+
+ final StringBuilder dmlBuilder = new StringBuilder();
+
+ dmlBuilder.append(String.format(CREATE_DDL, tableName));
+ dmlBuilder.append("(");
+
+ Map<String, FieldDescriptor> allFields = recordDescriptor.getFields();
+ Set<String> keyFieldNames = recordDescriptor.getKeyFieldNames();
+ List<String> allFieldNames = recordDescriptor.getNonKeyFieldNames();
+ List<String> nonKeyFieldNames = new ArrayList<>(allFieldNames);
+ Function<String, String> transform =
+ (name) -> {
+ final StringBuilder columnSpec = new StringBuilder();
+ final FieldDescriptor field = allFields.get(name);
+ final org.apache.kafka.connect.data.Schema fieldSchema =
field.getSchema();
+ final String columnName = field.getName();
+ final String columnType = field.getTypeName();
+ final String[] regKeys =
field.getType().getRegistrationKeys();
+
+ if (regKeys[0].equals(MicroTime.SCHEMA_NAME)) {
+ columnSpec.append(columnName).append("
").append("VARCHAR(255)");
+ } else if (keyFieldNames.contains(columnName)
+ && columnType.equals(DorisType.STRING)) {
+ columnSpec.append(columnName).append("
").append("VARCHAR(65533)");
+ } else {
+ columnSpec.append(columnName).append("
").append(columnType);
+ }
+ columnSpec.append(fieldSchema.isOptional() ? " NULL" : "
NOT NULL");
+
+ return columnSpec.toString();
+ };
+
+ nonKeyFieldNames.removeIf(keyFieldNames::contains);
+
+ appendLists(dmlBuilder, keyFieldNames, nonKeyFieldNames, transform);
+ dmlBuilder.append(")");
+
+ if (!keyFieldNames.isEmpty()) {
+ dmlBuilder.append(
+ String.format(
+ " ENGINE=OLAP UNIQUE KEY (%s) ", String.join(", ",
keyFieldNames)));
+ dmlBuilder.append(
+ String.format(
+ "DISTRIBUTED BY HASH (%s) BUCKETS AUTO",
+ String.join(", ", keyFieldNames)));
+ }
Review Comment:
`keyFieldNames` is a `Set` and is iterated/joined directly when emitting the
column list, UNIQUE KEY, and DISTRIBUTED BY HASH clauses. If the set isn’t
ordered, the generated DDL will be non-deterministic. Consider using a
deterministic order here (e.g., preserve key schema order via `LinkedHashSet`
or sort before `appendLists`/`String.join`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]