Hi,

We are trying to encode a list of records into JSON with one schema and
then decode the list into Avro objects with a compatible schema. Schema
resolution between the two schemas works for single records, but
deserializing the list fails when the read schema differs from the write
schema. When the same schema is used for both, deserialization works.

When decoding, an exception is thrown:

org.apache.avro.AvroTypeException: Attempt to process a item-end when a int was expected.
       org.apache.avro.io.parsing.Parser.advance(Parser.java:93)
       org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:139)
       org.apache.avro.io.JsonDecoder.arrayNext(JsonDecoder.java:360)

It seems as though the decoder is not advancing past the correct number of
bytes before it reads the next element.

We encode like so:

public static <T extends GenericRecord> String toJSONArrayString(
    List<T> avroRecords, Schema schema) throws IOException {

  if (avroRecords == null || avroRecords.isEmpty()) {
    return "[]";
  }

  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  Encoder encoder =
      ENCODER_FACTORY.jsonEncoder(Schema.createArray(schema), baos);
  DatumWriter<T> datumWriter = avroRecords.get(0) instanceof SpecificRecord
      ? new SpecificDatumWriter<>(schema)
      : new GenericDatumWriter<>(schema);

  encoder.writeArrayStart();
  encoder.setItemCount(avroRecords.size());
  for (T record : avroRecords) {
    encoder.startItem();
    datumWriter.write(record, encoder);
  }
  encoder.writeArrayEnd();
  encoder.flush();

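  // The JSON encoder writes UTF-8 bytes; decode them explicitly as UTF-8
  // rather than with the platform default charset.
  return baos.toString("UTF-8");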
}


And decode similarly:

public static <T extends GenericRecord> List<T> fromJSONArrayString(
    String jsonArrayString, Schema writeSchema, Schema readSchema)
    throws IOException {
  Schema readArrSchema = Schema.createArray(readSchema);
  Decoder decoder = DECODER_FACTORY.jsonDecoder(readArrSchema, jsonArrayString);
  DatumReader<T> datumReader;
  if (writeSchema.equals(readSchema)) {
    datumReader = new SpecificDatumReader<>(readSchema);
  } else {
    datumReader = new SpecificDatumReader<>(writeSchema, readSchema);
  }

  List<T> avroRecords = new ArrayList<>();
  for (long i = decoder.readArrayStart(); i != 0; i = decoder.arrayNext()) {
    for (long j = 0; j < i; j++) {
      avroRecords.add(datumReader.read(null, decoder));
    }
  }

  return avroRecords;
}
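One likely cause, offered as a suggestion rather than a confirmed diagnosis: the JsonDecoder above is built with `readArrSchema`, but the JSON text was produced with the writer schema. The JsonDecoder uses its schema only to walk the JSON tokens; writer-to-reader resolution is done by the datum reader, which wraps the decoder in a `ResolvingDecoder`. Given the V2 schema, the JsonDecoder still expects an int `number` field after `text`. The JSON has none, so `arrayNext()` finds the end of the item where an int should be, which matches the exception above. Below is a minimal sketch with the decoder built from the writer schema. It assumes generic records (`GenericDatumReader`, which takes the same `(writer, reader)` arguments as `SpecificDatumReader`) and a hypothetical class name, `JsonArrayResolution`. It also assumes the writer and reader records share a record name, as Avro's resolution rules require.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

// Hypothetical helper class; only the decoder's schema differs from the original method.
class JsonArrayResolution {

  private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();

  static List<GenericRecord> fromJSONArrayString(
      String jsonArrayString, Schema writeSchema, Schema readSchema) throws IOException {
    // The JsonDecoder only walks the JSON tokens, so give it the schema the JSON was
    // WRITTEN with (wrapped in an array, matching how the encoder was configured).
    Decoder decoder =
        DECODER_FACTORY.jsonDecoder(Schema.createArray(writeSchema), jsonArrayString);

    // Writer-to-reader resolution (e.g. filling in defaults) happens in the datum reader.
    // The two-argument constructor also works when both schemas are identical.
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writeSchema, readSchema);

    List<GenericRecord> avroRecords = new ArrayList<>();
    // readArrayStart/arrayNext return how many items to read next, and 0 at the end.
    for (long i = decoder.readArrayStart(); i != 0; i = decoder.arrayNext()) {
      for (long j = 0; j < i; j++) {
        avroRecords.add(datumReader.read(null, decoder));
      }
    }
    return avroRecords;
  }
}
```

Called with a V1-shaped JSON array as `fromJSONArrayString(json, v1Schema, v2Schema)`, each returned record should carry `number` = 0 from the V2 default. If this diagnosis is right, the change to the original method would be passing `writeSchema` instead of `readSchema` to `jsonDecoder`; the `equals` branch could also go.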



Our two schemas look like:

{
  "type": "record",
  "name": "TestRecordV1",
  "fields": [
    {
      "name": "text",
      "type": "string"
    }
  ]
}

{
  "type": "record",
  "name": "TestRecordV2",
  "fields": [
    {
      "name": "text",
      "type": "string"
    },
    {
      "name": "number",
      "type": "int",
      "default": 0
    }
  ]
}
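As posted, the two records are named `TestRecordV1` and `TestRecordV2`. The Avro specification only matches records with the same name during schema resolution. If the real schemas also differ in name, V2 probably needs an `aliases` entry naming V1, or both records need the same name. Below is a hedged sketch that checks this before decoding with `SchemaCompatibility`, which ships with recent Avro releases (check that your version includes it). The class and constant names are hypothetical.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

// Hypothetical helper: can data written with writerJson be read with readerJson?
class SchemaNameCheck {

  static final String V1 = "{\"type\":\"record\",\"name\":\"TestRecordV1\","
      + "\"fields\":[{\"name\":\"text\",\"type\":\"string\"}]}";

  // V2 exactly as posted: different record name, no alias.
  static final String V2 = "{\"type\":\"record\",\"name\":\"TestRecordV2\","
      + "\"fields\":[{\"name\":\"text\",\"type\":\"string\"},"
      + "{\"name\":\"number\",\"type\":\"int\",\"default\":0}]}";

  // V2 with an alias declaring that it may stand in for TestRecordV1.
  static final String V2_WITH_ALIAS = "{\"type\":\"record\",\"name\":\"TestRecordV2\","
      + "\"aliases\":[\"TestRecordV1\"],"
      + "\"fields\":[{\"name\":\"text\",\"type\":\"string\"},"
      + "{\"name\":\"number\",\"type\":\"int\",\"default\":0}]}";

  static boolean canRead(String readerJson, String writerJson) {
    // Separate parsers, so two schemas with the same name never collide in one parser.
    Schema reader = new Schema.Parser().parse(readerJson);
    Schema writer = new Schema.Parser().parse(writerJson);
    return SchemaCompatibility.checkReaderWriterCompatibility(reader, writer).getType()
        == SchemaCompatibilityType.COMPATIBLE;
  }

  public static void main(String[] args) {
    System.out.println("V2 can read V1: " + canRead(V2, V1));
    System.out.println("V2 with alias can read V1: " + canRead(V2_WITH_ALIAS, V1));
  }
}
```

The alias lives on the reader schema because Avro resolves aliases by rewriting the writer's schema with the reader's aliases.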



Is there something simple we are missing, or is it not possible to do schema
resolution dynamically on an entire array?

Thank you!
Austin
