Dear list,

I’m having my first go at using Flink and have quickly stumbled over a problem 
that I can find no easy way around. I hope you can help me.

I am trying to read an Avro-encoded Kafka topic. When I do, I get a 
NoClassDefFoundError saying that org.apache.avro.SchemaBuilder could not be 
initialized, although this class should be included in the provided 
avro-1.9.2.jar. The jar is picked up correctly: if I remove that dependency, I 
get “java.lang.ClassNotFoundException: org.apache.avro.generic.IndexedRecord” 
instead.
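The trace below says "Could not initialize class" rather than reporting a missing class. That wording means the JVM did locate SchemaBuilder, but its static initialization failed on an earlier attempt, so it may help to confirm which of the three jars actually contain the Avro classes (and whether more than one of them does). A minimal diagnostic sketch using only Python's standard zipfile module; the helper names class_entry and jars_containing are hypothetical, and a hit only shows that the class file is present, not that the classes it depends on can be resolved.

```python
import zipfile
from typing import Iterable, List


def class_entry(class_name: str) -> str:
    """Turn a fully qualified class name into its entry path inside a jar."""
    return class_name.replace(".", "/") + ".class"


def jars_containing(class_name: str, jar_paths: Iterable[str]) -> List[str]:
    """Return the jars (as given) whose archive contains the class file."""
    entry = class_entry(class_name)
    hits = []
    for jar in jar_paths:
        # A jar is a zip archive; namelist() lists every entry it contains.
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar)
    return hits
```

For the job below, it could be called with the local paths of the three jars, e.g. `jars_containing("org.apache.avro.SchemaBuilder", ["/path/to/my/jars/" + j for j in jars])`. If the same class shows up in more than one jar, that would at least hint at a version clash worth looking into.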

[…]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder
        at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
        at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)

The code I am using is the following:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Blink planner in streaming mode
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Connector and format jars shipped with the job
jars = ["flink-sql-connector-kafka_2.11-1.11.2.jar",
        "flink-avro-1.11.2-sql-jar.jar",
        "avro-1.9.2.jar"]

jar_base_path = "file:///path/to/my/jars/"
# 'pipeline.jars' takes a ';'-separated list of jar URLs
table_env.get_config().get_configuration().set_string(
    'pipeline.jars', ';'.join([jar_base_path + j for j in jars]))

table_env.execute_sql("""
CREATE TABLE test (
    a STRING,
    b INT,
    c TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'my_topic',
'properties.bootstrap.servers' = 'kafka.broker.com:9192',
'properties.group.id' = 'something',
'format' = 'avro'
)
""")

test = table_env.from_path('test')
test.to_pandas()
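Because a typo in one of the hand-built file:// URLs would also surface only as a class-loading error at runtime, it can be worth generating the 'pipeline.jars' value from real paths and failing early if a jar is missing. A minimal sketch, assuming the jars sit in one local directory; the helper name pipeline_jars is hypothetical and it only builds the same ';'-separated string as the snippet above, without changing how Flink loads the jars.

```python
from pathlib import Path
from typing import Iterable


def pipeline_jars(jar_dir: str, jar_names: Iterable[str]) -> str:
    """Build a value for the 'pipeline.jars' option.

    Each jar becomes an absolute file:// URL; entries are joined with ';',
    the same separator used in the original snippet.
    """
    base = Path(jar_dir).resolve()
    urls = []
    for name in jar_names:
        path = base / name
        # Fail early on a missing or misspelled jar instead of at job runtime.
        if not path.is_file():
            raise FileNotFoundError(path)
        # as_uri() requires an absolute path, hence resolve() above.
        urls.append(path.as_uri())
    return ";".join(urls)
```

It would replace the manual join, e.g. `table_env.get_config().get_configuration().set_string('pipeline.jars', pipeline_jars("/path/to/my/jars", jars))`.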

Any help would be appreciated.

Thanks in advance
Thilo
