Hi,

I'm running Flink v1.9. I backported the commit adding serialization
support for Confluent's schema registry[1]. Using the code as is, I saw a
nearly 50% drop in peak throughput for my job compared to using
*AvroRowSerializationSchema*.

Looking at the code, *RegistryAvroSerializationSchema.serialize()* executes:


public byte[] serialize(T object) {
  checkAvroInitialized();

  if (object == null) {
    return null;
  } else {
    try {
      Encoder encoder = getEncoder();

      // <-- called for every message
      schemaCoderProvider.get().writeSchema(getSchema(), getOutputStream());
      getDatumWriter().write(object, encoder);
      encoder.flush();
      byte[] bytes = getOutputStream().toByteArray();
      getOutputStream().reset();
      return bytes;
    } catch (IOException e) {
      throw new WrappingRuntimeException(
          "Failed to serialize schema registry.", e);
    }
  }
}


This runs for every single message, and *ConfluentSchemaRegistryCoder.writeSchema()*
attempts to register the schema on each call:


public void writeSchema(Schema schema, OutputStream out) throws IOException {
  try {
    // <-- registers the schema with the registry on every call
    int registeredId = schemaRegistryClient.register(subject, schema);
    out.write(CONFLUENT_MAGIC_BYTE);
    byte[] schemaIdBytes =
        ByteBuffer.allocate(4).putInt(registeredId).array();
    out.write(schemaIdBytes);
  } catch (RestClientException e) {
    throw new IOException("Could not register schema in registry", e);
  }
}


It's making an HTTP request to the Schema Registry for every single
message. Since the output schema does not change over the course of a
streaming job, it seems you should only need to register the schema once.
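Apart from the registration call, the only thing writeSchema() contributes to
each message is a fixed prefix: one magic byte (0) followed by the registered
schema id as a 4-byte big-endian int, which is ByteBuffer's default byte order.
Once the id is known, that prefix is the same for every message. A minimal
standalone sketch of just the byte layout, in plain Java with no Flink or
Confluent dependencies (the class and method names are made up for
illustration):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentHeader {
    // Magic byte that prefixes every Confluent-framed message.
    static final byte MAGIC_BYTE = 0;

    // Builds the 5-byte prefix: magic byte, then the schema id as a
    // 4-byte big-endian int (ByteBuffer's default byte order).
    static byte[] header(int schemaId) {
        return ByteBuffer.allocate(5).put(MAGIC_BYTE).putInt(schemaId).array();
    }

    public static void main(String[] args) {
        // Schema id 258 is 0x00000102.
        System.out.println(Arrays.toString(header(258))); // [0, 0, 0, 1, 2]
    }
}
```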

I moved the schema registration call into
*RegistryAvroSerializationSchema.checkAvroInitialized()* and added a helper
function, called from *RegistryAvroSerializationSchema.serialize()*, that
writes the magic byte and schema id bytes. After this change, the job's
performance returned to levels comparable to *AvroRowSerializationSchema*.
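A rough sketch of that change, under some simplifying assumptions: Registry is
a hypothetical stand-in for SchemaRegistryClient, the record is a String, and
its UTF-8 bytes stand in for the Avro datum writer output, so the example runs
without Flink, Avro, or a live registry. The id is looked up once in an init
check (the analogue of checkAvroInitialized()), and the 5-byte header is built
once and reused by a small helper on every call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RegisterOnceSerializer {
    // Hypothetical stand-in for SchemaRegistryClient.register(subject, schema);
    // in the real job each call is an HTTP request.
    interface Registry {
        int register(String subject, String schema) throws IOException;
    }

    private final Registry registry;
    private final String subject;
    private final String schema;
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private byte[] header; // magic byte + schema id, built once

    RegisterOnceSerializer(Registry registry, String subject, String schema) {
        this.registry = registry;
        this.subject = subject;
        this.schema = schema;
    }

    // Analogue of checkAvroInitialized(): registers the schema on first use only.
    private void checkInitialized() throws IOException {
        if (header == null) {
            int id = registry.register(subject, schema);
            header = ByteBuffer.allocate(5).put((byte) 0).putInt(id).array();
        }
    }

    // Helper that writes the cached magic byte and schema id.
    private void writeHeader() {
        out.write(header, 0, header.length);
    }

    byte[] serialize(String record) throws IOException {
        checkInitialized();
        if (record == null) {
            return null;
        }
        writeHeader();
        // Placeholder for getDatumWriter().write(object, encoder).
        byte[] body = record.getBytes(StandardCharsets.UTF_8);
        out.write(body, 0, body.length);
        byte[] bytes = out.toByteArray();
        out.reset();
        return bytes;
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        RegisterOnceSerializer s = new RegisterOnceSerializer(
                (subj, sch) -> { calls[0]++; return 7; }, "topic-value", "{}");
        for (int i = 0; i < 1000; i++) {
            s.serialize("msg" + i);
        }
        System.out.println("register() calls: " + calls[0]); // register() calls: 1
    }
}
```

In the real schema class the cached id or header would presumably need to be
a transient field filled in lazily, since the serialization schema is itself
serialized and shipped to the task managers before serialize() is called.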

Am I right in thinking this is a design flaw rather than intentional
behaviour?


[1] https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339
