Hi all,
I ran the demo code from the ververica/flink-cdc-connectors repository on GitHub and got the following error:
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
    at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
    at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
    at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
    at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
    at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
    at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
    at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
    at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Source code:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CdcTest {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                // monitor all tables under the sohay database
                .databaseList("sohay")
                .username("root")
                .password("123456")
                // converts SourceRecord to String
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // use parallelism 1 for sink to keep message ordering
        env.addSource(sourceFunction).print().setParallelism(1);

        env.execute();
    }
}
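
For context, an AbstractMethodError generally means that a class was compiled against a different version of an interface than the one loaded at runtime. To narrow this down, a small check like the one below could show which jar each class is loaded from. This is only a sketch: ClasspathCheck and locate are hypothetical names (not part of Flink, Debezium, or Kafka), and it assumes it is run with the same classpath as the job above. The first class name is taken from the stack trace; the second is the Serializer interface that JsonSerializer implements.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    /**
     * Returns the location (jar or directory) a class was loaded from,
     * or a short note if the class has no code source or cannot be loaded.
     */
    public static String locate(String className) {
        try {
            // Load without initializing, using this class's own class loader.
            Class<?> cls = Class.forName(
                    className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return src == null
                    ? "(bootstrap or unknown source)"
                    : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Class names assumed relevant to the error above.
        String[] names = {
            "org.apache.kafka.connect.json.JsonSerializer",
            "org.apache.kafka.common.serialization.Serializer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the two classes resolve to jars from different Kafka versions (for example, connect-json and kafka-clients pulled in by different dependencies), that mismatch would be a plausible cause; running mvn dependency:tree on the project is another way to look for it.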