Hi, wanglei

I think Aljoscha is right. Could you post your dependency list?
The flink-connector-kafka dependency is meant for DataStream applications and 
is the one you should use; flink-sql-connector-kafka is meant for Table API & 
SQL applications. Add only one of them, because the two dependencies will 
conflict.
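
If it helps, here is a minimal sketch for checking from inside the job which 
ProducerRecord classes are visible and which jar each one comes from. The class 
name KafkaClasspathCheck and the helper locate() are made up for illustration; 
the two class names are taken from your code and from the error message. It only 
uses standard JDK calls, so treat it as a quick diagnostic, not a fix.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Flink or Kafka): reports where a class is loaded from.
final class KafkaClasspathCheck {
    // Unshaded Kafka class, as used by flink-connector-kafka and in the user code.
    static final String PLAIN =
        "org.apache.kafka.clients.producer.ProducerRecord";
    // Relocated class name, copied from the AbstractMethodError in this thread.
    static final String SHADED =
        "org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord";

    // Returns the jar/directory a class is loaded from, or a marker string.
    static String locate(String className) {
        try {
            // initialize=false: look the class up without running static initializers.
            Class<?> c = Class.forName(
                className, false, KafkaClasspathCheck.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Core JDK classes have no code source.
            return src == null ? "bootstrap/JDK" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println("plain  ProducerRecord: " + locate(PLAIN));
        System.out.println("shaded ProducerRecord: " + locate(SHADED));
    }
}
```

If both lines print a jar location, the job jar most likely contains both 
connectors, which would match what Aljoscha described.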

Best,
Leonard Xu

> On May 26, 2020, at 15:02, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> I think what might be happening is that you're mixing dependencies from the 
> flink-sql-connector-kafka and the proper flink-connector-kafka that should be 
> used with the DataStream API. Could that be the case?
> 
> Best,
> Aljoscha
> 
> On 25.05.20 19:18, Piotr Nowojski wrote:
>> Hi,
>> It would be helpful if you could provide the full stack trace. Which Flink 
>> version and which Kafka connector version are you using?
>> It sounds like either a dependency convergence error (mixing Kafka 
>> dependencies/various versions of flink-connector-kafka inside a single 
>> job/jar) or some shading issue. Can you check your project for such issues 
>> with the `mvn dependency:tree` command [1]?
>> Also what’s a bit suspicious for me is the return type:
>>> Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>> I’m not sure, but I was not aware that we shade the Kafka dependency in 
>> our connectors. Are you manually shading something?
>> Piotrek
>> [1] 
>> https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
>>> On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:
>>> 
>>> 
>>> public class MyKafkaSerializationSchema
>>>         implements KafkaSerializationSchema<Tuple2<String, String>> {
>>>     @Override
>>>     public ProducerRecord<byte[], byte[]> serialize(
>>>             Tuple2<String, String> o, @Nullable Long aLong) {
>>>         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
>>>             o.f1.getBytes(StandardCharsets.UTF_8));
>>>         return record;
>>>     }
>>> }
>>> 
>>> FlinkKafkaProducer<Tuple2<String, String>> producer =
>>>     new FlinkKafkaProducer<Tuple2<String, String>>(
>>>         "default", new MyKafkaSerializationSchema(),
>>>         prop2, Semantic.EXACTLY_ONCE);
>>> 
>>> But there's an error when running:
>>> 
>>> java.lang.AbstractMethodError: 
>>> com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>>> 
>>> Any suggestion on this?
>>> 
>>> Thanks,
>>> Lei
>>> wangl...@geekplus.com.cn
> 
