[
https://issues.apache.org/jira/browse/AVRO-2247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682264#comment-16682264
]
ASF GitHub Bot commented on AVRO-2247:
--------------------------------------
unchuckable closed pull request #354: AVRO-2247 - improved java reading
performance with new reader
URL: https://github.com/apache/avro/pull/354
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
index ba538d2fb..093d01154 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
@@ -49,6 +49,7 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.fastreader.FastReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.util.Utf8;
@@ -70,6 +71,9 @@
public static final String STRING_PROP = "avro.java.string";
protected static final String STRING_TYPE_STRING = "String";
+ private boolean fastReaderEnabled = Boolean.parseBoolean(
System.getProperty("org.apache.avro.fastread", "false" ) );
+ private ThreadLocal<FastReader> fastReader = ThreadLocal.withInitial(
()->new FastReader( this ) );
+
private final ClassLoader classLoader;
/** Set the Java type to be used when reading this schema. Meaningful only
@@ -99,6 +103,18 @@ public GenericData(ClassLoader classLoader) {
/** Return the class loader that's used (by subclasses). */
public ClassLoader getClassLoader() { return classLoader; }
+ public void setFastReaderEnabled( boolean enabled ) {
+ this.fastReaderEnabled = enabled;
+ }
+
+ public boolean isFastReaderEnabled() {
+ return fastReaderEnabled;
+ }
+
+ public FastReader getFastReader() {
+ return this.fastReader.get();
+ }
+
private Map<String, Conversion<?>> conversions =
new HashMap<>();
@@ -420,12 +436,12 @@ public int compareTo(GenericEnumSymbol that) {
/** Returns a {@link DatumReader} for this kind of data. */
public DatumReader createDatumReader(Schema schema) {
- return new GenericDatumReader(schema, schema, this);
+ return createDatumReader( schema, schema );
}
/** Returns a {@link DatumReader} for this kind of data. */
public DatumReader createDatumReader(Schema writer, Schema reader) {
- return new GenericDatumReader(writer, reader, this);
+ return new GenericDatumReader( writer, reader, this );
}
/** Returns a {@link DatumWriter} for this kind of data. */
@@ -1097,7 +1113,7 @@ private Object deepCopyRaw(Schema schema, Object value) {
Map<CharSequence, Object> mapCopy =
new HashMap<>(mapValue.size());
for (Map.Entry<CharSequence, Object> entry : mapValue.entrySet()) {
- mapCopy.put((CharSequence)(deepCopy(STRINGS, entry.getKey())),
+ mapCopy.put((deepCopy(STRINGS, entry.getKey())),
deepCopy(schema.getValueType(), entry.getValue()));
}
return mapCopy;
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
index 9b7b04cd9..0a513e411 100644
---
a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
+++
b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
@@ -18,14 +18,13 @@
package org.apache.avro.generic;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
-import java.util.Collection;
-import java.nio.ByteBuffer;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
@@ -36,6 +35,7 @@
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;
+import org.apache.avro.io.fastreader.FastReader;
import org.apache.avro.util.Utf8;
import org.apache.avro.util.WeakIdentityHashMap;
@@ -45,6 +45,9 @@
private Schema actual;
private Schema expected;
+ private DatumReader<D> fastDatumReader = null;
+ private FastReader creatorFastReader = null;
+
private ResolvingDecoder creatorResolver = null;
private final Thread creator;
@@ -86,6 +89,7 @@ public void setSchema(Schema writer) {
expected = actual;
}
creatorResolver = null;
+ fastDatumReader = null;
}
/** Get the reader's schema. */
@@ -95,11 +99,13 @@ public void setSchema(Schema writer) {
public void setExpected(Schema reader) {
this.expected = reader;
creatorResolver = null;
+ fastDatumReader = null;
}
private static final ThreadLocal<Map<Schema,Map<Schema,ResolvingDecoder>>>
RESOLVER_CACHE =
new ThreadLocal<Map<Schema,Map<Schema,ResolvingDecoder>>>() {
+ @Override
protected Map<Schema,Map<Schema,ResolvingDecoder>> initialValue() {
return new WeakIdentityHashMap<>();
}
@@ -140,11 +146,37 @@ protected final ResolvingDecoder getResolver(Schema
actual, Schema expected)
@Override
@SuppressWarnings("unchecked")
public D read(D reuse, Decoder in) throws IOException {
- ResolvingDecoder resolver = getResolver(actual, expected);
- resolver.configure(in);
- D result = (D) read(reuse, expected, resolver);
- resolver.drain();
- return result;
+ if ( data.isFastReaderEnabled() ) {
+ return (D) getFastDatumReader().read(reuse, in);
+ }
+ else {
+ ResolvingDecoder resolver = getResolver(actual, expected);
+ resolver.configure(in);
+ D result = (D) read(reuse, expected, resolver);
+ resolver.drain();
+ return result;
+ }
+ }
+
+
+ private DatumReader<D> getFastDatumReader() {
+ if ( fastDatumReader == null ) {
+ fastDatumReader = getFastReader().createDatumReader( actual, expected );
+ }
+ return fastDatumReader;
+ }
+
+ private FastReader getFastReader() {
+ Thread currThread = Thread.currentThread();
+ if (currThread == creator && creatorFastReader != null) {
+ return creatorFastReader;
+ }
+
+ FastReader reader = data.getFastReader();
+ if ( currThread == creator ) {
+ this.creatorFastReader = reader;
+ }
+ return reader;
}
/** Called to read data.*/
@@ -536,7 +568,7 @@ public static void skip(Schema schema, Decoder in) throws
IOException {
}
break;
case UNION:
- skip(schema.getTypes().get((int)in.readIndex()), in);
+ skip(schema.getTypes().get(in.readIndex()), in);
break;
case FIXED:
in.skipFixed(schema.getFixedSize());
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/FastReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/FastReader.java
new file mode 100644
index 000000000..1dd1ab6a4
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/FastReader.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.avro.AvroMissingFieldException;
+import org.apache.avro.Conversion;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.fastreader.readers.ArrayReader;
+import org.apache.avro.io.fastreader.readers.BooleanReader;
+import org.apache.avro.io.fastreader.readers.BytesReader;
+import org.apache.avro.io.fastreader.readers.DoubleReader;
+import org.apache.avro.io.fastreader.readers.EnumReader;
+import org.apache.avro.io.fastreader.readers.FailureReader;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+import org.apache.avro.io.fastreader.readers.FixedReader;
+import org.apache.avro.io.fastreader.readers.FloatReader;
+import org.apache.avro.io.fastreader.readers.IntegerReader;
+import org.apache.avro.io.fastreader.readers.LongReader;
+import org.apache.avro.io.fastreader.readers.MapReader;
+import org.apache.avro.io.fastreader.readers.NullReader;
+import org.apache.avro.io.fastreader.readers.ReconfigurableReader;
+import org.apache.avro.io.fastreader.readers.RecordReader;
+import org.apache.avro.io.fastreader.readers.RecordReader.Stage;
+import org.apache.avro.io.fastreader.readers.StringReader;
+import org.apache.avro.io.fastreader.readers.UnionReader;
+import org.apache.avro.io.fastreader.readers.Utf8Reader;
+import org.apache.avro.io.fastreader.readers.conversion.BytesConversionReader;
+import org.apache.avro.io.fastreader.readers.conversion.FixedConversionReader;
+import
org.apache.avro.io.fastreader.readers.conversion.GenericConversionReader;
+import
org.apache.avro.io.fastreader.readers.conversion.IntegerConversionReader;
+import org.apache.avro.io.fastreader.readers.conversion.LongConversionReader;
+import org.apache.avro.io.fastreader.readers.conversion.StringConversionReader;
+import org.apache.avro.io.fastreader.readers.promotion.DoublePromotionReader;
+import org.apache.avro.io.fastreader.readers.promotion.FloatPromotionReader;
+import org.apache.avro.io.fastreader.readers.promotion.LongPromotionReader;
+import org.apache.avro.io.fastreader.steps.ExecutionStep;
+import org.apache.avro.io.fastreader.steps.FieldDefaulter;
+import org.apache.avro.io.fastreader.steps.FieldSetter;
+import org.apache.avro.io.fastreader.steps.FieldSkipper;
+import org.apache.avro.io.fastreader.utils.ReflectionUtils;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.WeakIdentityHashMap;
+
+public class FastReader {
+
+ private static final ExecutionStep[] EMPTY_EXECUTION_STEPS = new
ExecutionStep[0];
+
+ /**
+ * Generic/SpecificData instance that contains basic functionalities like
instantiation of objects
+ */
+ private final GenericData data;
+
+ /** first schema is reader schema, second is writer schema */
+ private final Map<Schema, Map<Schema, RecordReader<?>>> readerCache = new
WeakIdentityHashMap<>();
+
+
+ /**
+ * if set to <b>true</b>, fail in reader/writer creation when an illegal
conversion state exists,
+ * if set to <b>false</b>, fail when such a state is encountered during
serialization or
+ * deserialization (in many cases, it might not even occur, e.g. in unions
or enums)
+ */
+ private boolean failFast = false;
+
+ private boolean keyClassEnabled = true;
+
+ private boolean classPropEnabled = true;
+
+ public static FastReader get() {
+ return new FastReader(GenericData.get());
+ }
+
+ public static FastReader getSpecific() {
+ return new FastReader(SpecificData.get());
+ }
+
+ public FastReader(GenericData parentData) {
+ this.data = parentData;
+ }
+
+ public FastReader withFailFast(boolean failFast) {
+ this.failFast = failFast;
+ return this;
+ }
+
+ public boolean isFailingFast() {
+ return this.failFast;
+ }
+
+ public FastReader withKeyClassEnabled(boolean enabled) {
+ this.keyClassEnabled = enabled;
+ return this;
+ }
+
+ public boolean isKeyClassEnabled() {
+ return this.keyClassEnabled;
+ }
+
+ public FastReader withClassPropEnabled(boolean enabled) {
+ this.classPropEnabled = enabled;
+ return this;
+ }
+
+ public boolean isClassPropEnabled() {
+ return this.classPropEnabled;
+ }
+
+ public <D> DatumReader<D> createReconfigurableDatumReader(Schema
writerSchema,
+ Schema readerSchema) {
+ return new ReconfigurableReader<>(this, readerSchema, writerSchema);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <D> DatumReader<D> createDatumReader(Schema writerSchema, Schema
readerSchema) {
+ return (DatumReader<D>) getReaderFor(readerSchema, writerSchema);
+ }
+
+ public <D extends IndexedRecord> RecordReader<D> createRecordReader(Schema
readerSchema,
+ Schema writerSchema) {
+ // each record reader is created in a two-step process, first registering it,
then initializing it,
+ // to prevent endless loops on recursive types
+ RecordReader<D> recordReader = getRecordReaderFromCache(readerSchema,
writerSchema);
+
+ // only need to initialize once
+ if (recordReader.getInitializationStage() == Stage.NEW) {
+ initializeRecordReader(recordReader, readerSchema, writerSchema);
+ }
+
+ return recordReader;
+ }
+
+ private <D extends IndexedRecord> RecordReader<D> initializeRecordReader(
+ RecordReader<D> recordReader, Schema readerSchema, Schema writerSchema) {
+ try {
+ recordReader.startInitialization();
+
+ // generate supplier for the new object instances
+ Supplier<? extends IndexedRecord> supplier =
getObjectSupplier(readerSchema);
+ IntFunction<Conversion<?>> conversionSupplier =
getConversionSupplier(supplier.get());
+
+ ExecutionStep[] readSteps = new
ExecutionStep[writerSchema.getFields().size()];
+ List<ExecutionStep> defaultingSteps = new ArrayList<>();
+
+ // populate read steps with steps that read data that occurs in
readerSchema
+ for (Schema.Field readerField : readerSchema.getFields()) {
+ Schema.Field writerField = findSourceField(readerField, writerSchema);
+ if (writerField != null) {
+ // fields of reader schema that are found in writer schema will be
read via FieldReader
+ // instances
+ Conversion<?> conversion =
conversionSupplier.apply(readerField.pos());
+ FieldReader<?> fieldReader =
+ getReaderFor(readerField.schema(), writerField.schema(),
conversion);
+ int writerPos = writerField.pos();
+ if (readSteps[writerPos] != null) {
+ throw new IllegalStateException(
+ "Schemas not compatible. Two uses of writer field " +
writerField.name());
+ }
+ readSteps[writerPos] = new FieldSetter<>(readerField.pos(),
fieldReader);
+ } else {
+ defaultingSteps.add(getDefaultingStep(readerSchema, writerSchema,
readerField));
+ }
+ }
+
+ // fields of writer schema that are not being read from will be skipped
+ for (int i = 0; i < readSteps.length; i++) {
+ if (readSteps[i] == null) {
+ Schema fieldSchema = writerSchema.getFields().get(i).schema();
+ readSteps[i] = new FieldSkipper(getReaderFor(fieldSchema,
fieldSchema));
+ }
+ }
+
+ ExecutionStep[] defaultingStepsArray =
defaultingSteps.toArray(EMPTY_EXECUTION_STEPS);
+
+ // store execution plan in RecordReader
+ recordReader.finishInitialization(readSteps, defaultingStepsArray,
supplier);
+
+ return recordReader;
+ } catch (final Exception e) {
+ // if an exception is thrown, don't leave record reader in 'initializing'
state.
+ recordReader.reset();
+ throw e;
+ }
+ }
+
+
+ private ExecutionStep getDefaultingStep(Schema readerSchema, Schema
writerSchema, Schema.Field field) {
+ try {
+ Object defaultValue = data.getDefaultValue(field);
+ return new FieldDefaulter(field.pos(), defaultValue);
+ } catch (AvroMissingFieldException e) {
+ return getFailureReaderOrFail("Found " + writerSchema.getFullName() + ",
expecting " + readerSchema.getFullName() + ", missing required field " +
field.name() );
+ }
+ }
+
+
+ private IntFunction<Conversion<?>> getConversionSupplier(IndexedRecord
record) {
+ if (record instanceof SpecificRecordBase) {
+ return ((SpecificRecordBase) record)::getConversion;
+ } else {
+ return index -> null;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <D extends IndexedRecord> RecordReader<D>
getRecordReaderFromCache(Schema readerSchema,
+ Schema writerSchema) {
+ synchronized ( readerCache ) {
+ Map<Schema, RecordReader<?>> writerMap =
+ readerCache.computeIfAbsent(readerSchema, k -> new
WeakIdentityHashMap<>());
+ return (RecordReader<D>) writerMap.computeIfAbsent(writerSchema, k ->
new RecordReader<>());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Supplier<? extends IndexedRecord> getObjectSupplier(Schema
readerSchema) {
+ final Object object = data.newRecord(null, readerSchema);
+ final Class<?> clazz = object.getClass();
+
+ if (IndexedRecord.class.isAssignableFrom(clazz)) {
+ Supplier<IndexedRecord> supplier;
+
+ if (SchemaConstructable.class.isAssignableFrom(clazz)) {
+ supplier =
ReflectionUtils.getOneArgConstructorAsSupplier((Class<IndexedRecord>) clazz,
+ Schema.class, readerSchema);
+ } else {
+ supplier =
ReflectionUtils.getConstructorAsSupplier((Class<IndexedRecord>) clazz);
+ }
+
+ if (supplier != null) {
+ // test supplier and check for schema match with created class
+ // (otherwise, if the SpecificRecord has a different schema, BAD
things will happen)
+ IndexedRecord sampleRecord = supplier.get();
+ if (sampleRecord.getSchema().equals(readerSchema)) {
+ return supplier;
+ }
+ }
+ }
+
+ return () -> new GenericData.Record(readerSchema);
+ }
+
+
+ private FieldReader<?> getReaderFor(Schema readerSchema, Schema
writerSchema) {
+ return getReaderFor(readerSchema, writerSchema, null);
+ }
+
+ private FieldReader<?> getReaderFor(Schema readerSchema, Schema writerSchema,
+ Conversion<?> explicitConversion) {
+ final FieldReader<?> baseReader = getReaderForBaseType(readerSchema,
writerSchema);
+ return applyConversions(readerSchema, baseReader, explicitConversion);
+ }
+
+ private FieldReader<?> applyConversions(Schema readerSchema, FieldReader<?>
reader,
+ Conversion<?> explicitConversion) {
+ Conversion<?> conversion = explicitConversion;
+
+ if (conversion == null) {
+ if (readerSchema.getLogicalType() == null) {
+ return reader;
+ }
+
+ conversion = data.getConversionFor(readerSchema.getLogicalType());
+ }
+
+ if (conversion == null) {
+ return reader;
+ }
+
+ switch (readerSchema.getType()) {
+ case LONG:
+ return new LongConversionReader<>(reader, conversion, readerSchema);
+ case INT:
+ return new IntegerConversionReader<>(reader, conversion, readerSchema);
+ case BYTES:
+ return new BytesConversionReader<>(reader, conversion, readerSchema);
+ case FIXED:
+ return new FixedConversionReader<>(reader, conversion, readerSchema);
+ default:
+ // use less optimized converter for all more exotic types
+ return new GenericConversionReader<>(reader, conversion, readerSchema);
+ }
+ }
+
+ private FieldReader<?> getReaderForBaseType(Schema readerSchema, Schema
writerSchema) {
+ if ( readerSchema.getType() != writerSchema.getType() ) {
+ return resolveMismatchedReader( readerSchema, writerSchema );
+ }
+
+ switch (readerSchema.getType()) {
+ case STRING:
+ return createStringReader(readerSchema, writerSchema);
+ case INT:
+ return IntegerReader.get();
+ case NULL:
+ return NullReader.get();
+ case RECORD:
+ return createRecordReader(readerSchema, writerSchema);
+ case DOUBLE:
+ return DoubleReader.get();
+ case UNION:
+ return createUnionReader(readerSchema, writerSchema);
+ case MAP:
+ return createMapReader(readerSchema, writerSchema);
+ case LONG:
+ return LongReader.get();
+ case ARRAY:
+ return createArrayReader(readerSchema, writerSchema);
+ case BOOLEAN:
+ return BooleanReader.get();
+ case BYTES:
+ return BytesReader.get();
+ case ENUM:
+ return createEnumReader(readerSchema, writerSchema);
+ case FIXED:
+ return createFixedReader(readerSchema, writerSchema);
+ case FLOAT:
+ return FloatReader.get();
+ default:
+ return getFailureReaderOrFail("Type " + readerSchema.getType() + " not
yet supported.");
+ }
+ }
+
+ private FieldReader<?> resolveMismatchedReader( Schema readerSchema, Schema
writerSchema ) {
+ if ( writerSchema.getType() == Type.UNION ) { // and reader isnt union type
+ Schema pseudoUnion = Schema.createUnion( readerSchema );
+ return getReaderFor( pseudoUnion, writerSchema );
+ }
+
+ switch ( readerSchema.getType() ) {
+ case UNION:
+ Schema compatibleReaderSchema = findUnionType( writerSchema,
readerSchema.getTypes() );
+ if ( compatibleReaderSchema != null ) {
+ return getReaderFor( compatibleReaderSchema, writerSchema );
+ }
+ break;
+
+ case LONG:
+ if ( writerSchema.getType() == Type.INT ) {
+ return new LongPromotionReader( IntegerReader.get() );
+ }
+ break;
+
+ case DOUBLE:
+ switch ( writerSchema.getType() ) {
+ case FLOAT: return new DoublePromotionReader(FloatReader.get());
+ case LONG: return new DoublePromotionReader(LongReader.get());
+ case INT: return new DoublePromotionReader(IntegerReader.get());
+ default:
+ }
+ break;
+
+ case FLOAT:
+ switch ( writerSchema.getType() ) {
+ case LONG: return new FloatPromotionReader(LongReader.get());
+ case INT: return new FloatPromotionReader(IntegerReader.get());
+ default:
+ }
+ break;
+
+ case STRING:
+ if ( writerSchema.getType() == Type.BYTES ) {
+ return createStringReader( readerSchema, readerSchema );
+ }
+ break;
+
+ case BYTES:
+ if ( writerSchema.getType() == Type.STRING ) {
+ return BytesReader.get();
+ }
+ break;
+
+ default:
+ }
+
+ return getSchemaMismatchError(readerSchema, writerSchema);
+}
+
+ private FieldReader<?> createStringReader(Schema readerSchema, Schema
writerSchema) {
+ FieldReader<?> stringReader = createSimpleStringReader(readerSchema);
+ if (isClassPropEnabled()) {
+ return
getTransformingStringReader(readerSchema.getProp(SpecificData.CLASS_PROP),
+ stringReader);
+ } else {
+ return stringReader;
+ }
+ }
+
+ private FieldReader<?> createSimpleStringReader(Schema readerSchema) {
+ String stringProperty = readerSchema.getProp(GenericData.STRING_PROP);
+ if (GenericData.StringType.String.name().equals(stringProperty)) {
+ return StringReader.get();
+ } else {
+ return Utf8Reader.get();
+ }
+ }
+
+ private FieldReader<?> createUnionReader(Schema readerSchema, Schema
writerSchema) {
+ List<Schema> readerTypes = readerSchema.getTypes();
+ List<Schema> writerTypes = writerSchema.getTypes();
+
+ FieldReader<?>[] unionReaders = new FieldReader[writerTypes.size()];
+
+ int i = 0;
+ for (Schema thisWriterSchema : writerTypes) {
+ Schema thisReaderSchema = findUnionType(thisWriterSchema, readerTypes);
+ if (thisReaderSchema == null) {
+ unionReaders[i++] = getFailureReaderOrFail("Found " +
thisWriterSchema.getType().getName().toLowerCase() + ", expecting " +
getUnionDescriptor(readerSchema));
+ } else {
+ unionReaders[i++] = getReaderFor(thisReaderSchema, thisWriterSchema);
+ }
+ }
+
+ return new UnionReader(unionReaders);
+ }
+
+
+ private String getUnionDescriptor(Schema schema) {
+ List<Schema> types = schema.getTypes();
+ if ( types.size() == 1 ) {
+ return schema.getTypes().get(0).getName().toLowerCase();
+ }
+
+ return "[" + schema.getTypes().stream().map( Schema::getName ).collect(
Collectors.joining(",") ) + "]";
+ }
+
+ private Schema findUnionType(Schema schema, List<Schema> readerTypes) {
+ // try for perfect match first
+ for ( Schema thisReaderSchema : readerTypes ) {
+ if ( thisReaderSchema.equals( schema ) ) {
+ return thisReaderSchema;
+ }
+ }
+
+ // then try for compatible
+ for (Schema thisReaderSchema : readerTypes) {
+ if ( areCompatible( thisReaderSchema, schema ) ) {
+ return thisReaderSchema;
+ }
+ }
+ return null;
+ }
+
+
+ private boolean areCompatible( Schema readerSchema, Schema writerSchema ) {
+ try {
+ FieldReader<?> reader = getReaderFor(readerSchema, writerSchema);
+ return !( reader instanceof FailureReader );
+ }
+ catch ( Exception e ) {
+ return false;
+ }
+ }
+
+ private Schema.Field findSourceField(Schema.Field field, Schema
writerSchema) {
+ Schema.Field sourceField = findSourceFieldByName(field.name(),
writerSchema);
+ if (sourceField == null) {
+ for (String thisAlias : emptyIfNull(field.aliases())) {
+ sourceField = findSourceFieldByName(thisAlias, writerSchema);
+ if (sourceField != null) {
+ break;
+ }
+ }
+ }
+ return sourceField;
+ }
+
+ private Schema.Field findSourceFieldByName(String fieldName, Schema
writerSchema) {
+ for (Schema.Field thisWriterField : writerSchema.getFields()) {
+ if (thisWriterField.name().equals(fieldName)) {
+ return thisWriterField;
+ }
+ for (String thisAlias : emptyIfNull(thisWriterField.aliases())) {
+ if (thisAlias.equals(fieldName)) {
+ return thisWriterField;
+ }
+ }
+ }
+ return null;
+ }
+
+ private FieldReader<?> createMapReader(Schema readerSchema, Schema
writerSchema) {
+ FieldReader<?> keyReader = createMapKeyReader(readerSchema);
+ FieldReader<?> valueReader =
+ getReaderFor(readerSchema.getValueType(), writerSchema.getValueType());
+ return new MapReader<>(keyReader, valueReader);
+ }
+
+
+ private FieldReader<?> createMapKeyReader(Schema readerSchema) {
+ FieldReader<?> stringReader = createSimpleStringReader(readerSchema);
+ if (isKeyClassEnabled()) {
+ return
getTransformingStringReader(readerSchema.getProp(SpecificData.KEY_CLASS_PROP),
+ createSimpleStringReader(readerSchema));
+ } else {
+ return stringReader;
+ }
+ }
+
+
+ private FieldReader<?> getTransformingStringReader(String valueClass,
+ FieldReader<?> stringReader) {
+ if (valueClass == null) {
+ return stringReader;
+ } else {
+ Function<String, ?> transformer = findClass(valueClass)
+ .map(clazz -> ReflectionUtils.getConstructorAsFunction(String.class,
clazz)).orElse(null);
+
+ if (transformer != null) {
+ return new StringConversionReader(transformer, StringReader.get());
+ }
+ }
+
+ return stringReader;
+ }
+
+
+ private Optional<Class<?>> findClass(String clazz) {
+ try {
+ return Optional.of(data.getClassLoader().loadClass(clazz));
+ } catch (ReflectiveOperationException e) {
+ return Optional.empty();
+ }
+ }
+
+ private FieldReader<?> createArrayReader(Schema readerSchema, Schema
writerSchema) {
+ FieldReader<?> fieldReader =
+ getReaderFor(readerSchema.getElementType(),
writerSchema.getElementType());
+ return ArrayReader.of(fieldReader, readerSchema);
+ }
+
+ private FieldReader<?> createEnumReader(Schema readerSchema, Schema
writerSchema) {
+ List<String> writerSymbols = writerSchema.getEnumSymbols();
+ List<String> readerSymbols = readerSchema.getEnumSymbols();
+ Object[] enumObjects = new Object[writerSymbols.size()];
+
+ // pre-get all possible instances of the enum and cache them
+ int i = 0;
+ for (String thisWriterSymbol : writerSymbols) {
+ if (readerSymbols.contains(thisWriterSymbol)) {
+ enumObjects[i] = data.createEnum(thisWriterSymbol, readerSchema);
+ } else if (isFailingFast()) {
+ fail("Enum reader does not contain writer's symbol " +
thisWriterSymbol);
+ }
+ i++;
+ }
+
+ return new EnumReader(enumObjects, writerSchema);
+ }
+
+ private FieldReader<?> createFixedReader(Schema readerSchema, Schema
writerSchema) {
+ if (readerSchema.getFixedSize() != writerSchema.getFixedSize()) {
+ return getFailureReaderOrFail("Reader and writer schemas do not match.
Fixed "
+ + readerSchema.getName() + " expects " + readerSchema.getFixedSize()
+ + " bytes, but writer is writing " + writerSchema.getFixedSize());
+ }
+
+ return new FixedReader(data, readerSchema);
+ }
+
+ private FailureReader getSchemaMismatchError(Schema readerSchema, Schema
writerSchema) {
+ return getFailureReaderOrFail( "Found " + writerSchema.getType() + ",
expecting " + readerSchema.getType() );
+ }
+
+ private FailureReader getFailureReaderOrFail(String message) {
+ if (failFast) {
+ throw new InvalidDataException(message);
+ } else {
+ return new FailureReader(message);
+ }
+ }
+
+ private void fail(String message) {
+ throw new InvalidDataException(message);
+ }
+
+ private static <T> Collection<T> emptyIfNull(Collection<T> collection) {
+ return (collection == null) ? Collections.emptyList() : collection;
+ }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/InvalidDataException.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/InvalidDataException.java
new file mode 100644
index 000000000..a00729e2b
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/InvalidDataException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader;
+
+public class InvalidDataException extends RuntimeException {
+ private static final long serialVersionUID = 7701853368178317793L;
+
+ public InvalidDataException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public InvalidDataException(String message) {
+ super(message);
+ }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ArrayReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ArrayReader.java
new file mode 100644
index 000000000..b3ea9fec2
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ArrayReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.Decoder;
+
+public class ArrayReader<D> implements FieldReader<List<D>> {
+
+ private final FieldReader<D> elementReader;
+ private final Schema schema;
+
+ public static <D> ArrayReader<D> of( FieldReader<D> elementReader, Schema
schema ) {
+ // there is a bug in ReusingArrayReader() that needs to be addressed.
until then, only use ArrayReader
+ // return elementReader.canReuse() ? new ReusingArrayReader<>(
elementReader, schema ) : new ArrayReader<>( elementReader, schema );
+ return new ArrayReader<>( elementReader, schema );
+ }
+
+ public ArrayReader(FieldReader<D> elementReader, Schema schema ) {
+ this.elementReader = elementReader;
+ this.schema = schema;
+ }
+
+ public FieldReader<D> getElementReader() {
+ return this.elementReader;
+ }
+
+ public Schema getSchema() {
+ return this.schema;
+ }
+
+ @Override
+ public List<D> read(List<D> reuse, Decoder decoder) throws IOException {
+ long l = decoder.readArrayStart();
+ List<D> array = new GenericData.Array<>( (int)l, getSchema() );
+ while (l > 0) {
+ for (long i = 0; i < l; i++) {
+ array.add(elementReader.read(null, decoder));
+ }
+ l = decoder.arrayNext();
+ }
+ return array;
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ long l = decoder.readArrayStart();
+ while (l > 0) {
+ for (long i = 0; i < l; i++) {
+ elementReader.skip(decoder);
+ }
+ l = decoder.arrayNext();
+ }
+ }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/BooleanReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/BooleanReader.java
new file mode 100644
index 000000000..99ce85a16
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/BooleanReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/** Singleton {@link FieldReader} for boolean values; obtain it through {@link #get()}. */
+public class BooleanReader implements FieldReader<Boolean> {
+
+  private static final BooleanReader INSTANCE = new BooleanReader();
+
+  private BooleanReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static BooleanReader get() {
+    return INSTANCE;
+  }
+
+  /** Reads one boolean from the decoder; {@code reuse} is ignored. */
+  @Override
+  public Boolean read(Boolean reuse, Decoder decoder) throws IOException {
+    return Boolean.valueOf(decoder.readBoolean());
+  }
+
+  /** Reads one boolean from the decoder and discards it. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.readBoolean();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/BytesReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/BytesReader.java
new file mode 100644
index 000000000..d627aff1b
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/BytesReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.avro.io.Decoder;
+
+/** Singleton {@link FieldReader} for bytes values; obtain it through {@link #get()}. */
+public class BytesReader implements FieldReader<ByteBuffer> {
+
+  private static final BytesReader INSTANCE = new BytesReader();
+
+  private BytesReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static BytesReader get() {
+    return INSTANCE;
+  }
+
+  /** Always {@code true}: the reuse buffer is forwarded to the decoder. */
+  @Override
+  public boolean canReuse() {
+    return true;
+  }
+
+  /** Reads one bytes value, handing {@code reuse} to {@link Decoder#readBytes}. */
+  @Override
+  public ByteBuffer read(ByteBuffer reuse, Decoder decoder) throws IOException {
+    final ByteBuffer bytes = decoder.readBytes(reuse);
+    return bytes;
+  }
+
+  /** Skips one bytes value. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.skipBytes();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/DoubleReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/DoubleReader.java
new file mode 100644
index 000000000..d011b0802
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/DoubleReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/** Singleton {@link FieldReader} for double values; obtain it through {@link #get()}. */
+public class DoubleReader implements FieldReader<Double> {
+
+  private static final DoubleReader INSTANCE = new DoubleReader();
+
+  private DoubleReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static DoubleReader get() {
+    return INSTANCE;
+  }
+
+  /** Reads one double from the decoder; {@code reuse} is ignored. */
+  @Override
+  public Double read(Double reuse, Decoder decoder) throws IOException {
+    return Double.valueOf(decoder.readDouble());
+  }
+
+  /** Reads one double from the decoder and discards it. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.readDouble();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/EnumReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/EnumReader.java
new file mode 100644
index 000000000..e3890240a
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/EnumReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Reads an enum value by looking up the decoded index in a table of result objects.
+ * A {@code null} table entry means the written symbol has no match and reading it fails.
+ */
+public class EnumReader implements FieldReader<Object> {
+
+  private final Object[] enumObjects;
+  private final Schema writerSchema;
+
+  /**
+   * @param enumObjects result objects, indexed by the enum index read from the decoder
+   * @param writerSchema schema whose enum symbols are used in the error message
+   */
+  public EnumReader(Object[] enumObjects, Schema writerSchema) {
+    this.enumObjects = enumObjects;
+    this.writerSchema = writerSchema;
+  }
+
+  /**
+   * Reads the enum index and returns the object stored for it; {@code reuse} is ignored.
+   *
+   * @throws AvroTypeException if no object is registered for the decoded index
+   */
+  @Override
+  public Object read(Object reuse, Decoder decoder) throws IOException {
+    final int index = decoder.readEnum();
+    final Object symbol = enumObjects[index];
+    if (symbol != null) {
+      return symbol;
+    }
+    throw new AvroTypeException("No match for " + writerSchema.getEnumSymbols().get(index));
+  }
+
+  /** Reads the enum index and discards it. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.readEnum();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FailureReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FailureReader.java
new file mode 100644
index 000000000..d795ea4bd
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FailureReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.steps.ExecutionStep;
+
+/**
+ * Reader for lazy errors: any attempt to read, skip or execute it throws an
+ * {@link AvroTypeException} carrying the configured message. Used in schema conversion of
+ * unions with non-matching types.
+ */
+public class FailureReader implements FieldReader<Object>, ExecutionStep {
+
+  private final String errorMessage;
+
+  /**
+   * @param errorMessage message of the exception thrown on every use
+   */
+  public FailureReader(String errorMessage) {
+    this.errorMessage = errorMessage;
+  }
+
+  /** Always fails with an {@link AvroTypeException}; nothing is read from the decoder. */
+  @Override
+  public Object read(Object reuse, Decoder decoder) throws IOException {
+    throw new AvroTypeException(errorMessage);
+  }
+
+  /** Always fails with an {@link AvroTypeException}; nothing is skipped. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    throw new AvroTypeException(errorMessage);
+  }
+
+  /** Always fails with an {@link AvroTypeException}; the record is left untouched. */
+  @Override
+  public void execute(IndexedRecord record, Decoder decoder) throws IOException {
+    throw new AvroTypeException(errorMessage);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FieldReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FieldReader.java
new file mode 100644
index 000000000..ec6260fcb
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FieldReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+
+/**
+ * A {@link DatumReader} that can additionally skip over a value. By default it rejects
+ * {@link #setSchema(Schema)} with an {@link UnsupportedOperationException}.
+ *
+ * @param <D> type of the values produced
+ */
+public interface FieldReader<D> extends DatumReader<D> {
+
+  /** Reads one value from the decoder, optionally using {@code reuse}. */
+  @Override
+  D read(D reuse, Decoder decoder) throws IOException;
+
+  /** Advances the decoder past one value without returning it. */
+  void skip(Decoder decoder) throws IOException;
+
+  /**
+   * Returns {@code false} unless overridden. Presumably signals whether the reader makes
+   * use of the {@code reuse} argument of {@link #read}; confirm against the callers.
+   */
+  default boolean canReuse() {
+    return false;
+  }
+
+  /** Unsupported by default. */
+  @Override
+  default void setSchema(Schema schema) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FixedReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FixedReader.java
new file mode 100644
index 000000000..8104bbd87
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FixedReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Reads a fixed value into an instance obtained from
+ * {@link GenericData#createFixed(Object, Schema)}.
+ */
+public class FixedReader implements FieldReader<Object> {
+
+  private final GenericData data;
+  private final Schema readerSchema;
+
+  /**
+   * @param data factory used to create (or reuse) the fixed instance
+   * @param readerSchema fixed schema; supplies the number of bytes to read or skip
+   */
+  public FixedReader(GenericData data, Schema readerSchema) {
+    this.data = data;
+    this.readerSchema = readerSchema;
+  }
+
+  /** Always {@code true}: the reuse instance is forwarded to {@code createFixed}. */
+  @Override
+  public boolean canReuse() {
+    return true;
+  }
+
+  /** Fills the bytes of the created fixed instance from the decoder and returns it. */
+  @Override
+  public Object read(Object reuse, Decoder decoder) throws IOException {
+    final GenericFixed target = (GenericFixed) data.createFixed(reuse, readerSchema);
+    final byte[] buffer = target.bytes();
+    decoder.readFixed(buffer, 0, readerSchema.getFixedSize());
+    return target;
+  }
+
+  /** Skips as many bytes as the reader schema's fixed size. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    final int size = readerSchema.getFixedSize();
+    decoder.skipFixed(size);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FloatReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FloatReader.java
new file mode 100644
index 000000000..902b8a24b
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/FloatReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/** Singleton {@link FieldReader} for float values; obtain it through {@link #get()}. */
+public class FloatReader implements FieldReader<Float> {
+
+  private static final FloatReader INSTANCE = new FloatReader();
+
+  private FloatReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static FloatReader get() {
+    return INSTANCE;
+  }
+
+  /** Reads one float from the decoder; {@code reuse} is ignored. */
+  @Override
+  public Float read(Float reuse, Decoder decoder) throws IOException {
+    return Float.valueOf(decoder.readFloat());
+  }
+
+  /** Reads one float from the decoder and discards it. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.readFloat();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/IntegerReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/IntegerReader.java
new file mode 100644
index 000000000..fe8c73eb6
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/IntegerReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/** Singleton {@link FieldReader} for int values; obtain it through {@link #get()}. */
+public class IntegerReader implements FieldReader<Integer> {
+
+  private static final IntegerReader INSTANCE = new IntegerReader();
+
+  private IntegerReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static IntegerReader get() {
+    return INSTANCE;
+  }
+
+  /** Reads one int from the decoder; {@code reuse} is ignored. */
+  @Override
+  public Integer read(Integer reuse, Decoder decoder) throws IOException {
+    return Integer.valueOf(decoder.readInt());
+  }
+
+  /** Reads one int from the decoder and discards it. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.readInt();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/LongReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/LongReader.java
new file mode 100644
index 000000000..fe72ff434
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/LongReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/** Singleton {@link FieldReader} for long values; obtain it through {@link #get()}. */
+public class LongReader implements FieldReader<Long> {
+
+  private static final LongReader INSTANCE = new LongReader();
+
+  private LongReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static LongReader get() {
+    return INSTANCE;
+  }
+
+  /** Reads one long from the decoder; {@code reuse} is ignored. */
+  @Override
+  public Long read(Long reuse, Decoder decoder) throws IOException {
+    return Long.valueOf(decoder.readLong());
+  }
+
+  /** Reads one long from the decoder and discards it. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.readLong();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/MapReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/MapReader.java
new file mode 100644
index 000000000..6cd5d67e9
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/MapReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Decodes an Avro map into a new {@link HashMap}, reading each entry's key and value with
+ * the configured readers.
+ *
+ * @param <K> type of the map keys
+ * @param <V> type of the map values
+ */
+public class MapReader<K, V> implements FieldReader<Map<K, V>> {
+
+  private final FieldReader<K> keyReader;
+  private final FieldReader<V> valueReader;
+
+  /**
+   * @param keyReader reader used for every key
+   * @param valueReader reader used for every value
+   */
+  public MapReader(FieldReader<K> keyReader, FieldReader<V> valueReader) {
+    this.keyReader = keyReader;
+    this.valueReader = valueReader;
+  }
+
+  /**
+   * Reads all blocks of the map into a new map. The {@code reuse} argument is ignored,
+   * and keys and values are always read without a reuse instance.
+   */
+  @Override
+  public Map<K, V> read(Map<K, V> reuse, Decoder decoder) throws IOException {
+    long l = decoder.readMapStart();
+    Map<K, V> targetMap = new HashMap<>();
+
+    while (l > 0) {
+      // The block count is a long, so the counter must be one too: an int counter would
+      // overflow before reaching a count above Integer.MAX_VALUE.
+      for (long i = 0; i < l; i++) {
+        K key = keyReader.read(null, decoder);
+        V value = valueReader.read(null, decoder);
+        targetMap.put(key, value);
+      }
+      l = decoder.mapNext();
+    }
+
+    return targetMap;
+  }
+
+  /** Skips the whole map by skipping the key and value of every entry of every block. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    long l = decoder.readMapStart();
+    while (l > 0) {
+      for (long i = 0; i < l; i++) {
+        keyReader.skip(decoder);
+        valueReader.skip(decoder);
+      }
+      l = decoder.mapNext();
+    }
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/NullReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/NullReader.java
new file mode 100644
index 000000000..61119930a
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/NullReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/** Singleton {@link FieldReader} for null values; obtain it through {@link #get()}. */
+public class NullReader implements FieldReader<Object> {
+
+  private static final NullReader INSTANCE = new NullReader();
+
+  private NullReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static NullReader get() {
+    return INSTANCE;
+  }
+
+  /** Reads a null from the decoder and returns {@code null}; {@code reuse} is ignored. */
+  @Override
+  public Object read(Object reuse, Decoder decoder) throws IOException {
+    decoder.readNull();
+    final Object nothing = null;
+    return nothing;
+  }
+
+  /** Reads a null from the decoder. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.readNull();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ReconfigurableReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ReconfigurableReader.java
new file mode 100644
index 000000000..7be769391
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ReconfigurableReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.FastReader;
+
+/**
+ * {@link DatumReader} whose writer schema can be replaced after construction. The actual
+ * reader is created lazily from the {@link FastReader} on the first read and is discarded
+ * whenever the writer schema changes.
+ *
+ * @param <T> type of the values produced
+ */
+public class ReconfigurableReader<T> implements DatumReader<T> {
+
+  private final FastReader fastData;
+  private final Schema readerSchema;
+  private Schema writerSchema;
+  private FieldReader<T> reader;
+
+  /**
+   * @param fastData factory used to create the underlying reader
+   * @param readerSchema schema the data is read into
+   * @param writerSchema initial schema the data was written with
+   */
+  public ReconfigurableReader(FastReader fastData, Schema readerSchema, Schema writerSchema) {
+    this.fastData = fastData;
+    this.readerSchema = readerSchema;
+    // Assign directly instead of calling the overridable setSchema(), so that a subclass
+    // override never runs against a partially constructed instance. The cached reader
+    // starts out as null, which is exactly the state setSchema() would leave behind.
+    this.writerSchema = writerSchema;
+  }
+
+  /** Replaces the writer schema and drops the cached reader so it is rebuilt on next read. */
+  @Override
+  public void setSchema(Schema schema) {
+    this.writerSchema = schema;
+    this.reader = null;
+  }
+
+  /** Reads one value using the reader for the current writer/reader schema pair. */
+  @Override
+  public T read(T reuse, Decoder in) throws IOException {
+    return getReader().read(reuse, in);
+  }
+
+  /** Returns the cached reader, creating it from the current schemas when absent. */
+  @SuppressWarnings("unchecked")
+  private FieldReader<T> getReader() {
+    if (reader == null) {
+      reader = (FieldReader<T>) fastData.createDatumReader(writerSchema, readerSchema);
+    }
+    return reader;
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/RecordReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/RecordReader.java
new file mode 100644
index 000000000..c673fac00
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/RecordReader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.steps.ExecutionStep;
+
+/**
+ * Reads a record by running {@link ExecutionStep}s against a target {@link IndexedRecord}.
+ * The steps and the instance supplier are installed after construction through
+ * {@link #finishInitialization}; a {@link Stage} records how far that has progressed.
+ *
+ * @param <D> type of the records produced
+ */
+public class RecordReader<D extends IndexedRecord> implements FieldReader<D> {
+
+  /** Initialization progress of a reader. */
+  public enum Stage {
+    NEW,
+    INITIALIZING,
+    INITIALIZED
+  }
+
+  private ExecutionStep[] readSteps;
+  private ExecutionStep[] defaultingSteps;
+
+  private Supplier<D> supplier;
+  private Stage stage = Stage.NEW;
+
+  /** Returns the current initialization stage. */
+  public Stage getInitializationStage() {
+    return this.stage;
+  }
+
+  /** Sets the stage back to {@link Stage#NEW}; installed steps and supplier are kept. */
+  public void reset() {
+    this.stage = Stage.NEW;
+  }
+
+  /** Marks the reader as {@link Stage#INITIALIZING}. */
+  public void startInitialization() {
+    this.stage = Stage.INITIALIZING;
+  }
+
+  /**
+   * Installs the steps and the instance supplier and marks the reader as
+   * {@link Stage#INITIALIZED}.
+   *
+   * @param readSteps steps run first on every read, and the only ones run on skip
+   * @param defaultingSteps steps run after the read steps on every read
+   * @param newInstanceSupplier creates the target record when no reuse instance is given
+   */
+  @SuppressWarnings("unchecked")
+  public void finishInitialization(
+      ExecutionStep[] readSteps,
+      ExecutionStep[] defaultingSteps,
+      Supplier<? extends IndexedRecord> newInstanceSupplier) {
+    this.supplier = (Supplier<D>) newInstanceSupplier;
+    this.readSteps = readSteps;
+    this.defaultingSteps = defaultingSteps;
+    this.stage = Stage.INITIALIZED;
+  }
+
+  /** Always {@code true}: a non-null reuse instance is populated in place. */
+  @Override
+  public boolean canReuse() {
+    return true;
+  }
+
+  /**
+   * Runs the read steps and then the defaulting steps against {@code reuse}, or against a
+   * new instance from the supplier when {@code reuse} is {@code null}.
+   */
+  @Override
+  public D read(D reuse, Decoder decoder) throws IOException {
+    final D target;
+    if (reuse == null) {
+      target = supplier.get();
+    } else {
+      target = reuse;
+    }
+
+    final ExecutionStep[] reading = readSteps;
+    for (int i = 0; i < reading.length; i++) {
+      reading[i].execute(target, decoder);
+    }
+
+    final ExecutionStep[] defaulting = defaultingSteps;
+    for (int i = 0; i < defaulting.length; i++) {
+      defaulting[i].execute(target, decoder);
+    }
+    return target;
+  }
+
+  /** Skips the record by invoking {@code skip} on every read step. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    final ExecutionStep[] reading = readSteps;
+    for (int i = 0; i < reading.length; i++) {
+      reading[i].skip(decoder);
+    }
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ReusingArrayReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ReusingArrayReader.java
new file mode 100644
index 000000000..ce26163a2
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/ReusingArrayReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.Decoder;
+
+/**
+ * {@link ArrayReader} that hands the elements of a previously read list to the element
+ * reader as reuse instances. The decoded elements are collected in a new array; the list
+ * passed as {@code reuse} is only iterated, never modified.
+ *
+ * @param <D> type of the array elements
+ */
+public class ReusingArrayReader<D> extends ArrayReader<D> {
+
+  public ReusingArrayReader(FieldReader<D> parentReader, Schema schema) {
+    super(parentReader, schema);
+  }
+
+  /**
+   * Reads all blocks of the array. With a non-null {@code reuse}, its elements are offered
+   * one by one to the element reader as reuse instances (and {@code null} once they run
+   * out); with a null {@code reuse} this behaves like {@link ArrayReader#read}.
+   *
+   * @return the newly decoded array
+   */
+  @Override
+  public List<D> read(List<D> reuse, Decoder decoder) throws IOException {
+    if (reuse == null) {
+      return super.read(reuse, decoder);
+    }
+
+    Iterator<D> reuseIterator = reuse.iterator();
+    long l = decoder.readArrayStart();
+
+    GenericArray<D> newArray = new GenericData.Array<>((int) l, getSchema());
+
+    while (l > 0) {
+      for (long i = 0; i < l; i++) {
+        newArray.add(getElementReader().read(getNextReusableElement(reuseIterator), decoder));
+      }
+      l = decoder.arrayNext();
+    }
+    // Return the array that was just populated. Returning the reuse list here would hand
+    // the caller its old, stale contents and silently drop everything decoded above.
+    return newArray;
+  }
+
+  /** Returns the iterator's next element, or {@code null} when it is exhausted. */
+  private D getNextReusableElement(Iterator<D> iterator) {
+    return iterator.hasNext() ? iterator.next() : null;
+  }
+
+  /** Always {@code true}: elements of the reuse list are recycled. */
+  @Override
+  public boolean canReuse() {
+    return true;
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/StringReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/StringReader.java
new file mode 100644
index 000000000..7d3f9af71
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/StringReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Singleton {@link FieldReader} producing {@link String} values; obtain it through
+ * {@link #get()}.
+ */
+public class StringReader implements FieldReader<String> {
+
+  private static final StringReader INSTANCE = new StringReader();
+
+  private StringReader() {
+    // not instantiable from outside; use get()
+  }
+
+  /** Returns the shared instance. */
+  public static StringReader get() {
+    return INSTANCE;
+  }
+
+  /** Reads one string from the decoder; {@code reuse} is ignored. */
+  @Override
+  public String read(String reuse, Decoder decoder) throws IOException {
+    final String text = decoder.readString();
+    return text;
+  }
+
+  /** Skips one string. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.skipString();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/UnionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/UnionReader.java
new file mode 100644
index 000000000..9422e870d
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/UnionReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Field reader for union values: reads the branch index from the decoder and
+ * delegates to the reader stored at that position.
+ */
+public class UnionReader implements FieldReader<Object> {
+
+  // final, like the reader fields of the sibling FieldReader implementations
+  private final FieldReader<?>[] unionReaders;
+
+  /**
+   * @param readers one reader per union branch, looked up by the value returned
+   *        from {@link Decoder#readIndex()}; the array is stored as-is, not copied
+   */
+  public UnionReader(FieldReader<?>[] readers) {
+    this.unionReaders = readers;
+  }
+
+  /**
+   * Reads the branch index, then the value of the selected branch.
+   *
+   * @param reuse ignored; the branch reader is always invoked with {@code null}
+   * @param decoder decoder to read from
+   * @return whatever the selected branch reader produced
+   */
+  @Override
+  public Object read(Object reuse, Decoder decoder) throws IOException {
+    final int selection = decoder.readIndex();
+    return unionReaders[selection].read(null, decoder);
+  }
+
+  /** Reads the branch index and lets the selected branch reader skip its value. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    final int selection = decoder.readIndex();
+    unionReaders[selection].skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/Utf8Reader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/Utf8Reader.java
new file mode 100644
index 000000000..f40c7371e
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/Utf8Reader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+/** Field reader that decodes an Avro string value as a {@link Utf8}. */
+public class Utf8Reader implements FieldReader<Utf8> {
+
+  private static final Utf8Reader SHARED = new Utf8Reader();
+
+  private Utf8Reader() {
+    // instances are only handed out through get()
+  }
+
+  /** Returns the single shared instance of this reader. */
+  public static Utf8Reader get() {
+    return SHARED;
+  }
+
+  /** Always {@code true}: a previous {@link Utf8} is passed on to the decoder. */
+  @Override
+  public boolean canReuse() {
+    return true;
+  }
+
+  /**
+   * Reads the next string by delegating to {@link Decoder#readString(Utf8)}.
+   *
+   * @param reuse previous value handed to the decoder
+   * @param decoder decoder to read from
+   */
+  @Override
+  public Utf8 read(Utf8 reuse, Decoder decoder) throws IOException {
+    return decoder.readString(reuse);
+  }
+
+  /** Skips the next string by delegating to {@link Decoder#skipString()}. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    decoder.skipString();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/BytesConversionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/BytesConversionReader.java
new file mode 100644
index 000000000..4260c5f3e
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/BytesConversionReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.conversion;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.avro.Conversion;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Reads a {@link ByteBuffer} through the wrapped reader and converts it with
+ * {@link Conversion#fromBytes}.
+ *
+ * @param <D> type produced by the conversion
+ */
+public class BytesConversionReader<D> implements FieldReader<D> {
+
+  private final FieldReader<ByteBuffer> parentReader;
+  private final Conversion<D> conversion;
+  private final Schema schema;
+
+  /**
+   * @param parentReader reader for the underlying value; cast unchecked to a
+   *        {@code ByteBuffer} reader
+   * @param conversion conversion applied to every value read
+   * @param schema schema handed to the conversion together with its logical type
+   */
+  @SuppressWarnings("unchecked")
+  public BytesConversionReader(
+      FieldReader<?> parentReader, Conversion<?> conversion, Schema schema) {
+    this.parentReader = (FieldReader<ByteBuffer>) parentReader;
+    this.conversion = (Conversion<D>) conversion;
+    this.schema = schema;
+  }
+
+  /** Never reuses: the wrapped reader is always called with {@code null}. */
+  @Override
+  public boolean canReuse() {
+    return false;
+  }
+
+  /**
+   * Reads the raw bytes, then converts them.
+   *
+   * @param reuse ignored
+   */
+  @Override
+  public D read(D reuse, Decoder decoder) throws IOException {
+    final ByteBuffer raw = parentReader.read(null, decoder);
+    return conversion.fromBytes(raw, schema, schema.getLogicalType());
+  }
+
+  /** Skips the underlying value; no conversion takes place. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    parentReader.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/FixedConversionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/FixedConversionReader.java
new file mode 100644
index 000000000..ef4c0f1c0
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/FixedConversionReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.conversion;
+
+import java.io.IOException;
+import org.apache.avro.Conversion;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Reads a {@link GenericFixed} through the wrapped reader and converts it with
+ * {@link Conversion#fromFixed}.
+ *
+ * @param <D> type produced by the conversion
+ */
+public class FixedConversionReader<D> implements FieldReader<D> {
+
+  private final FieldReader<Object> parentReader;
+  private final Conversion<D> conversion;
+  private final Schema schema;
+
+  /**
+   * @param parentReader reader for the underlying value; its result is cast to
+   *        {@code GenericFixed} on every read
+   * @param conversion conversion applied to every value read
+   * @param schema schema handed to the conversion together with its logical type
+   */
+  @SuppressWarnings("unchecked")
+  public FixedConversionReader(
+      FieldReader<?> parentReader, Conversion<?> conversion, Schema schema) {
+    this.parentReader = (FieldReader<Object>) parentReader;
+    this.conversion = (Conversion<D>) conversion;
+    this.schema = schema;
+  }
+
+  /** Never reuses: the wrapped reader is always called with {@code null}. */
+  @Override
+  public boolean canReuse() {
+    return false;
+  }
+
+  /**
+   * Reads the fixed value, then converts it.
+   *
+   * @param reuse ignored
+   */
+  @Override
+  public D read(D reuse, Decoder decoder) throws IOException {
+    final GenericFixed fixed = (GenericFixed) parentReader.read(null, decoder);
+    return conversion.fromFixed(fixed, schema, schema.getLogicalType());
+  }
+
+  /** Skips the underlying value; no conversion takes place. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    parentReader.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/GenericConversionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/GenericConversionReader.java
new file mode 100644
index 000000000..0e0b41c74
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/GenericConversionReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.conversion;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Conversion reader that picks the {@link Conversion} entry point from the
+ * schema type on every read.
+ *
+ * @param <D> type produced by the conversion
+ */
+public class GenericConversionReader<D> implements FieldReader<D> {
+
+  private final FieldReader<?> parentReader;
+  private final Conversion<D> conversion;
+  private final Schema schema;
+
+  /**
+   * @param parentReader reader for the underlying, unconverted value
+   * @param conversion conversion applied to every value read
+   * @param schema schema whose type selects the conversion method and which is
+   *        handed to the conversion together with its logical type
+   */
+  @SuppressWarnings("unchecked")
+  public GenericConversionReader(
+      FieldReader<?> parentReader, Conversion<?> conversion, Schema schema) {
+    this.parentReader = parentReader;
+    this.conversion = (Conversion<D>) conversion;
+    this.schema = schema;
+  }
+
+  /** Never reuses: the wrapped reader is always called with {@code null}. */
+  @Override
+  public boolean canReuse() {
+    return false;
+  }
+
+  /**
+   * Reads the underlying value and converts it according to the schema type.
+   * The value is consumed from the decoder before the type is inspected.
+   *
+   * @param reuse ignored
+   * @throws IllegalArgumentException if the schema type has no conversion
+   *         method (UNION, NULL, or any unlisted type)
+   */
+  @Override
+  public D read(D reuse, Decoder decoder) throws IOException {
+    final LogicalType logical = schema.getLogicalType();
+    final Object raw = parentReader.read(null, decoder);
+    switch (schema.getType()) {
+      // primitive types
+      case BOOLEAN:
+        return conversion.fromBoolean((Boolean) raw, schema, logical);
+      case INT:
+        return conversion.fromInt((Integer) raw, schema, logical);
+      case LONG:
+        return conversion.fromLong((Long) raw, schema, logical);
+      case FLOAT:
+        return conversion.fromFloat((Float) raw, schema, logical);
+      case DOUBLE:
+        return conversion.fromDouble((Double) raw, schema, logical);
+      case BYTES:
+        return conversion.fromBytes((ByteBuffer) raw, schema, logical);
+      case STRING:
+        return conversion.fromCharSequence((CharSequence) raw, schema, logical);
+      // named and complex types
+      case ENUM:
+        return conversion.fromEnumSymbol((GenericEnumSymbol) raw, schema, logical);
+      case FIXED:
+        return conversion.fromFixed((GenericFixed) raw, schema, logical);
+      case RECORD:
+        return conversion.fromRecord((IndexedRecord) raw, schema, logical);
+      case ARRAY:
+        return conversion.fromArray((Collection<?>) raw, schema, logical);
+      case MAP:
+        return conversion.fromMap((Map<?, ?>) raw, schema, logical);
+      default:
+        // UNION, NULL and any other type end up here
+        throw new IllegalArgumentException("Conversion not possible for type " + schema.getType());
+    }
+  }
+
+  /** Skips the underlying value; no conversion takes place. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    parentReader.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/IntegerConversionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/IntegerConversionReader.java
new file mode 100644
index 000000000..53e1111d8
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/IntegerConversionReader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.conversion;
+
+import java.io.IOException;
+import org.apache.avro.Conversion;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Reads an {@link Integer} through the wrapped reader and converts it with
+ * {@link Conversion#fromInt}.
+ *
+ * @param <D> type produced by the conversion
+ */
+public class IntegerConversionReader<D> implements FieldReader<D> {
+
+  private final FieldReader<Integer> parentReader;
+  private final Conversion<D> conversion;
+  private final Schema schema;
+
+  /**
+   * @param parentReader reader for the underlying value; cast unchecked to an
+   *        {@code Integer} reader
+   * @param conversion conversion applied to every value read
+   * @param schema schema handed to the conversion together with its logical type
+   */
+  @SuppressWarnings("unchecked")
+  public IntegerConversionReader(
+      FieldReader<?> parentReader, Conversion<?> conversion, Schema schema) {
+    this.parentReader = (FieldReader<Integer>) parentReader;
+    this.conversion = (Conversion<D>) conversion;
+    this.schema = schema;
+  }
+
+  /** Never reuses: the wrapped reader is always called with {@code null}. */
+  @Override
+  public boolean canReuse() {
+    return false;
+  }
+
+  /**
+   * Reads the int value, then converts it.
+   *
+   * @param reuse ignored
+   */
+  @Override
+  public D read(D reuse, Decoder decoder) throws IOException {
+    final Integer raw = parentReader.read(null, decoder);
+    return conversion.fromInt(raw, schema, schema.getLogicalType());
+  }
+
+  /** Skips the underlying value; no conversion takes place. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    parentReader.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/LongConversionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/LongConversionReader.java
new file mode 100644
index 000000000..aebb1c4cb
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/LongConversionReader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.conversion;
+
+import java.io.IOException;
+import org.apache.avro.Conversion;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Reads a {@link Long} through the wrapped reader and converts it with
+ * {@link Conversion#fromLong}.
+ *
+ * @param <D> type produced by the conversion
+ */
+public class LongConversionReader<D> implements FieldReader<D> {
+
+  private final FieldReader<Long> parentReader;
+  private final Conversion<D> conversion;
+  private final Schema schema;
+
+  /**
+   * @param parentReader reader for the underlying value; cast unchecked to a
+   *        {@code Long} reader
+   * @param conversion conversion applied to every value read
+   * @param schema schema handed to the conversion together with its logical type
+   */
+  @SuppressWarnings("unchecked")
+  public LongConversionReader(
+      FieldReader<?> parentReader, Conversion<?> conversion, Schema schema) {
+    this.parentReader = (FieldReader<Long>) parentReader;
+    this.conversion = (Conversion<D>) conversion;
+    this.schema = schema;
+  }
+
+  /** Never reuses: the wrapped reader is always called with {@code null}. */
+  @Override
+  public boolean canReuse() {
+    return false;
+  }
+
+  /**
+   * Reads the long value, then converts it.
+   *
+   * @param reuse ignored
+   */
+  @Override
+  public D read(D reuse, Decoder decoder) throws IOException {
+    final Long raw = parentReader.read(null, decoder);
+    return conversion.fromLong(raw, schema, schema.getLogicalType());
+  }
+
+  /** Skips the underlying value; no conversion takes place. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    parentReader.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/StringConversionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/StringConversionReader.java
new file mode 100644
index 000000000..ecd080bce
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/conversion/StringConversionReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.conversion;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Reads a value through the wrapped reader, turns it into a {@link String}
+ * with {@code toString()} and feeds that to a conversion function.
+ */
+public class StringConversionReader implements FieldReader<Object> {
+
+  private final Function<String, ?> conversionFunction;
+  private final FieldReader<?> stringReader;
+
+  /**
+   * @param conversionFunction function applied to the string form of every value read
+   * @param stringReader reader producing the underlying value
+   */
+  public StringConversionReader(
+      Function<String, ?> conversionFunction, FieldReader<?> stringReader) {
+    this.conversionFunction = conversionFunction;
+    this.stringReader = stringReader;
+  }
+
+  /**
+   * Reads the underlying value and returns the converted result.
+   *
+   * @param reuse ignored; the wrapped reader is always called with {@code null}
+   */
+  @Override
+  public Object read(Object reuse, Decoder decoder) throws IOException {
+    final Object raw = stringReader.read(null, decoder);
+    final String text = raw.toString();
+    return conversionFunction.apply(text);
+  }
+
+  /** Skips the underlying value; the conversion function is not invoked. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    stringReader.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/DoublePromotionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/DoublePromotionReader.java
new file mode 100644
index 000000000..a578bab75
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/DoublePromotionReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.promotion;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/** Promotes the value produced by a numeric reader to {@link Double}. */
+public class DoublePromotionReader implements FieldReader<Double> {
+
+  private final FieldReader<? extends Number> delegate;
+
+  /**
+   * @param numberReader reader producing the number to be widened
+   */
+  public DoublePromotionReader(FieldReader<? extends Number> numberReader) {
+    this.delegate = numberReader;
+  }
+
+  /**
+   * Reads a number and returns its {@code doubleValue()}.
+   *
+   * @param reuse ignored; the wrapped reader is always called with {@code null}
+   */
+  @Override
+  public Double read(Double reuse, Decoder decoder) throws IOException {
+    final Number source = delegate.read(null, decoder);
+    return source.doubleValue();
+  }
+
+  /** Skips the underlying number. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    delegate.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/FloatPromotionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/FloatPromotionReader.java
new file mode 100644
index 000000000..05224a746
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/FloatPromotionReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.promotion;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/** Promotes the value produced by a numeric reader to {@link Float}. */
+public class FloatPromotionReader implements FieldReader<Float> {
+
+  private final FieldReader<? extends Number> delegate;
+
+  /**
+   * @param numberReader reader producing the number to be converted
+   */
+  public FloatPromotionReader(FieldReader<? extends Number> numberReader) {
+    this.delegate = numberReader;
+  }
+
+  /**
+   * Reads a number and returns its {@code floatValue()}.
+   *
+   * @param reuse ignored; the wrapped reader is always called with {@code null}
+   */
+  @Override
+  public Float read(Float reuse, Decoder decoder) throws IOException {
+    final Number source = delegate.read(null, decoder);
+    return source.floatValue();
+  }
+
+  /** Skips the underlying number. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    delegate.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/LongPromotionReader.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/LongPromotionReader.java
new file mode 100644
index 000000000..2abceb944
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/readers/promotion/LongPromotionReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.readers.promotion;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/** Promotes the value produced by an int reader to {@link Long}. */
+public class LongPromotionReader implements FieldReader<Long> {
+
+  private final FieldReader<Integer> delegate;
+
+  /**
+   * @param intReader reader producing the int to be widened
+   */
+  public LongPromotionReader(FieldReader<Integer> intReader) {
+    this.delegate = intReader;
+  }
+
+  /**
+   * Reads an int and returns its {@code longValue()}.
+   *
+   * @param reuse ignored; the wrapped reader is always called with {@code null}
+   */
+  @Override
+  public Long read(Long reuse, Decoder decoder) throws IOException {
+    final Integer narrow = delegate.read(null, decoder);
+    return narrow.longValue();
+  }
+
+  /** Skips the underlying int. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    delegate.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/ExecutionStep.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/ExecutionStep.java
new file mode 100644
index 000000000..a09867d7f
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/ExecutionStep.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.steps;
+
+import java.io.IOException;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+
+/**
+ * One step applied to a record while it is being decoded. The implementations
+ * in this package store a decoded value, store a default value, or skip a value.
+ */
+public interface ExecutionStep {
+
+  /**
+   * Performs this step against the given record.
+   *
+   * @param record record being populated
+   * @param decoder decoder positioned at the data for this step
+   * @throws IOException if reading from the decoder fails
+   */
+  void execute(IndexedRecord record, Decoder decoder) throws IOException;
+
+  /**
+   * Performs this step without a target record.
+   *
+   * @param decoder decoder positioned at the data for this step
+   * @throws IOException if reading from the decoder fails
+   */
+  void skip(Decoder decoder) throws IOException;
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldDefaulter.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldDefaulter.java
new file mode 100644
index 000000000..ba3b9cec6
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldDefaulter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.steps;
+
+import java.io.IOException;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Execution step that stores a fixed value into one field of a record and
+ * consumes nothing from the decoder.
+ */
+public class FieldDefaulter implements ExecutionStep {
+
+  private final int targetIndex;
+  private final Object defaultValue;
+
+  /**
+   * @param fieldIndex position of the field to populate
+   * @param value value stored on every execution; the same instance is put
+   *        into each record, it is not copied
+   */
+  public FieldDefaulter(int fieldIndex, Object value) {
+    this.targetIndex = fieldIndex;
+    this.defaultValue = value;
+  }
+
+  /** Puts the configured value into the record; the decoder is left untouched. */
+  @Override
+  public void execute(IndexedRecord record, Decoder decoder) throws IOException {
+    record.put(targetIndex, defaultValue);
+  }
+
+  /** No-op: a defaulted field has no data in the decoder to skip. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    // if defaulting is skipped, nothing needs to be done
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldSetter.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldSetter.java
new file mode 100644
index 000000000..d7c3ef0a1
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldSetter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.steps;
+
+import java.io.IOException;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Execution step that reads a value with a {@link FieldReader} and stores it
+ * into one field of a record.
+ *
+ * @param <D> type produced by the reader
+ */
+public class FieldSetter<D> implements ExecutionStep {
+
+  private final int fieldNumber;
+  private final boolean reuseExisting;
+  private final FieldReader<D> reader;
+
+  /**
+   * @param fieldNumber position of the field to populate
+   * @param reader reader producing the field value; its {@code canReuse()}
+   *        answer is captured once, here
+   */
+  public FieldSetter(int fieldNumber, FieldReader<D> reader) {
+    this.fieldNumber = fieldNumber;
+    this.reader = reader;
+    this.reuseExisting = reader.canReuse();
+  }
+
+  /**
+   * Reads the next value and puts it into the record. When the reader supports
+   * reuse, the field's current content is offered to it as the reuse object.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(IndexedRecord record, Decoder decoder) throws IOException {
+    D previous = null;
+    if (reuseExisting) {
+      previous = (D) record.get(fieldNumber);
+    }
+    record.put(fieldNumber, reader.read(previous, decoder));
+  }
+
+  /** Lets the reader skip its value; no record is involved. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    reader.skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldSkipper.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldSkipper.java
new file mode 100644
index 000000000..410283972
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/steps/FieldSkipper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.steps;
+
+import java.io.IOException;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.fastreader.readers.FieldReader;
+
+/**
+ * Execution step that skips a value in the decoder and never writes to the
+ * record.
+ */
+public class FieldSkipper implements ExecutionStep {
+
+  private final FieldReader<?> skippedReader;
+
+  /**
+   * @param reader reader that knows how to skip the value
+   */
+  public FieldSkipper(FieldReader<?> reader) {
+    this.skippedReader = reader;
+  }
+
+  /** Lets the reader skip its value. */
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    skippedReader.skip(decoder);
+  }
+
+  /** Same as {@link #skip(Decoder)}; the record is not touched. */
+  @Override
+  public void execute(IndexedRecord record, Decoder decoder) throws IOException {
+    skip(decoder);
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/utils/LoggingDecoder.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/utils/LoggingDecoder.java
new file mode 100644
index 000000000..7d1d700bb
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/utils/LoggingDecoder.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Decoder} wrapper that logs the name of every decoding operation at
+ * INFO level and then forwards the call unchanged to the wrapped decoder.
+ * {@code readFixed} and {@code skipFixed} additionally log the requested
+ * length. Return values of the wrapped decoder are passed through as-is.
+ */
+public class LoggingDecoder extends Decoder {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingDecoder.class);
+
+  /** The decoder that performs the actual work. */
+  private final Decoder delegate;
+
+  /**
+   * @param parentDecoder the decoder every call is forwarded to
+   */
+  public LoggingDecoder(Decoder parentDecoder) {
+    delegate = parentDecoder;
+  }
+
+  // ---- primitives ----
+
+  @Override
+  public void readNull() throws IOException {
+    LOGGER.info("readNull");
+    delegate.readNull();
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    LOGGER.info("readBoolean");
+    return delegate.readBoolean();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    LOGGER.info("readInt");
+    return delegate.readInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    LOGGER.info("readLong");
+    return delegate.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    LOGGER.info("readFloat");
+    return delegate.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    LOGGER.info("readDouble");
+    return delegate.readDouble();
+  }
+
+  // ---- strings, bytes and fixed ----
+
+  @Override
+  public Utf8 readString(Utf8 old) throws IOException {
+    LOGGER.info("readString(UTF8)");
+    return delegate.readString(old);
+  }
+
+  @Override
+  public String readString() throws IOException {
+    LOGGER.info("readString");
+    return delegate.readString();
+  }
+
+  @Override
+  public void skipString() throws IOException {
+    LOGGER.info("skipString");
+    delegate.skipString();
+  }
+
+  @Override
+  public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+    LOGGER.info("readBytes");
+    return delegate.readBytes(old);
+  }
+
+  @Override
+  public void skipBytes() throws IOException {
+    LOGGER.info("skipBytes");
+    delegate.skipBytes();
+  }
+
+  @Override
+  public void readFixed(byte[] bytes, int start, int length) throws IOException {
+    LOGGER.info("readFixed(length={})", length);
+    delegate.readFixed(bytes, start, length);
+  }
+
+  @Override
+  public void skipFixed(int length) throws IOException {
+    LOGGER.info("skipFixed(length={})", length);
+    delegate.skipFixed(length);
+  }
+
+  @Override
+  public int readEnum() throws IOException {
+    LOGGER.info("readEnum");
+    return delegate.readEnum();
+  }
+
+  // ---- arrays ----
+
+  @Override
+  public long readArrayStart() throws IOException {
+    LOGGER.info("readArrayStart");
+    return delegate.readArrayStart();
+  }
+
+  @Override
+  public long arrayNext() throws IOException {
+    LOGGER.info("arrayNext");
+    return delegate.arrayNext();
+  }
+
+  @Override
+  public long skipArray() throws IOException {
+    LOGGER.info("skipArray");
+    return delegate.skipArray();
+  }
+
+  // ---- maps ----
+
+  @Override
+  public long readMapStart() throws IOException {
+    LOGGER.info("readMapStart");
+    return delegate.readMapStart();
+  }
+
+  @Override
+  public long mapNext() throws IOException {
+    LOGGER.info("mapNext");
+    return delegate.mapNext();
+  }
+
+  @Override
+  public long skipMap() throws IOException {
+    LOGGER.info("skipMap");
+    return delegate.skipMap();
+  }
+
+  // ---- unions ----
+
+  @Override
+  public int readIndex() throws IOException {
+    LOGGER.info("readIndex");
+    return delegate.readIndex();
+  }
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/utils/ReflectionUtils.java
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/utils/ReflectionUtils.java
new file mode 100644
index 000000000..8228b9cf9
--- /dev/null
+++
b/lang/java/avro/src/main/java/org/apache/avro/io/fastreader/utils/ReflectionUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io.fastreader.utils;
+
+import java.lang.invoke.CallSite;
+import java.lang.invoke.LambdaMetafactory;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class ReflectionUtils {
+
+ private ReflectionUtils() {
+ // static helper class, don't initiate
+ }
+
+ public static <D> Supplier<D> getConstructorAsSupplier( Class<D> clazz ) {
+ try {
+ MethodHandles.Lookup lookup = MethodHandles.lookup();
+ MethodHandle constructorHandle = lookup.findConstructor( clazz,
MethodType.methodType(void.class) );
+
+ CallSite site = LambdaMetafactory.metafactory( lookup,
+ "get",
+ MethodType.methodType( Supplier.class ),
+ constructorHandle.type().generic(),
+ constructorHandle,
+ constructorHandle.type() );
+
+ return (Supplier<D>) site.getTarget().invokeExact();
+ }
+ catch ( Throwable t ) {
+ // if anything goes wrong, don't provide a Supplier
+ return null;
+ }
+ }
+
+
+ public static <V,R> Supplier<R> getOneArgConstructorAsSupplier( Class<R>
clazz, Class<V> argumentClass, V argument ) {
+ Function<V,R> supplierFunction = getConstructorAsFunction(
argumentClass, clazz );
+ if ( supplierFunction != null ) {
+ return () -> supplierFunction.apply( argument );
+ }
+ else {
+ return null;
+ }
+ }
+
+
+ public static <V,R> Function<V,R> getConstructorAsFunction( Class<V>
parameterClass, Class<R> clazz ) {
+ try {
+ MethodHandles.Lookup lookup = MethodHandles.lookup();
+ MethodHandle constructorHandle = lookup.findConstructor( clazz,
MethodType.methodType(void.class, parameterClass ) );
+
+ CallSite site = LambdaMetafactory.metafactory( lookup,
+ "apply",
+ MethodType.methodType( Function.class ),
+ constructorHandle.type().generic(),
+ constructorHandle,
+ constructorHandle.type() );
+
+ return (Function<V,R>) site.getTarget().invokeExact();
+ }
+ catch ( Throwable t ) {
+ // if something goes wrong, do not provide a Function instance
+ return null;
+ }
+ }
+
+
+}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
index d26d1f9d8..60472a2ad 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
@@ -103,6 +103,11 @@ public ReflectData(ClassLoader classLoader) {
/** Return the singleton instance. */
public static ReflectData get() { return INSTANCE; }
+ @Override
+ public boolean isFastReaderEnabled() {
+ return false;
+ }
+
/** Cause a class to be treated as though it had an {@link Stringable}
** annotation. */
public ReflectData addStringable(Class c) {
@@ -365,6 +370,7 @@ static Class getClassProp(Schema schema, String prop) {
* It returns false for non-string-maps because Avro writes out such maps
* as an array of records. Even their JSON representation is an array.
*/
+ @Override
protected boolean isMap(Object datum) {
return (datum instanceof Map) && !isNonStringMap(datum);
}
diff --git
a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
index d7b2bf825..851393789 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
@@ -116,12 +116,17 @@ public SpecificData(ClassLoader classLoader) {
@Override
public DatumReader createDatumReader(Schema schema) {
- return new SpecificDatumReader(schema, schema, this);
+ return createDatumReader( schema, schema );
}
@Override
public DatumReader createDatumReader(Schema writer, Schema reader) {
- return new SpecificDatumReader(writer, reader, this);
+ if ( isFastReaderEnabled() ) {
+ return getFastReader().createReconfigurableDatumReader(writer, reader);
+ }
+ else {
+ return new SpecificDatumReader( writer, reader, this );
+ }
}
@Override
diff --git
a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityEnumDefaults.java
b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityEnumDefaults.java
index f85f3979a..ba349eb24 100644
---
a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityEnumDefaults.java
+++
b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibilityEnumDefaults.java
@@ -131,7 +131,7 @@ private GenericRecord
serializeWithWriterThenDeserializeWithReader(Schema writer
Decoder decoder = DecoderFactory.get().resolvingDecoder(
writerSchema, readerSchema,
DecoderFactory.get().binaryDecoder(bytes, null));
- DatumReader<Object> datumReader = new GenericDatumReader<>(readerSchema);
+ DatumReader<Object> datumReader =
GenericData.get().createDatumReader(readerSchema);
return (GenericRecord)datumReader.read(null, decoder);
}
diff --git a/lang/java/ipc/src/test/java/org/apache/avro/io/Perf.java
b/lang/java/ipc/src/test/java/org/apache/avro/io/Perf.java
index 8f525b4cd..5c1ec33e1 100644
--- a/lang/java/ipc/src/test/java/org/apache/avro/io/Perf.java
+++ b/lang/java/ipc/src/test/java/org/apache/avro/io/Perf.java
@@ -18,8 +18,8 @@
package org.apache.avro.io;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
@@ -29,19 +29,17 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
-
import org.apache.avro.FooBarSpecificRecord;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.TypeEnum;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
@@ -1169,7 +1167,7 @@ protected void readInternal(Decoder d) throws IOException
{
static class GenericTest extends BasicTest {
GenericRecord[] sourceData = null;
- protected final GenericDatumReader<Object> reader;
+ protected final DatumReader<Object> reader;
public GenericTest() throws IOException {
this("Generic");
}
@@ -1180,11 +1178,11 @@ protected GenericTest(String name, String writerSchema)
throws IOException {
super(name, writerSchema, 12);
reader = newReader();
}
- protected GenericDatumReader<Object> getReader() {
+ protected DatumReader<Object> getReader() {
return reader;
}
- protected GenericDatumReader<Object> newReader() {
- return new GenericDatumReader<>(schema);
+ protected DatumReader<Object> newReader() {
+ return GenericData.get().createDatumReader(schema);
}
@Override
void genSourceData() {
@@ -1342,8 +1340,8 @@ protected GenericResolving(String name)
isWriteTest = false;
}
@Override
- protected GenericDatumReader<Object> newReader() {
- return new GenericDatumReader<>(schema, getReaderSchema());
+ protected DatumReader<Object> newReader() {
+ return GenericData.get().createDatumReader(schema, getReaderSchema());
}
protected abstract Schema getReaderSchema();
}
@@ -1395,7 +1393,7 @@ public GenericOneTimeReaderUse() throws IOException {
isWriteTest = false;
}
@Override
- protected GenericDatumReader<Object> getReader() {
+ protected DatumReader<Object> getReader() {
return newReader();
}
}
@@ -1406,7 +1404,7 @@ public GenericOneTimeUse() throws IOException {
isWriteTest = false;
}
@Override
- protected GenericDatumReader<Object> getReader() {
+ protected DatumReader<Object> getReader() {
return newReader();
}
@Override
@@ -1416,7 +1414,7 @@ protected Decoder getDecoder() {
}
static abstract class SpecificTest<T extends SpecificRecordBase> extends
BasicTest {
- protected final SpecificDatumReader<T> reader;
+ protected final DatumReader<T> reader;
protected final SpecificDatumWriter<T> writer;
private Object[] sourceData;
@@ -1425,14 +1423,14 @@ protected SpecificTest(String name, String
writerSchema) throws IOException {
reader = newReader();
writer = newWriter();
}
- protected SpecificDatumReader<T> getReader() {
+ protected DatumReader<T> getReader() {
return reader;
}
- protected SpecificDatumWriter<T> getWriter() {
+ protected DatumWriter<T> getWriter() {
return writer;
}
- protected SpecificDatumReader<T> newReader() {
- return new SpecificDatumReader<>(schema);
+ protected DatumReader<T> newReader() {
+ return SpecificData.get().createDatumReader( schema );
}
protected SpecificDatumWriter<T> newWriter() {
return new SpecificDatumWriter<>(schema);
diff --git a/lang/java/pom.xml b/lang/java/pom.xml
index eae6899de..99f66996e 100644
--- a/lang/java/pom.xml
+++ b/lang/java/pom.xml
@@ -207,6 +207,18 @@
</systemPropertyVariables>
</configuration>
</execution>
+ <execution>
+ <id>test-with-fast-readers</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <systemPropertyVariables>
+ <org.apache.avro.fastread>true</org.apache.avro.fastread>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
</executions>
<configuration>
<includes>
diff --git
a/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
b/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
index c725ca0a5..5f7098933 100644
---
a/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
+++
b/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
@@ -63,6 +63,11 @@ protected ProtobufData() {}
/** Return the singleton instance. */
public static ProtobufData get() { return INSTANCE; }
+ @Override
+ public boolean isFastReaderEnabled() {
+ return false;
+ }
+
@Override
public DatumReader createDatumReader(Schema schema) {
return new ProtobufDatumReader(schema, schema, this);
@@ -193,6 +198,7 @@ public Schema getSchema(Class c) {
private static final ThreadLocal<Map<Descriptor,Schema>> SEEN
= new ThreadLocal<Map<Descriptor,Schema>>() {
+ @Override
protected Map<Descriptor,Schema> initialValue() {
return new IdentityHashMap<>();
}
diff --git
a/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java
b/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java
index 654cd0fc3..fcad34bbb 100644
--- a/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java
+++ b/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java
@@ -59,6 +59,11 @@ protected ThriftData() {}
/** Return the singleton instance. */
public static ThriftData get() { return INSTANCE; }
+ @Override
+ public boolean isFastReaderEnabled() {
+ return false;
+ }
+
@Override
public DatumReader createDatumReader(Schema schema) {
return new ThriftDatumReader(schema, schema, this);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Improve Java reading performance with a new reader
> --------------------------------------------------
>
> Key: AVRO-2247
> URL: https://issues.apache.org/jira/browse/AVRO-2247
> Project: Apache Avro
> Issue Type: Improvement
> Components: java
> Reporter: Martin Jubelgas
> Priority: Major
> Fix For: 1.9.0
>
> Attachments: Perf-Comparison.md
>
>
> Complementary to AVRO-2090, I have been working on decoding of Avro objects
> in Java and am suggesting a new implementation of a DatumReader that improves
> read performance for both generic and specific records by approximately 20%
> (and even more in cases of nested objects with defaults, a case I encounter a
> lot in practical use).
> The key concept is to create a detailed execution plan once, at DatumReader
> creation. This execution plan contains all required defaulting/lookup values
> so they need not be looked up during object traversal while reading.
> The reader implementation can be enabled and disabled per GenericData
> instance. The system default is set via the Java system property
> "org.apache.avro.fastread" (defaults to "false").
> Attached is a performance comparison of the existing implementation with the
> proposed one. I will open a pull request with the respective code shortly (not
> yet including interoperability with the optimizations of AVRO-2090). Please
> let me know whether you think this is worth pursuing further.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)