[ https://issues.apache.org/jira/browse/FLINK-32714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-32714:
-----------------------------------
      Labels: auto-deprioritized-major pull-request-available  (was: pull-request-available stale-major)
    Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> JDBC: Add dialect for OceanBase database
> ----------------------------------------
>
>                 Key: FLINK-32714
>                 URL: https://issues.apache.org/jira/browse/FLINK-32714
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: He Wang
>            Priority: Minor
>              Labels: auto-deprioritized-major, pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> OceanBase is a distributed relational database. Its community edition is
> open source at [https://github.com/oceanbase/oceanbase].
> The enterprise edition of OceanBase is compatible with MySQL and Oracle,
> which means we can reuse almost all of the existing dialect rules.
> Unlike with other databases, the compatibility mode must be provided first
> so that the connector can determine which dialect to use. A startup option
> such as 'compatible-mode' is therefore needed.
> A dialect implementation for OceanBase could look like this:
> {code:java}
> package org.apache.flink.connector.jdbc.databases.oceanbase;
>
> import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
> import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
> import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
> import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
> import org.apache.flink.table.types.logical.LogicalTypeRoot;
> import org.apache.flink.table.types.logical.RowType;
>
> import javax.annotation.Nonnull;
>
> import java.util.Optional;
> import java.util.Set;
>
> /** JDBC dialect for OceanBase. */
> public class OceanBaseDialect extends AbstractDialect {
>
>     private static final long serialVersionUID = 1L;
>
>     // MySQL or Oracle dialect that every call below is delegated to.
>     private final AbstractDialect dialect;
>
>     public OceanBaseDialect(@Nonnull String compatibleMode) {
>         switch (compatibleMode.toLowerCase()) {
>             case "mysql":
>                 this.dialect = new MySqlDialect();
>                 break;
>             case "oracle":
>                 this.dialect = new OracleDialect();
>                 break;
>             default:
>                 throw new IllegalArgumentException(
>                         "Unsupported compatible mode: " + compatibleMode);
>         }
>     }
>
>     @Override
>     public String dialectName() {
>         return "OceanBase";
>     }
>
>     @Override
>     public Optional<String> defaultDriverName() {
>         return Optional.of("com.oceanbase.jdbc.Driver");
>     }
>
>     @Override
>     public Set<LogicalTypeRoot> supportedTypes() {
>         return dialect.supportedTypes();
>     }
>
>     @Override
>     public JdbcRowConverter getRowConverter(RowType rowType) {
>         return dialect.getRowConverter(rowType);
>     }
>
>     @Override
>     public String getLimitClause(long limit) {
>         return dialect.getLimitClause(limit);
>     }
>
>     @Override
>     public String quoteIdentifier(String identifier) {
>         return dialect.quoteIdentifier(identifier);
>     }
>
>     @Override
>     public Optional<String> getUpsertStatement(
>             String tableName, String[] fieldNames, String[] conditionFields) {
>         return dialect.getUpsertStatement(tableName, fieldNames, conditionFields);
>     }
>
>     @Override
>     public Optional<Range> timestampPrecisionRange() {
>         return dialect.timestampPrecisionRange();
>     }
>
>     @Override
>     public Optional<Range> decimalPrecisionRange() {
>         return dialect.decimalPrecisionRange();
>     }
>
>     @Override
>     public String appendDefaultUrlProperties(String url) {
>         return dialect.appendDefaultUrlProperties(url);
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
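
The mode selection described in the ticket can be tried out without any Flink classes. The following is a minimal sketch, assuming the 'compatible-mode' value reaches the connector as a plain string. The names `CompatibleModeSketch` and `CompatibleMode` are hypothetical and appear neither in the ticket nor in the Flink JDBC connector. It differs from the constructor in the ticket only in rejecting null explicitly, trimming whitespace, and lowercasing with `Locale.ROOT`.

```java
import java.util.Locale;

/** Hypothetical, Flink-free sketch of resolving a 'compatible-mode' option value. */
public class CompatibleModeSketch {

    /** The two compatibility modes named in the ticket. */
    public enum CompatibleMode {
        MYSQL,
        ORACLE;

        /** Parses an option value case-insensitively and rejects anything else. */
        public static CompatibleMode parse(String value) {
            if (value == null) {
                throw new IllegalArgumentException("'compatible-mode' must be set");
            }
            // Locale.ROOT makes the lowercasing independent of the JVM default locale.
            switch (value.trim().toLowerCase(Locale.ROOT)) {
                case "mysql":
                    return MYSQL;
                case "oracle":
                    return ORACLE;
                default:
                    throw new IllegalArgumentException(
                            "Unsupported compatible mode: " + value);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(CompatibleMode.parse("MySQL"));
        System.out.println(CompatibleMode.parse("oracle"));
    }
}
```

Resolving the string to an enum once, at option-parsing time, would let a dialect constructor switch on a closed set of values instead of on raw strings. Whether the connector should do this is a design choice left to the pull request.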