Dear list, I’m having my first go at using Flink and quickly stumbled over a problem that I can't find an easy way around. I hope you can help me.
I am trying to read an Avro-encoded Kafka topic. When doing so, I get a NoClassDefFoundError for org.apache.avro.SchemaBuilder, although this class should be included in the avro-1.9.2.jar I provide. The jar itself is picked up correctly: if I remove that dependency, I get "java.lang.ClassNotFoundException: org.apache.avro.generic.IndexedRecord" instead.

    […]
    Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder
        at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
        at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)

The code I am using is the following:

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    table_env = StreamTableEnvironment.create(environment_settings=env_settings)

    # Ship the connector, format and Avro jars with the job
    jars = ["flink-sql-connector-kafka_2.11-1.11.2.jar", "flink-avro-1.11.2-sql-jar.jar", "avro-1.9.2.jar"]
    jar_base_path = "file:///path/to/my/jars/"
    table_env.get_config().get_configuration().set_string('pipeline.jars', ';'.join([jar_base_path + j for j in jars]))

    table_env.execute_sql("""
        CREATE TABLE test (
            a STRING,
            b INT,
            c TIMESTAMP
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my_topic',
            'properties.bootstrap.servers' = 'kafka.broker.com:9192',
            'properties.group.id' = 'something',
            'format' = 'avro'
        )
    """)

    test = table_env.from_path('test')
    test.to_pandas()

Any help would be appreciated.

Thanks in advance
Thilo
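One way to double-check the assumption that org.apache.avro.SchemaBuilder really ships in the jars listed in 'pipeline.jars' is to look inside them directly, since a jar is just a zip archive. The sketch below is plain Python with no PyFlink dependency; the helper names (class_entry, jars_containing, pipeline_jars) are hypothetical and not part of any Flink API. It only confirms that the class file is present. Note that the JVM reports "Could not initialize class X" when X was found but its static initializer had already failed once, so an earlier ExceptionInInitializerError in the TaskManager log may be more informative than this check.

```python
import zipfile
from pathlib import Path
from typing import Dict, Iterable


def class_entry(class_name: str) -> str:
    """Map a fully qualified class name to its entry path inside a jar."""
    return class_name.replace(".", "/") + ".class"


def jars_containing(jar_paths: Iterable[str], class_name: str) -> Dict[str, bool]:
    """Report, for each jar file, whether it contains the given class file."""
    entry = class_entry(class_name)
    result = {}
    for path in jar_paths:
        # A jar is a zip archive; namelist() returns every entry it holds.
        with zipfile.ZipFile(path) as jar:
            result[path] = entry in jar.namelist()
    return result


def pipeline_jars(base_dir: str, jar_names: Iterable[str]) -> str:
    """Build a ';'-separated list of file:// URLs, e.g. for 'pipeline.jars'.

    Path.as_uri() produces well-formed absolute file URLs, which avoids
    mistakes such as a missing or doubled slash when concatenating strings.
    """
    return ";".join(Path(base_dir, name).resolve().as_uri() for name in jar_names)
```

For example, calling jars_containing with the local paths of the three jars and "org.apache.avro.SchemaBuilder" shows which of them actually carries the class; the hypothetical pipeline_jars helper could then replace the manual string concatenation in the snippet above.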