[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35560:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add query validator support to flink sql gateway via spi pattern
> ----------------------------------------------------------------
>
>                 Key: FLINK-35560
>                 URL: https://issues.apache.org/jira/browse/FLINK-35560
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Gateway
>            Reporter: dongwoo.kim
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Summary
> Hello, I'd like to suggest adding query validator support to the Flink SQL
> Gateway via the SPI pattern.
> SQL Gateway operators need query validation so that only safe queries are
> executed and unsafe queries are rejected.
> To address this need, I propose adding a {{QueryValidator}} interface to the
> Flink SQL Gateway API package.
> This interface would let users plug in their own query validation logic.
> h3. Interface
> Below is a draft of the interface.
> It takes an {{Operation}} and checks whether the query is valid.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
>
> import org.apache.flink.annotation.Public;
> import org.apache.flink.table.operations.Operation;
>
> /**
>  * Interface for implementing a validator that checks the safety of executing
>  * queries.
>  */
> @Public
> public interface QueryValidator {
>     boolean validateQuery(Operation op);
> }
> {code}
> h3. Example implementation
> Below is an example implementation that inspects Kafka table options,
> specifically {{scan.startup.timestamp-millis}}, and rejects the query when the
> value is more than one day in the past, since reading that far back can cause
> high disk I/O load.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
>
> import org.apache.flink.table.operations.Operation;
> import org.apache.flink.table.operations.ddl.CreateTableOperation;
>
> import java.util.Map;
>
> public class KafkaTimestampValidator implements QueryValidator {
>
>     private static final long ONE_DAY = 24 * 60 * 60 * 1000L;
>
>     @Override
>     public boolean validateQuery(Operation op) {
>         // Only CREATE TABLE statements carry the connector options checked here.
>         if (op instanceof CreateTableOperation) {
>             CreateTableOperation createTableOp = (CreateTableOperation) op;
>             Map<String, String> options = createTableOp.getCatalogTable().getOptions();
>             if ("kafka".equals(options.get("connector"))) {
>                 String startupTimestamp = options.get("scan.startup.timestamp-millis");
>                 // Reject tables whose startup timestamp is more than one day old.
>                 if (startupTimestamp != null
>                         && Long.parseLong(startupTimestamp)
>                                 < System.currentTimeMillis() - ONE_DAY) {
>                     return false;
>                 }
>             }
>         }
>         // Every other operation is considered safe by this validator.
>         return true;
>     }
> }
> {code}
> I'd be happy to implement this feature if we can reach an agreement.
> Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
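
The issue proposes discovery "via spi pattern" but does not show how the gateway would find and apply validators. The following is a minimal sketch of one way this could work with the JDK's {{java.util.ServiceLoader}}. It is not taken from the issue or from Flink: {{SketchValidator}} is a hypothetical stand-in for the proposed {{QueryValidator}} (it takes a plain {{Object}} instead of a Flink {{Operation}} so the sketch compiles on its own), and {{ValidatorChain}} and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for the proposed QueryValidator interface.
// The draft in the issue takes a Flink Operation; Object is used here
// only so that the sketch has no Flink dependency.
interface SketchValidator {
    boolean validateQuery(Object op);
}

// Illustrative helper (not part of the proposal): holds a set of validators
// and allows an operation only if every validator accepts it.
final class ValidatorChain {
    private final List<SketchValidator> validators;

    ValidatorChain(List<SketchValidator> validators) {
        this.validators = new ArrayList<>(validators);
    }

    // Discovers implementations registered through the standard SPI file
    // META-INF/services/<fully qualified interface name> on the classpath.
    static ValidatorChain fromServiceLoader() {
        List<SketchValidator> found = new ArrayList<>();
        for (SketchValidator validator : ServiceLoader.load(SketchValidator.class)) {
            found.add(validator);
        }
        return new ValidatorChain(found);
    }

    // Returns false as soon as one validator rejects the operation.
    // An empty chain allows everything.
    boolean isAllowed(Object op) {
        for (SketchValidator validator : validators) {
            if (!validator.validateQuery(op)) {
                return false;
            }
        }
        return true;
    }
}

class ValidatorChainDemo {
    public static void main(String[] args) {
        List<SketchValidator> validators = new ArrayList<>();
        // Toy rule for the demo: reject anything whose text starts with "DROP".
        validators.add(op -> !String.valueOf(op).startsWith("DROP"));
        ValidatorChain chain = new ValidatorChain(validators);

        System.out.println(chain.isAllowed("SELECT 1"));
        System.out.println(chain.isAllowed("DROP TABLE t"));
    }
}
```

Under this assumption, a validator such as {{KafkaTimestampValidator}} would be shipped in a jar together with a service registration file, and the gateway would consult the chain before executing each operation. Whether a rejection should surface as a boolean or as an exception with a reason is left open by the draft interface.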