Hi Porritt, OK, it looks good.
Thanks, vino.

2018-07-17 23:13 GMT+08:00 Porritt, James <james.porr...@uk.mlp.com>:

> I got to the bottom of this: it was a namespace issue. My schema was:
>
> {
>   "type" : "record",
>   "name" : "MyAvroSchema",
>   "fields" : [ {
>     "name" : "a",
>     "type" : [ "null", "int" ]
>   }, {
>     "name" : "b",
>     "type" : [ "null", "string" ]
>   }]
> }
>
> But I was actually putting the generated MyAvroSchema class into the
> ‘my_stats’ package (along with my other application code) by adding a
> ‘package my_stats;’ line at the top. When I added "namespace": "my_stats"
> to the schema and generated the Java from that, it worked fine.
>
> *From:* Porritt, James <james.porr...@uk.mlp.com>
> *Sent:* 17 July 2018 15:10
> *To:* 'vino yang' <yanghua1...@gmail.com>
> *Cc:* user@flink.apache.org
> *Subject:* RE: AvroInputFormat NullPointerException issues
>
> My MyAvroSchema class is as follows. It was generated using avro-tools:
>
> /**
>  * Autogenerated by Avro
>  *
>  * DO NOT EDIT DIRECTLY
>  */
>
> import org.apache.avro.specific.SpecificData;
> import org.apache.avro.message.BinaryMessageEncoder;
> import org.apache.avro.message.BinaryMessageDecoder;
> import org.apache.avro.message.SchemaStore;
>
> @SuppressWarnings("all")
> @org.apache.avro.specific.AvroGenerated
> public class MyAvroSchema extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
>   private static final long serialVersionUID = 4994916517880671663L;
>   public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"int\"]},{\"name\":\"b\",\"type\":[\"null\",\"string\"]}]}");
>   public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
>
>   private static SpecificData MODEL$ = new SpecificData();
>
>   private static final BinaryMessageEncoder<MyAvroSchema>
>       ENCODER = new BinaryMessageEncoder<MyAvroSchema>(MODEL$, SCHEMA$);
>
>   private static final BinaryMessageDecoder<MyAvroSchema> DECODER =
>       new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$);
>
>   /**
>    * Return the BinaryMessageDecoder instance used by this class.
>    */
>   public static BinaryMessageDecoder<MyAvroSchema> getDecoder() {
>     return DECODER;
>   }
>
>   /**
>    * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
>    * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
>    */
>   public static BinaryMessageDecoder<MyAvroSchema> createDecoder(SchemaStore resolver) {
>     return new BinaryMessageDecoder<MyAvroSchema>(MODEL$, SCHEMA$, resolver);
>   }
>
>   /** Serializes this MyAvroSchema to a ByteBuffer. */
>   public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
>     return ENCODER.encode(this);
>   }
>
>   /** Deserializes a MyAvroSchema from a ByteBuffer. */
>   public static MyAvroSchema fromByteBuffer(
>       java.nio.ByteBuffer b) throws java.io.IOException {
>     return DECODER.decode(b);
>   }
>
>   @Deprecated public java.lang.Integer a;
>   @Deprecated public java.lang.CharSequence b;
>
>   /**
>    * Default constructor. Note that this does not initialize fields
>    * to their default values from the schema. If that is desired then
>    * one should use <code>newBuilder()</code>.
>    */
>   public MyAvroSchema() {}
>
>   /**
>    * All-args constructor.
>    * @param a The new value for a
>    * @param b The new value for b
>    */
>   public MyAvroSchema(java.lang.Integer a, java.lang.CharSequence b) {
>     this.a = a;
>     this.b = b;
>   }
>
>   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
>   // Used by DatumWriter. Applications should not call.
>   public java.lang.Object get(int field$) {
>     switch (field$) {
>     case 0: return a;
>     case 1: return b;
>     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
>     }
>   }
>
>   // Used by DatumReader. Applications should not call.
>   @SuppressWarnings(value="unchecked")
>   public void put(int field$, java.lang.Object value$) {
>     switch (field$) {
>     case 0: a = (java.lang.Integer)value$; break;
>     case 1: b = (java.lang.CharSequence)value$; break;
>     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
>     }
>   }
>
>   /**
>    * Gets the value of the 'a' field.
>    * @return The value of the 'a' field.
>    */
>   public java.lang.Integer getA() {
>     return a;
>   }
>
>   /**
>    * Sets the value of the 'a' field.
>    * @param value the value to set.
>    */
>   public void setA(java.lang.Integer value) {
>     this.a = value;
>   }
>
>   /**
>    * Gets the value of the 'b' field.
>    * @return The value of the 'b' field.
>    */
>   public java.lang.CharSequence getB() {
>     return b;
>   }
>
>   /**
>    * Sets the value of the 'b' field.
>    * @param value the value to set.
>    */
>   public void setB(java.lang.CharSequence value) {
>     this.b = value;
>   }
>
>   /**
>    * Creates a new MyAvroSchema RecordBuilder.
>    * @return A new MyAvroSchema RecordBuilder
>    */
>   public static MyAvroSchema.Builder newBuilder() {
>     return new MyAvroSchema.Builder();
>   }
>
>   /**
>    * Creates a new MyAvroSchema RecordBuilder by copying an existing Builder.
>    * @param other The existing builder to copy.
>    * @return A new MyAvroSchema RecordBuilder
>    */
>   public static MyAvroSchema.Builder newBuilder(MyAvroSchema.Builder other) {
>     return new MyAvroSchema.Builder(other);
>   }
>
>   /**
>    * Creates a new MyAvroSchema RecordBuilder by copying an existing MyAvroSchema instance.
>    * @param other The existing instance to copy.
>    * @return A new MyAvroSchema RecordBuilder
>    */
>   public static MyAvroSchema.Builder newBuilder(MyAvroSchema other) {
>     return new MyAvroSchema.Builder(other);
>   }
>
>   /**
>    * RecordBuilder for MyAvroSchema instances.
>    */
>   public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<MyAvroSchema>
>     implements org.apache.avro.data.RecordBuilder<MyAvroSchema> {
>
>     private java.lang.Integer a;
>     private java.lang.CharSequence b;
>
>     /** Creates a new Builder */
>     private Builder() {
>       super(SCHEMA$);
>     }
>
>     /**
>      * Creates a Builder by copying an existing Builder.
>      * @param other The existing Builder to copy.
>      */
>     private Builder(MyAvroSchema.Builder other) {
>       super(other);
>       if (isValidValue(fields()[0], other.a)) {
>         this.a = data().deepCopy(fields()[0].schema(), other.a);
>         fieldSetFlags()[0] = true;
>       }
>       if (isValidValue(fields()[1], other.b)) {
>         this.b = data().deepCopy(fields()[1].schema(), other.b);
>         fieldSetFlags()[1] = true;
>       }
>     }
>
>     /**
>      * Creates a Builder by copying an existing MyAvroSchema instance
>      * @param other The existing instance to copy.
>      */
>     private Builder(MyAvroSchema other) {
>       super(SCHEMA$);
>       if (isValidValue(fields()[0], other.a)) {
>         this.a = data().deepCopy(fields()[0].schema(), other.a);
>         fieldSetFlags()[0] = true;
>       }
>       if (isValidValue(fields()[1], other.b)) {
>         this.b = data().deepCopy(fields()[1].schema(), other.b);
>         fieldSetFlags()[1] = true;
>       }
>     }
>
>     /**
>      * Gets the value of the 'a' field.
>      * @return The value.
>      */
>     public java.lang.Integer getA() {
>       return a;
>     }
>
>     /**
>      * Sets the value of the 'a' field.
>      * @param value The value of 'a'.
>      * @return This builder.
>      */
>     public MyAvroSchema.Builder setA(java.lang.Integer value) {
>       validate(fields()[0], value);
>       this.a = value;
>       fieldSetFlags()[0] = true;
>       return this;
>     }
>
>     /**
>      * Checks whether the 'a' field has been set.
>      * @return True if the 'a' field has been set, false otherwise.
>      */
>     public boolean hasA() {
>       return fieldSetFlags()[0];
>     }
>
>     /**
>      * Clears the value of the 'a' field.
>      * @return This builder.
>      */
>     public MyAvroSchema.Builder clearA() {
>       a = null;
>       fieldSetFlags()[0] = false;
>       return this;
>     }
>
>     /**
>      * Gets the value of the 'b' field.
>      * @return The value.
>      */
>     public java.lang.CharSequence getB() {
>       return b;
>     }
>
>     /**
>      * Sets the value of the 'b' field.
>      * @param value The value of 'b'.
>      * @return This builder.
>      */
>     public MyAvroSchema.Builder setB(java.lang.CharSequence value) {
>       validate(fields()[1], value);
>       this.b = value;
>       fieldSetFlags()[1] = true;
>       return this;
>     }
>
>     /**
>      * Checks whether the 'b' field has been set.
>      * @return True if the 'b' field has been set, false otherwise.
>      */
>     public boolean hasB() {
>       return fieldSetFlags()[1];
>     }
>
>     /**
>      * Clears the value of the 'b' field.
>      * @return This builder.
>      */
>     public MyAvroSchema.Builder clearB() {
>       b = null;
>       fieldSetFlags()[1] = false;
>       return this;
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public MyAvroSchema build() {
>       try {
>         MyAvroSchema record = new MyAvroSchema();
>         record.a = fieldSetFlags()[0] ? this.a : (java.lang.Integer) defaultValue(fields()[0]);
>         record.b = fieldSetFlags()[1] ?
>             this.b : (java.lang.CharSequence) defaultValue(fields()[1]);
>         return record;
>       } catch (java.lang.Exception e) {
>         throw new org.apache.avro.AvroRuntimeException(e);
>       }
>     }
>   }
>
>   @SuppressWarnings("unchecked")
>   private static final org.apache.avro.io.DatumWriter<MyAvroSchema>
>     WRITER$ = (org.apache.avro.io.DatumWriter<MyAvroSchema>)MODEL$.createDatumWriter(SCHEMA$);
>
>   @Override public void writeExternal(java.io.ObjectOutput out)
>     throws java.io.IOException {
>     WRITER$.write(this, SpecificData.getEncoder(out));
>   }
>
>   @SuppressWarnings("unchecked")
>   private static final org.apache.avro.io.DatumReader<MyAvroSchema>
>     READER$ = (org.apache.avro.io.DatumReader<MyAvroSchema>)MODEL$.createDatumReader(SCHEMA$);
>
>   @Override public void readExternal(java.io.ObjectInput in)
>     throws java.io.IOException {
>     READER$.read(this, SpecificData.getDecoder(in));
>   }
>
> }
>
> I will check out the other suggestions you made. One concern I have is
> that, from the stack trace I posted, it doesn’t actually look like the
> custom class is being called.
>
> *From:* vino yang <yanghua1...@gmail.com>
> *Sent:* 17 July 2018 05:49
> *To:* Porritt, James <james.porr...@uk.mlp.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: AvroInputFormat NullPointerException issues
>
> Hi Porritt,
>
> Based on the exception stack trace you provided, the exception seems to
> occur while the Avro schema is being initialized. You did not include the
> definition of the MyAvroSchema class, so I'd suggest that you:
>
> 1. make sure the file path "file:///home/myuser/test.avro" exists on the
> TaskManager node that runs your source task;
>
> 2. refer to the flink-avro connector documentation [1];
>
> 3. look at the existing test cases, such as [2], which you can use to test
> your program with only small changes.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/connectors.html#avro-support-in-flink
>
> [2]: https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
>
> ---
> thanks.
> vino.
>
> 2018-07-16 20:22 GMT+08:00 Porritt, James <james.porr...@uk.mlp.com>:
>
> I’ve been trying to use the following code:
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.AvroInputFormat;
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> Path path = new Path("file:///home/myuser/test.avro");
> AvroInputFormat<MyAvroSchema> my_format = new AvroInputFormat<>(path, MyAvroSchema.class);
> DataSet<MyAvroSchema> my_input = env.createInput(my_format);
> my_input.print(); // in the DataSet API, print() already executes the job
> // env.execute(); // would fail here: no new sinks are defined after print() ran
>
> to utilise this Avro schema:
>
> {
>   "type" : "record",
>   "name" : "MyAvroSchema",
>   "fields" : [ {
>     "name" : "a",
>     "type" : [ "null", "int" ]
>   }, {
>     "name" : "b",
>     "type" : [ "null", "string" ]
>   }]
> }
>
> I created the MyAvroSchema class from this schema using avro-tools. I also
> converted the following JSON into a compatible Avro file stored at
> file:///home/myuser/test.avro:
>
> {"a":{"int":123},"b":{"string":"hello"}}
>
> When I try to run this, however, I get:
>
> 2018-07-16 12:59:26,761 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (at createInput(ExecutionEnvironment.java:548) (org.apache.flink.formats.avro.AvroInputFormat)) (1/1) (302878b522f420f6b7866de4f32fcbd6) switched from RUNNING to FAILED.
> org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
>     at org.apache.avro.specific.SpecificDatumReader.<init>(SpecificDatumReader.java:37)
>     at org.apache.flink.formats.avro.AvroInputFormat.initReader(AvroInputFormat.java:122)
>     at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:111)
>     at org.apache.flink.formats.avro.AvroInputFormat.open(AvroInputFormat.java:54)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
>     at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
>     at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
>     at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
>     ... 7 more
> Caused by: java.lang.NullPointerException
>     at java.lang.String.replace(String.java:2239)
>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:281)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>     at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
>     ... 11 more
>
> Can anyone suggest what might be causing the NullPointerException? I’m
> using flink-1.5.0 and avro-tools-1.8.2.
>
> ######################################################################
> The information contained in this communication is confidential and
> intended only for the individual(s) named above. If you are not a named
> addressee, please notify the sender immediately and delete this email
> from your system and do not disclose the email or any part of it to any
> person. The views expressed in this email are the views of the author
> and do not necessarily represent the views of Millennium Capital Partners
> LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
> communications of MCP LLP and its affiliates, including telephone
> communications, may be electronically archived and subject to review
> and/or disclosure to someone other than the recipient. MCP LLP is
> authorized and regulated by the Financial Conduct Authority. Millennium
> Capital Partners LLP is a limited liability partnership registered in
> England & Wales with number OC312897 and with its registered office at
> 50 Berkeley Street, London, W1J 8HD.
> ######################################################################
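For anyone who hits the same trace: the NullPointerException thrown from `String.replace` inside `SpecificData.createSchema` appears to come from a name check in Avro 1.8. As far as I can tell from the Avro 1.8 sources, it compares the Java class name (`my_stats.MyAvroSchema` once the `package my_stats;` line was added) with the full name of the class's `SCHEMA$` (just `MyAvroSchema`, because the schema had no namespace). On a mismatch it tries to rewrite the schema JSON by replacing the schema's namespace with the class's package name, and with no namespace that argument is `null`, which `String.replace` rejects. The sketch below is a hypothetical, JDK-only simulation of that check under this assumption; `NamespaceMismatchDemo`, `schemaFullName` and `resolveSchemaJson` are illustrative names, not Avro APIs.

```java
/**
 * Hypothetical, JDK-only simulation of the class-name vs. schema-name check that
 * Avro 1.8's SpecificData.createSchema appears to perform. No Avro classes are used;
 * every name here is illustrative, not part of Avro's API.
 */
class NamespaceMismatchDemo {

    /** Full schema name: "namespace.name", or just "name" when there is no namespace. */
    static String schemaFullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /**
     * Assumed behaviour: if the Java class name differs from the schema's full name,
     * rewrite the schema JSON by replacing the namespace with the class's package name.
     * A null namespace makes String.replace throw NullPointerException.
     */
    static String resolveSchemaJson(String javaClassName, String packageName,
                                    String namespace, String name, String schemaJson) {
        if (!javaClassName.equals(schemaFullName(namespace, name))) {
            return schemaJson.replace(namespace, packageName); // NPE when namespace == null
        }
        return schemaJson;
    }

    public static void main(String[] args) {
        String noNamespace =
            "{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"fields\":[]}";
        String withNamespace =
            "{\"type\":\"record\",\"name\":\"MyAvroSchema\",\"namespace\":\"my_stats\",\"fields\":[]}";

        // Class compiled into package my_stats, schema generated without a namespace.
        try {
            resolveSchemaJson("my_stats.MyAvroSchema", "my_stats", null, "MyAvroSchema", noNamespace);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the reported stack trace");
        }

        // Schema regenerated with "namespace": "my_stats": the names agree, so no rewrite.
        String resolved = resolveSchemaJson(
            "my_stats.MyAvroSchema", "my_stats", "my_stats", "MyAvroSchema", withNamespace);
        System.out.println(resolved.equals(withNamespace)); // prints true
    }
}
```

Under this reading, either fix makes the names agree: adding `"namespace": "my_stats"` to the schema (as done in the thread), or leaving both the generated class and the schema without a package or namespace.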