He Wang created FLINK-32714: ------------------------------- Summary: JDBC: Add dialect for OceanBase database Key: FLINK-32714 URL: https://issues.apache.org/jira/browse/FLINK-32714 Project: Flink Issue Type: New Feature Reporter: He Wang
OceanBase is a distributed relational database, the community edition of OceanBase is open sourced at [https://github.com/oceanbase/oceanbase.] The enterprise edition of OceanBase is compatible with MySql and Oracle, which means we can reuse almost all the dialect rules. The difference from other databases is that we must provide the compatibility mode firstly, then the connector can determine which dialect to use, so a startup option like 'compatible-mode' is needed. A dialect implementation for OceanBase is like below: {code:java} package org.apache.flink.connector.jdbc.databases.oceanbase; import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect; import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect; import org.apache.flink.connector.jdbc.dialect.AbstractDialect; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nonnull; import java.util.Optional; import java.util.Set; /** JDBC dialect for OceanBase. */ public class OceanBaseDialect extends AbstractDialect { private static final long serialVersionUID = 1L; private final AbstractDialect dialect; public OceanBaseDialect(@Nonnull String compatibleMode) { switch (compatibleMode.toLowerCase()) { case "mysql": this.dialect = new MySqlDialect(); break; case "oracle": this.dialect = new OracleDialect(); break; default: throw new IllegalArgumentException( "Unsupported compatible mode: " + compatibleMode); } } @Override public String dialectName() { return "OceanBase"; } @Override public Optional<String> defaultDriverName() { return Optional.of("com.oceanbase.jdbc.Driver"); } @Override public Set<LogicalTypeRoot> supportedTypes() { return dialect.supportedTypes(); } @Override public JdbcRowConverter getRowConverter(RowType rowType) { return dialect.getRowConverter(rowType); } @Override public String getLimitClause(long limit) { return dialect.getLimitClause(limit); } @Override public String quoteIdentifier(String identifier) { return dialect.quoteIdentifier(identifier); } @Override public Optional<String> getUpsertStatement( String tableName, String[] fieldNames, String[] conditionFields) { return dialect.getUpsertStatement(tableName, fieldNames, conditionFields); } @Override public Optional<Range> timestampPrecisionRange() { return dialect.timestampPrecisionRange(); } @Override public Optional<Range> decimalPrecisionRange() { return dialect.decimalPrecisionRange(); } @Override public String appendDefaultUrlProperties(String url) { return dialect.appendDefaultUrlProperties(url); } } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)