The same question has been asked on Stack Overflow
<https://stackoverflow.com/questions/68895934/flink-1-13-1-kafka-producer-error-bytearrayserializer-is-not-an-instance-of-org>,
and there is a related question at
<https://stackoverflow.com/questions/62466188/flink-kafka-exactly-once-causing-kafkaexception-bytearrayserializer-is-not-an-in>.
Does anyone have any suggestions?

On Mon, Aug 23, 2021 at 9:07 PM Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> I am trying to use the Flink Kafka producer like below:
>
> public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers", "<Server details>");
>     props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>             ByteArraySerializer.class.getName());
>     props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>             ByteArraySerializer.class.getName());
>
>     return new FlinkKafkaProducer<>(
>             "FlinkSdmKafkaTopic",
>             new SerializationSchema("FlinkSdmKafkaTopic", 8),
>             props,
>             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> }
>
> private static class SerializationSchema
>         implements KafkaSerializationSchema<SelfDescribingMessageDO> {
>     final String topic;
>     final int numPartitions;
>
>     public SerializationSchema(final String topic, final int numPartitions) {
>         this.topic = topic;
>         this.numPartitions = numPartitions;
>     }
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(SelfDescribingMessageDO sdm,
>             @Nullable Long aLong) {
>         return new ProducerRecord<>(topic,
>                 KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
>                 sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
>                 sdm.toByteArray());
>     }
> }
>
> I am getting the below exception when trying to deploy the Flink job. I do
> not get this error during unit tests.
>
> 2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source: MetricSource
> -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map ->
> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0
> transitionState:1069 Source: MetricSource -> Filter -> MetricStoreMapper ->
> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink:
> FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0
> (5764a387ede7d6710bcf3ad4e2222248) switched from INITIALIZING to FAILED with
> failure cause: org.apache.kafka.common.KafkaException: Failed to construct
> kafka producer
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
>         at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
>         at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
>         at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>         at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>         at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.KafkaException: class
> org.apache.kafka.common.serialization.ByteArraySerializer is not an instance
> of org.apache.kafka.common.serialization.Serializer
>         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
>         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
>
> Can someone let me know what is going wrong?
>
> I have added flink-connector-kafka as a dependency in the application code:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka_2.12</artifactId>
>     <version>1.13.1</version>
> </dependency>
>
> flink-connector-kafka is the only non-test Kafka dependency in my pom:
>
> [INFO] +- org.apache.flink:flink-connector-kafka_2.12:jar:1.13.1:compile
> [INFO] |  +- org.apache.kafka:kafka-clients:jar:2.4.1:compile
> [INFO] +- org.apache.kafka:kafka_2.12:jar:2.4.1:test
> [INFO] +- 
> org.apache.flink:flink-connector-kafka_2.12:test-jar:tests:1.13.1:test
> [INFO] +- net.mguenther.kafka:kafka-junit:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:kafka_2.11:jar:test:2.4.0:test
> [INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.4.0:test
> [INFO] |  +- org.apache.kafka:connect-api:jar:2.4.0:test
> [INFO] |  +- org.apache.kafka:connect-json:jar:2.4.0:test
> [INFO] |  \- org.apache.kafka:connect-runtime:jar:2.4.0:test
> [INFO] |     +- org.apache.kafka:kafka-tools:jar:2.4.0:test
> [INFO] |     |  +- org.apache.kafka:kafka-log4j-appender:jar:2.4.0:test
> [INFO] |     +- org.apache.kafka:connect-transforms:jar:2.4.0:test
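A note on the dependency tree above: all of the kafka-junit and kafka_2.11 entries are test-scoped, and Maven does not bundle test-scoped dependencies into the deployed jar, so they cannot explain a deploy-time failure. That points at a second copy of kafka-clients on the cluster side (e.g. a Kafka jar placed in Flink's lib/ directory). If that is the case, one sketch of a fix is to stop bundling kafka-clients into the fat jar and rely on the cluster's copy instead. This is only a sketch under that assumption; if the cluster does not ship kafka-clients, the producer will fail with a ClassNotFoundException instead:

```xml
<!-- Sketch only: assumes the Flink cluster's classpath (e.g. lib/) already
     provides kafka-clients. Keep the connector, but mark kafka-clients as
     provided so the fat jar does not carry a second copy of the
     org.apache.kafka.common.serialization classes. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
    <scope>provided</scope>
</dependency>
```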
>
>
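The "Caused by" line in the quoted stack trace is the usual signature of a classloader conflict rather than a missing class: "X is not an instance of Y" occurs when ByteArraySerializer and the Serializer interface are loaded by two different classloaders, which happens when kafka-clients is present both inside the job's fat jar and on Flink's own classpath. That also matches unit tests passing (one classpath, one loader) while the deployed job fails. A commonly suggested workaround, assuming that diagnosis, is to switch Flink's child-first class resolution to parent-first:

```yaml
# flink-conf.yaml -- workaround sketch, assuming a duplicate kafka-clients
# jar on the cluster classpath. Resolves classes from Flink's classpath
# before the user jar, so only one Serializer class is ever loaded.
classloader.resolve-order: parent-first
```

The cleaner fix is to remove the duplicate jar (from either the fat jar or Flink's lib/) so that exactly one copy of kafka-clients remains, since parent-first resolution can surface other dependency conflicts.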