Hi Thilo,

You also need to add the transitive dependencies of Avro, e.g. jackson-core-asl, jackson-mapper-asl, etc. An uber jar will be provided starting with Flink 1.12; see https://issues.apache.org/jira/browse/FLINK-18802 for more details.
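As a rough sketch of what the extended jar list could look like in your script: the Jackson file names below are placeholders (the "X.Y.Z" versions are not real), so please take the actual artifacts and versions from the dependency list of the Avro version you use. The helper name build_pipeline_jars is made up for illustration; only the 'pipeline.jars' option and the ';' separator come from your original code.

```python
def build_pipeline_jars(base_uri, jar_names):
    """Join jar file names into the ';'-separated value used for 'pipeline.jars'.

    Hypothetical helper for illustration; it only builds the string.
    """
    base = base_uri if base_uri.endswith("/") else base_uri + "/"
    return ";".join(base + name for name in jar_names)


jars = [
    "flink-sql-connector-kafka_2.11-1.11.2.jar",
    "flink-avro-1.11.2-sql-jar.jar",
    "avro-1.9.2.jar",
    # Placeholders for Avro's transitive dependencies: replace "X.Y.Z"
    # with the versions that match your Avro release.
    "jackson-core-asl-X.Y.Z.jar",
    "jackson-mapper-asl-X.Y.Z.jar",
]

pipeline_jars = build_pipeline_jars("file:///path/to/my/jars/", jars)

# With a table_env created as in the original script, the value would be set via:
# table_env.get_config().get_configuration().set_string("pipeline.jars", pipeline_jars)
```

Keeping the list in one place makes it easy to drop the extra entries again once the uber jar is available.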
Regards,
Dian

> On October 17, 2020, at 5:25 AM, Schneider, Thilo <t.schneid...@fraport.de> wrote:
>
> Dear list,
>
> I’m having my first go at using Flink and quickly stumbled over a problem I find no easy way around. I hope you can help me.
>
> I am trying to read an Avro-encoded Kafka topic. Doing so, I get a NoClassDefFoundError. org.apache.avro.SchemaBuilder could not be found, but this should be included in the provided avro-1.9.2.jar. The jar is correctly picked up, as I get “java.lang.ClassNotFoundException: org.apache.avro.generic.IndexedRecord” if I remove that dependency.
>
> […]
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder
>     at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
>     at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
>
> The code I am using is the following:
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> jars = ["flink-sql-connector-kafka_2.11-1.11.2.jar",
>         "flink-avro-1.11.2-sql-jar.jar",
>         "avro-1.9.2.jar"]
>
> jar_base_path = "file:///path/to/my/jars/"
> table_env.get_config().get_configuration().set_string('pipeline.jars', ';'.join([jar_base_path + j for j in jars]))
>
> table_env.execute_sql("""
>     CREATE TABLE test (
>         a STRING,
>         b INT,
>         c TIMESTAMP
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'my_topic',
>         'properties.bootstrap.servers' = 'kafka.broker.com:9192',
>         'properties.group.id' = 'something',
>         'format' = 'avro'
>     )
> """)
>
> test = table_env.from_path('test')
> test.to_pandas()
>
> Any help would be appreciated.
>
> Thanks in advance
> Thilo
>
> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang