The Flink version is 1.11.2. Has anyone run into this problem?


[email protected]
 
From: [email protected]
Sent: 2020-11-04 16:43
To: user-zh
Subject: CDC code error
Hi all,
I ran the demo code from the ververica/flink-cdc-connectors Git repository and got the following error:
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
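
A note on narrowing this down: java.lang.AbstractMethodError is normally only possible at run time when a class definition has changed incompatibly since the calling code was compiled, so a mix of JAR versions on the classpath is one thing worth ruling out. Whether that is the cause here is not confirmed in this thread. The sketch below prints which JAR each relevant class is loaded from. The class name JarLocator is hypothetical, and org.apache.kafka.common.serialization.Serializer is included because it is the interface that JsonSerializer implements.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Flink or flink-cdc-connectors.
class JarLocator {

    /** Returns the location a class was loaded from, or a short note if it cannot be determined. */
    public static String locate(String className) {
        try {
            // Load without running static initializers; only the origin matters here.
            Class<?> c = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(bootstrap or unknown source)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.kafka.common.serialization.Serializer",
            "org.apache.kafka.connect.json.JsonSerializer",
            "org.apache.kafka.connect.json.JsonConverter"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Running this with the same classpath as the job (or calling locate from inside the job before the source is created) shows whether the Kafka client classes and the Kafka Connect JSON classes come from JARs of different versions.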
 
Source code:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CdcTest {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("sohay") // monitor all tables under the sohay database
                .username("root")
                .password("123456")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use parallelism 1 for sink to keep message ordering
        env.addSource(sourceFunction).print().setParallelism(1);

        env.execute();
    }
}
 
 
 
[email protected]
