Hi Arvid, Interestingly, my job runs successfully in a docker container (image* flink:1.9.0-scala_2.11*) but is failing with the *java.lang.AbstractMethodError* on AWS EMR (non-docker). I am compiling with java version OpenJDK 1.8.0_242, which is the same version my EMR cluster is running. Though since it runs successfully locally in a docker container, it would point to an issue in our AWS environment setup. Oddly, we have been running Flink on EMR for +2 years and have never come across this till now.
Results of javap are: public class com.jwplayer.flink.serialization.json.JsonRowKeyedSerializationSchema implements org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> { public static org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> create(com.jwplayer.flink.config.serde.SerDeConfig); public byte[] serializeKey(org.apache.flink.types.Row); public byte[] serializeValue(org.apache.flink.types.Row); public org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> serialize(org.apache.flink.types.Row, java.lang.Long); public org.apache.kafka.clients.producer.ProducerRecord serialize(java.lang.Object, java.lang.Long); } On Mon, Mar 23, 2020 at 9:55 AM Arvid Heise <ar...@ververica.com> wrote: > Hi Steve, > > for some reason, it seems as if the Java compiler is not generating the > bridge method [1]. > > Could you double-check that the Java version of your build process and > your cluster match? > > Could you run javap on your generated class file and report back? > > [1] > https://docs.oracle.com/javase/tutorial/java/generics/bridgeMethods.html > > On Thu, Mar 19, 2020 at 5:13 PM Steve Whelan <swhe...@jwplayer.com> wrote: > >> Hi, >> >> I am attempting to create a Key/Value serializer for the Kafka table >> connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant >> classes, updating the serializer. >> >> First, I created `JsonRowKeyedSerializationSchema` which implements >> `KeyedSerializationSchema`[2], which is deprecated. The way it works is you >> provide a list of indices in your Row output that are the Key. This works >> successfully. >> >> When I tried migrating my `JsonRowKeyedSerializationSchema` to implement >> `KafkaSerializationSchema`[3], I get a `java.lang.AbstractMethodError` >> exception. Normally, this would me I'm using an old interface however all >> my Flink dependencies are version 1.9. I can not find this abstract >> `serialize()` function in the Flink codebase. Has anyone come across this >> before? >> >> When I print the method of my `JsonRowKeyedSerializationSchema` class, I >> do see the below which seems to be getting called by the FlinkKafkaProducer >> but I do not see it in `KafkaSerializationSchema`: >> >> public abstract >> org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord >> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long) >> java.lang.Object >> java.lang.Long >> >> >> *`JsonRowKeyedSerializationSchema` class* >> >> import >> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; >> import org.apache.flink.types.Row; >> import org.apache.kafka.clients.producer.ProducerRecord; >> >> public class JsonRowKeyedSerializationSchema implements >> KafkaSerializationSchema<Row> { >> >> // constructors and helpers >> >> @Override >> public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long >> aLong) { >> return new ProducerRecord<>("some_topic", serializeKey(row), >> serializeValue(row)); >> } >> } >> >> >> *Stacktrace:* >> >> Caused by: java.lang.AbstractMethodError: Method >> com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; >> is abstract >> at >> com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java) >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816) >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98) >> at >> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) >> at >> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) >> at >> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) >> at >> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) >> at >> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) >> at >> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) >> at >> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) >> at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >> at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) >> at >> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202) >> >> >> [1] >> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java >> [2] >> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java >> [3] >> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java >> >> >