Hi Thilo,

You need also add the transitive dependencies of Avro, e.g. jackson-core-asl, 
jackson-mapper-asl, etc. It will provided a uber jar since 1.12, you could 
refer https://issues.apache.org/jira/browse/FLINK-18802 
<https://issues.apache.org/jira/browse/FLINK-18802> for more details.

Regards,
Dian

> 在 2020年10月17日,上午5:25,Schneider, Thilo <t.schneid...@fraport.de> 写道:
> 
> Dear list,
>  
> I’m having my first go at using Flink and quickly stumbled over a problem I 
> find no easy way around. I hope you may help me.
>  
> I try to read an avro encoded kafka topic. Doing so, I do get a 
> NoClassDefFoundError. Org.apache.avro.SchemaBuilder could not be found, but 
> this should be included in the provided avro-1.9.2.jar. The jar is correctly 
> picked up, as I do get “java.lang.ClassNotFoundException: 
> org.apache.avro.generic.IndexedRecord” if I remove that dependency.
>  
> […]
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.avro.SchemaBuilder
>                 at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
>                 at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
>                 at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
>                 at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
>                 at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
>                 at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>                 at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>                 at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>                 at java.lang.Thread.run(Thread.java:748)
>  
> The code I am using is the following:
>  
> env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> 
> jars = ["flink-sql-connector-kafka_2.11-1.11.2.jar",
>         "flink-avro-1.11.2-sql-jar.jar",
>         "avro-1.9.2.jar"]
> 
> jar_base_path = "file:///path/to/my/jars/ <file:///path/to/my/jars/>"
> table_env.get_config().get_configuration().set_string('pipeline.jars',';'.join([jar_base_path
>  + j for j in jars]))
> 
> table_env.execute_sql("""
> CREATE TABLE test (
>     a STRING,
>     b INT,
>     c TIMESTAMP
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'my_topic',
> 'properties.bootstrap.servers' = 'kafka.broker.com:9192 
> <http://kafka.broker.com:9192/>',
> 'properties.group.id' = 'something',
> 'format' = 'avro'
> )
> """)
> 
> test = table_env.from_path('test')
> test.to_pandas()
>  
> Any help would be appreciated. 
>  
> Thanks in advance
> Thilo
> 
> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, 
> Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 
> 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des 
> Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; 
> Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. 
> Pierre Dominique Prümm, Dr. Matthias Zieschang

  • PyFlink: Schneider, Thilo
    • Re: PyFlink: Dian Fu

Reply via email to