Hi,

I am attempting to create a Key/Value serializer for the Kafka table
connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant
classes, updating the serializer.

First, I created `JsonRowKeyedSerializationSchema` which implements
`KeyedSerializationSchema`[2], which is deprecated. The way it works is you
provide a list of indices in your Row output that are the Key. This works
successfully.

When I tried migrating my `JsonRowKeyedSerializationSchema` to implement
`KafkaSerializationSchema`[3], I get a `java.lang.AbstractMethodError`
exception. Normally, this would me I'm using an old interface however all
my Flink dependencies are version 1.9. I can not find this abstract
`serialize()` function in the Flink codebase. Has anyone come across this
before?

When I print the method of my `JsonRowKeyedSerializationSchema` class, I do
see the below which seems to be getting called by the FlinkKafkaProducer
but I do not see it in `KafkaSerializationSchema`:

public abstract
org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
java.lang.Object
java.lang.Long


*`JsonRowKeyedSerializationSchema` class*

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JsonRowKeyedSerializationSchema implements
KafkaSerializationSchema<Row> {

  // constructors and helpers

  @Override
  public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long
aLong) {
    return new ProducerRecord<>("some_topic", serializeKey(row),
serializeValue(row));
  }
}


*Stacktrace:*

Caused by: java.lang.AbstractMethodError: Method
com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
is abstract
at
com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)


[1]
https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
[2]
https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
[3]
https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java

Reply via email to