(resubmission of a previous post, since the stack trace didn't show up last
time)

We're attempting to upgrade our Flink 1.3.2 cluster and jobs to 1.4.0. Jobs
submitted to the 1.4.0 cluster fail with a Kryo registration error.

My jobs consume from Kafka topics whose messages are in Avro format. The
Avro schemas are registered with a Confluent Schema Registry. For
ingestion, we've been using the KafkaDeserializerWrapper class from this
pull request: https://github.com/apache/flink/pull/2705

In the pom.xml, I added a new dependency on flink-avro and upgraded all
other Maven dependencies to version 1.4.0.

Here's the error: 

java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
  Reason:
    Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
  Current Frame:
    bci: @23
    flags: { }
    locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
    stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
  Bytecode:
    0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
    0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
    0x0000020: 57b1

        at java.lang.Class.getDeclaredConstructors0(Native Method)
        at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
        at java.lang.Class.getConstructor0(Class.java:3075)
        at java.lang.Class.getConstructor(Class.java:1825)
        at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:48)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:481)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(KryoSerializer.java:119)
        at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:90)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
        at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
        at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:520)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:165)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:692)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
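
In case it helps anyone narrow this down: the trace says a Flink serializer
class "is not assignable to" com/esotericsoftware/kryo/Serializer, which
might mean that more than one copy of the Kryo or Flink classes is visible
on the classpath. That is only my guess; the trace does not confirm it. A
minimal sketch for checking which jar each of the classes named in the
trace is loaded from could look like the following. The class name
ClassOrigin and the method locate are hypothetical helpers, not part of
Flink or Kryo; only standard JDK calls are used.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Flink or Kryo): reports where a class is loaded from.
final class ClassOrigin {

    /** Returns the code-source location of the named class, or a short note if unavailable. */
    static String locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Classes loaded by the JVM's bootstrap loader have no code source.
                return "no code source (JDK/bootstrap class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the VerifyError message above.
        String[] names = {
            "com.esotericsoftware.kryo.Serializer",
            "org.apache.flink.api.java.typeutils.runtime.kryo.Serializers",
            "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

This would need to run with the same classpath as the job (for example the
job jar plus the jars in Flink's lib directory). If the Kryo class and the
Flink classes come from unexpected or differing jars, that would point
toward a dependency conflict rather than a problem in the job code.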

Here are the dependencies: 

    <dependencies>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-statsd</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>3.3.1</version>
        </dependency>

    </dependencies>

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
