Hi 这个错误是jar包没有正确地加载,看代码应该没啥问题,添加jar包后需要重启下集群,你测试的时候重启了吗?
祝好 Leonard > 在 2020年9月9日,16:48,杨帅统 <[email protected]> 写道: > > 公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。 > 在做测试的时候定义一张cdc源表和一张sink表 > CREATE TABLE pvuv_test ( > id INT, > dt STRING, > pv STRING, > uv STRING , > proc_time AS PROCTIME() --使用维表时需要指定该字段 > ) WITH ( > 'connector' = 'mysql-cdc', -- 连接器 > 'hostname' = 'localhost', --mysql地址 > 'port' = '3306', -- mysql端口 > 'username' = 'root', --mysql用户名 > 'password' = 'rootzs', -- mysql密码 > 'database-name' = 'etc_demo', -- 数据库名称 > 'table-name' = 'puuv_test' > ); > CREATE TABLE pvuv_test_back ( > id INT, > dt STRING, > pv STRING, > uv STRING , > proc_time AS PROCTIME() --使用维表时需要指定该字段 > ) WITH ( > 'connector' = 'mysql-cdc', -- 连接器 > 'hostname' = 'localhost', --mysql地址 > 'port' = '3306', -- mysql端口 > 'username' = 'root', --mysql用户名 > 'password' = 'rootzs', -- mysql密码 > 'database-name' = 'etc_demo', -- 数据库名称 > 'table-name' = 'puuv_test_back' > ); > 但是在通过SQL Client执行下面语句的时候,报错 > INSERT INTO pvuv_test_back > SELECT * FROM pvuv_test; > --------------------------------- > 报错信息如下 > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Could not find any factory > for identifier 'mysql-cdc' that implements > 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath. > Available factory identifiers are: > blackhole > elasticsearch-6 > kafka > print > > > Flink/lib 目录下已经有mysql-cdc的jar包 不知道问题出现在哪里 > > > 最后对MySQL-MySQL数据实时同步的需求 不知道大家还有什么其他的方案或者想法。感谢
