Hi wanglei,

I think Aljoscha is right. Could you post your dependency list? The flink-connector-kafka dependency is for DataStream applications, and it is the one you should use here. The flink-sql-connector-kafka dependency is for Table API & SQL applications. Add only one of them, because the two dependencies conflict with each other.
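
One way to confirm such a mix is to check at runtime which jars provide the relevant classes. The following is only a minimal sketch, not part of Flink: the class name `KafkaClasspathCheck` and the helper `locate` are hypothetical, and the package of `KafkaSerializationSchema` is assumed to be `org.apache.flink.streaming.connectors.kafka`. The shaded class name is copied from the error message in the original report. It should be run with the same classpath as the job.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper; not part of Flink or Kafka.
public class KafkaClasspathCheck {

    /**
     * Returns a description of where the class was loaded from,
     * or an empty Optional if the class cannot be loaded.
     */
    public static Optional<String> locate(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> cls = Class.forName(
                    className, false, KafkaClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return Optional.of("(bootstrap or unknown location)");
            }
            return Optional.of(src.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] names = {
            // Assumed package of the DataStream connector interface.
            "org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema",
            // Unshaded Kafka client class.
            "org.apache.kafka.clients.producer.ProducerRecord",
            // Relocated class named in the AbstractMethodError.
            "org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name).orElse("not on classpath"));
        }
    }
}
```

If the relocated `ProducerRecord` resolves to a jar, then flink-sql-connector-kafka (or another jar that relocates Kafka) is probably on the classpath next to flink-connector-kafka, which would match the return type shown in the error.
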

Best,
Leonard Xu

> On May 26, 2020, at 15:02, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> I think what might be happening is that you're mixing dependencies from the
> flink-sql-connector-kafka and the proper flink-connector-kafka that should be
> used with the DataStream API. Could that be the case?
>
> Best,
> Aljoscha
>
> On 25.05.20 19:18, Piotr Nowojski wrote:
>> Hi,
>>
>> It would be helpful if you could provide the full stack trace. Which Flink
>> version and which Kafka connector version are you using?
>>
>> It sounds like either a dependency convergence error (mixing Kafka
>> dependencies/various versions of flink-connector-kafka inside a single
>> job/jar) or some shading issue. Can you check your project for such issues
>> (`mvn dependency:tree` command [1])?
>>
>> Also, what’s a bit suspicious to me is the return type:
>>
>>> Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>>
>> I’m not sure, but I was not aware that we are shading the Kafka dependency in
>> our connectors. Are you manually shading something?
>>
>> Piotrek
>>
>> [1]
>> https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
>>
>>> On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:
>>>
>>> public class MyKafkaSerializationSchema implements
>>>         KafkaSerializationSchema<Tuple2<String, String>> {
>>>     @Override
>>>     public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o,
>>>             @Nullable Long aLong) {
>>>         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
>>>                 o.f1.getBytes(StandardCharsets.UTF_8));
>>>         return record;
>>>     }
>>> }
>>>
>>> FlinkKafkaProducer<Tuple2<String, String>> producer = new
>>>         FlinkKafkaProducer<Tuple2<String, String>>(
>>>                 "default", new MyKafkaSerializationSchema(),
>>>                 prop2, Semantic.EXACTLY_ONCE);
>>>
>>> But there's an error when running:
>>>
>>> java.lang.AbstractMethodError:
>>> com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>>>
>>> Any suggestion on this?
>>>
>>> Thanks,
>>> Lei
>>> wangl...@geekplus.com.cn