Hi Eddie,

I have tried your program with the following changes and it executed successfully:

- Replaced `rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"` with `rf"file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar"`
- Used flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar, as I encountered FLINK-21012 [2], which has been addressed in 1.12.3
For your problem, I suspect that `file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar` does not really exist. Could you double-check that?

[1] https://repository.apache.org/content/repositories/orgapacheflink-1419/org/apache/flink/flink-sql-avro/1.12.3/flink-sql-avro-1.12.3.jar
[2] https://issues.apache.org/jira/browse/FLINK-21012

Regards,
Dian

> On Apr 25, 2021, at 11:56 PM, Edward Yang <eddiepy...@gmail.com> wrote:
> 
> Hi Dian,
> 
> I tried your suggestion but had the same error message, unfortunately. I also tried file:/ and file:// with the same error; not sure what's going on. I assume writing to avro works fine in Java and Scala?
> 
> Eddie
> 
> On Sat, Apr 24, 2021 at 10:03 PM Dian Fu <dian0511...@gmail.com> wrote:
> I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar. Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try again?
> 
> Regards,
> Dian
> 
>> On Apr 24, 2021, at 8:29 AM, Edward Yang <eddiepy...@gmail.com> wrote:
>> 
>> I've been trying to write to the avro format with PyFlink 1.12.2 on Ubuntu. I've tested my code with an iterator writing to csv, and everything works as expected. Reading through the Flink documentation, I see that I should add jar dependencies to work with avro.
>> I downloaded three jar files that I believe are required for avro, like so:
>> 
>> table_env\
>>     .get_config()\
>>     .get_configuration()\
>>     .set_string(
>>         "pipeline.jars",
>>         rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>>     )
>> 
>> I suspect I'm not loading the jar files correctly, but it's unclear what I'm supposed to do as I'm not familiar with Java, and when I switch the sink format to avro I get some unexpected errors:
>> 
>> Py4JJavaError: An error occurred while calling o746.executeInsert.
>> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>> 	at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>> 	at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>> 	at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>> 	at org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>> 	at org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>> 	at org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>> 	at org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>> 	at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>> 	at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>> 	at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>> 	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>> 	at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>> 	at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>> 	at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> 	at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>> 	at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> 	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> 	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> 	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> 	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> 	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> 	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> 	at java.base/java.lang.Thread.run(Thread.java:834)
>> Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
>> 	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>> 	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>> 	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> 
>> My sample code is as follows:
>> 
>> import os
>> 
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
>> 
>> env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
>> table_env = BatchTableEnvironment.create(environment_settings=env_settings)
>> 
>> table_env\
>>     .get_config()\
>>     .get_configuration()\
>>     .set_string(
>>         "pipeline.jars",
>>         rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
>>     )
>> 
>> table = table_env.from_elements(
>>     a,
>>     schema=DataTypes.ROW([
>>         DataTypes.FIELD('text', DataTypes.STRING()),
>>         DataTypes.FIELD('text1', DataTypes.STRING())
>>     ])
>> )
>> sink_ddl = f"""
>>     create table Results(
>>         a STRING,
>>         b STRING
>>     ) with (
>>         'connector' = 'filesystem',
>>         'path' = '{result_path}',
>>         'format' = 'avro'
>>     )
>> """
>> 
>> table_env.execute_sql(sink_ddl)
>> table.execute_insert("Results").wait()
>> 
>> Could someone help or point me in the right direction to look?