Hi

这个错误是jar包没有正确地加载,看代码应该没啥问题,添加jar包后需要重启下集群,你测试的时候重启了吗?


祝好
Leonard

> 在 2020年9月9日,16:48,杨帅统 <[email protected]> 写道:
> 
> 公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。
> 在做测试的时候定义一张cdc源表和一张sink表
> CREATE TABLE pvuv_test (
>  id INT,
>  dt STRING,
>  pv STRING,
>  uv STRING ,
>  proc_time AS PROCTIME() --使用维表时需要指定该字段
> ) WITH (
>  'connector' = 'mysql-cdc', -- 连接器
>  'hostname' = 'localhost',   --mysql地址
>  'port' = '3306',  -- mysql端口
>  'username' = 'root',  --mysql用户名
>  'password' = 'rootzs',  -- mysql密码
>  'database-name' = 'etc_demo', --  数据库名称
>  'table-name' = 'puuv_test'
> );
> CREATE TABLE pvuv_test_back (
>  id INT,
>  dt STRING,
>  pv STRING,
>  uv STRING ,
>  proc_time AS PROCTIME() --使用维表时需要指定该字段
> ) WITH (
>  'connector' = 'mysql-cdc', -- 连接器
>  'hostname' = 'localhost',   --mysql地址
>  'port' = '3306',  -- mysql端口
>  'username' = 'root',  --mysql用户名
>  'password' = 'rootzs',  -- mysql密码
>  'database-name' = 'etc_demo', --  数据库名称
>  'table-name' = 'puuv_test_back'
> );
> 但是在通过SQL Client执行下面语句的时候,报错
> INSERT INTO pvuv_test_back
> SELECT * FROM pvuv_test;
> ---------------------------------
> 报错信息如下
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factory 
> for identifier 'mysql-cdc' that implements 
> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
> Available factory identifiers are:
> blackhole
> elasticsearch-6
> kafka
> print
> 
> 
> Flink/lib 目录下已经有mysql-cdc的jar包 不知道问题出现在哪里
> 
> 
> 最后对MySQL-MySQL数据实时同步的需求 不知道大家还有什么其他的方案或者想法。感谢

回复