[ https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758339#comment-17758339 ]
Hang Ruan commented on FLINK-32798: ----------------------------------- Hi, all. I have done some tests about this feature. 1.Create a mysql table to store the changes. {code:java} CREATE TABLE `listener_test` ( `id` bigint NOT NULL AUTO_INCREMENT, `catalog` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL, `identifier` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL, `type` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL, `detail` text CHARACTER SET utf8 COLLATE utf8_general_ci, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC; {code} 2.Develop the Factory and Listener. {code:java} package org.self.listener; import org.apache.flink.table.catalog.listener.CatalogModificationListener; import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory; public class MyCatalogListenerFactory implements CatalogModificationListenerFactory { @Override public CatalogModificationListener createListener(Context context) { return new MyCatalogListener("jdbc:mysql://hostname:3306/db?useSSL=false&connectTimeout=30000", "username", "password"); } @Override public String factoryIdentifier() { return "test"; } } {code} {code:java} package org.self.listener; import org.apache.flink.table.catalog.listener.AlterDatabaseEvent; import org.apache.flink.table.catalog.listener.AlterTableEvent; import org.apache.flink.table.catalog.listener.CatalogModificationEvent; import org.apache.flink.table.catalog.listener.CatalogModificationListener; import org.apache.flink.table.catalog.listener.CreateDatabaseEvent; import org.apache.flink.table.catalog.listener.CreateTableEvent; import org.apache.flink.table.catalog.listener.DropDatabaseEvent; import org.apache.flink.table.catalog.listener.DropTableEvent; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.stream.Collectors; public class MyCatalogListener implements CatalogModificationListener { private final String jdbcUrl; private final String username; private final String password; public MyCatalogListener(String jdbcUrl, String username, String password) { this.jdbcUrl = jdbcUrl; this.username = username; this.password = password; } @Override public void onEvent(CatalogModificationEvent event) { try { Class.forName(getDriverClassName()); try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password); PreparedStatement statement = connection.prepareStatement("INSERT INTO `listener_test` (`catalog`,`identifier`,`type`,`detail`) VALUES (?,?,?,?);")) { String identifier; String type; String detail; String catalog = event.context().getCatalogName(); if (event instanceof CreateDatabaseEvent) { CreateDatabaseEvent cde = (CreateDatabaseEvent) event; identifier = "DB:" + cde.databaseName(); detail = cde.database().getProperties().entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")); type = "CREATE DB"; } else if (event instanceof AlterDatabaseEvent) { AlterDatabaseEvent ade = (AlterDatabaseEvent) event; identifier = "DB:" + ade.databaseName(); detail = ade.newDatabase().getProperties().entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")); type = "ALTER DB"; } else if (event instanceof DropDatabaseEvent) { DropDatabaseEvent dde = (DropDatabaseEvent) event; identifier = "DB:" + dde.databaseName(); detail = "null"; type = "DELETE DB"; } else if (event instanceof CreateTableEvent) { CreateTableEvent cte = (CreateTableEvent) event; identifier = "TBL:" + cte.identifier().toString(); detail = cte.table().toString(); type = "CREATE TBL"; } else if (event instanceof AlterTableEvent) { AlterTableEvent ate = (AlterTableEvent) event; identifier = "TBL:" + ate.identifier().toString(); detail = ate.newTable().toString(); type = "ALTER TBL"; } else if (event instanceof DropTableEvent) { DropTableEvent dte = (DropTableEvent) event; identifier = "TBL:" + dte.identifier().toString(); detail = dte.table().toString(); type = "DELETE TBL"; } else { throw new IllegalArgumentException("Unknown event type."); } statement.setObject(1, catalog); statement.setObject(2, identifier); statement.setObject(3, type); statement.setObject(4, detail); statement.execute(); } } catch (Exception e) { throw new IllegalArgumentException(e); } } private String getDriverClassName() { try { Class.forName("com.mysql.cj.jdbc.Driver"); return "com.mysql.cj.jdbc.Driver"; } catch (ClassNotFoundException e) { return "com.mysql.jdbc.Driver"; } } } {code} 3.Add file `org.apache.flink.table.factories.Factory` to the resources. 4.Package the code and put it in `lib`. 5.Add `table.catalog-modification.listeners: test` to `flink-conf.yaml`. 6.Start sql client and test. The test result as follows. !sqls.png! !result.png! It seems good when using by the sql client. And I think we should add the Step 3 and 4 to docs. > Release Testing: Verify FLIP-294: Support Customized Catalog Modification > Listener > ---------------------------------------------------------------------------------- > > Key: FLINK-32798 > URL: https://issues.apache.org/jira/browse/FLINK-32798 > Project: Flink > Issue Type: Sub-task > Components: Tests > Affects Versions: 1.18.0 > Reporter: Qingsheng Ren > Priority: Major > Fix For: 1.18.0 > > Attachments: result.png, sqls.png, test.png > > > The document about catalog modification listener is: > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener -- This message was sent by Atlassian Jira (v8.20.10#820010)