The same question has also been asked on Stack Overflow <https://stackoverflow.com/questions/68895934/flink-1-13-1-kafka-producer-error-bytearrayserializer-is-not-an-instance-of-org>, and there is another related question there <https://stackoverflow.com/questions/62466188/flink-kafka-exactly-once-causing-kafkaexception-bytearrayserializer-is-not-an-in>. Does anyone have any suggestions?

On Mon, Aug 23, 2021 at 9:07 PM Debraj Manna <subharaj.ma...@gmail.com> wrote:

> I am trying to use flink kafka producer like below
>
> public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers", "<Server details>");
>     props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>             ByteArraySerializer.class.getName());
>     props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>             ByteArraySerializer.class.getName());
>
>     return new FlinkKafkaProducer<>(
>             "FlinkSdmKafkaTopic",
>             new SerializationSchema("FlinkSdmKafkaTopic", 8),
>             props,
>             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> }
>
> private static class SerializationSchema implements
>         KafkaSerializationSchema<SelfDescribingMessageDO> {
>     final String topic;
>     final int numPartitions;
>
>     public SerializationSchema(final String topic, final int numPartitions) {
>         this.topic = topic;
>         this.numPartitions = numPartitions;
>     }
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(SelfDescribingMessageDO sdm,
>             @Nullable Long aLong) {
>         return new ProducerRecord<>(topic,
>                 KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
>                 sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
>                 sdm.toByteArray());
>     }
> }
>
> I am getting the below exception when trying to deploy the flink job. During
> unit tests I am not getting this error.
>
> 2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source: MetricSource -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 transitionState:1069 Source: MetricSource -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 (5764a387ede7d6710bcf3ad4e2222248) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
>     at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
>
> Can someone let me know what is going wrong?
>
> I have added flink connector kafka as my dependency in the application code.
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka_2.12</artifactId>
>     <version>1.13.1</version>
> </dependency>
>
> I only have flink-connector-kafka as the non test dependency in my pom (for
> kafka).
>
> [INFO] +- org.apache.flink:flink-connector-kafka_2.12:jar:1.13.1:compile
> [INFO] |  +- org.apache.kafka:kafka-clients:jar:2.4.1:compile
> [INFO] +- org.apache.kafka:kafka_2.12:jar:2.4.1:test
> [INFO] +- org.apache.flink:flink-connector-kafka_2.12:test-jar:tests:1.13.1:test
> [INFO] +- net.mguenther.kafka:kafka-junit:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:kafka_2.11:jar:test:2.4.0:test
> [INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.4.0:test
> [INFO] |  +- org.apache.kafka:connect-api:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:connect-json:jar:2.4.0:test
> [INFO] |  \- org.apache.kafka:connect-runtime:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:kafka-tools:jar:2.4.0:test
> [INFO] |  |  +- org.apache.kafka:kafka-log4j-appender:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:connect-transforms:jar:2.4.0:test
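
A "class X is not an instance of Y" failure, where X plainly implements Y in the source, can occur in Java when X and Y were defined by different class loaders. Whether that is what happens in this job is not confirmed anywhere in the thread. It is only one possibility worth ruling out, for example if kafka-clients were present both in the job jar and on the cluster's classpath. The helper below is a minimal diagnostic sketch for checking this. The class name `ClassOrigin` and its methods are hypothetical and are not part of Flink or Kafka. It uses only JDK calls.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper (an assumption for illustration; not part of Flink or Kafka). */
public final class ClassOrigin {
    private ClassOrigin() {}

    /** Describes which class loader defined the class and where it was loaded from. */
    public static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        // A null loader means the class was defined by the bootstrap class loader.
        String loaderName = (loader == null) ? "bootstrap" : loader.toString();
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "unknown"
                : source.getLocation().toString();
        return type.getName() + " | loader=" + loaderName + " | location=" + location;
    }

    /** Returns true when both classes were defined by the same class loader. */
    public static boolean sameLoader(Class<?> a, Class<?> b) {
        return a.getClassLoader() == b.getClassLoader();
    }

    public static void main(String[] args) {
        // JDK classes are used here so the example runs without Kafka on the classpath.
        System.out.println(describe(String.class));
        System.out.println(describe(ClassOrigin.class));
        System.out.println(sameLoader(String.class, Integer.class));
    }
}
```

In the deployed job, one could log `ClassOrigin.describe(ByteArraySerializer.class)` and `ClassOrigin.describe(Serializer.class)` just before the producer is created. If the two loaders or jar locations differ, that would point to duplicate copies of kafka-clients on the cluster. If they match, the cause lies elsewhere.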