He Wang created FLINK-32714:
-------------------------------

             Summary: JDBC: Add dialect for OceanBase database
                 Key: FLINK-32714
                 URL: https://issues.apache.org/jira/browse/FLINK-32714
             Project: Flink
          Issue Type: New Feature
            Reporter: He Wang


OceanBase is a distributed relational database, the community edition of 
OceanBase is open sourced at [https://github.com/oceanbase/oceanbase.]

The enterprise edition of OceanBase is compatible with MySql and Oracle, which 
means we can reuse almost all the dialect rules. 

The difference from other databases is that we must provide the compatibility 
mode firstly, then the connector can determine which dialect to use, so a 
startup option like 'compatible-mode'  is needed.

A dialect implementation for OceanBase is like below: 
{code:java}
package org.apache.flink.connector.jdbc.databases.oceanbase;


import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nonnull;

import java.util.Optional;
import java.util.Set;

/** JDBC dialect for OceanBase. */
public class OceanBaseDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    private final AbstractDialect dialect;

    public OceanBaseDialect(@Nonnull String compatibleMode) {
        switch (compatibleMode.toLowerCase()) {
            case "mysql":
                this.dialect = new MySqlDialect();
                break;
            case "oracle":
                this.dialect = new OracleDialect();
                break;
            default:
                throw new IllegalArgumentException(
                        "Unsupported compatible mode: " + compatibleMode);
        }
    }

    @Override
    public String dialectName() {
        return "OceanBase";
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.oceanbase.jdbc.Driver");
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        return dialect.supportedTypes();
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return dialect.getRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return dialect.getLimitClause(limit);
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return dialect.quoteIdentifier(identifier);
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] conditionFields) {
        return dialect.getUpsertStatement(tableName, fieldNames, 
conditionFields);
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return dialect.timestampPrecisionRange();
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return dialect.decimalPrecisionRange();
    }

    @Override
    public String appendDefaultUrlProperties(String url) {
        return dialect.appendDefaultUrlProperties(url);
    }
}
 {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to