FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new
FlinkKafkaConsumer<>(
topic,
new CustomKafkaDeserializationSchema(),
props
);
public class CustomKafkaDeserializationSchema implements
KafkaDeserializationSchema<ConsumerRecord<String, String>> {
@Override
public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
return false;
}
@Override
public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[],
byte[]> record) throws Exception {
ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
new String(record.key(), "UTF-8"),
new String(record.value(), "UTF-8")
);
return consumerRecord;
}
@Override
public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
return TypeInformation.of(new TypeHint<ConsumerRecord<String,
String>>() {
});
}
}
在 2021-09-14 10:14:10,"Caizhi Weng" <[email protected]> 写道:
>Hi!
>
>邮件里看不到图片,请检查一下。
>
>从标题来看,是不是写了一个自己的 kafka deserialization schema,然后这个类是 abstract class,不能直接
>new?
>
>赢峰 <[email protected]> 于2021年9月13日周一 下午8:57写道:
>
>> 报错如下:
>>
>>
>> 代码如下:
>>
>>
>>
>> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制
>>
>>