
I'm running Flink v1.9. I backported the commit adding serialization
support for Confluent's schema registry[1]. Using the code as is, I saw a
nearly 50% drop in peak throughput for my job compared to using

Looking at the code, *RegistryAvroSerializationSchema.serialize()* executes:

public byte[] serialize(T object) {
  if (object == null) {
    return null;
  } else {
    try {
      Encoder encoder = getEncoder();
      // Executed for every message:
      schemaCoderProvider.get()
          .writeSchema(getSchema(), getOutputStream());
      getDatumWriter().write(object, encoder);
      byte[] bytes = getOutputStream().toByteArray();
      return bytes;
    } catch (IOException e) {
      throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
    }
  }
}

This runs for every single message, and *ConfluentSchemaRegistryCoder.writeSchema()*
attempts to register the schema on each call:

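public void writeSchema(Schema schema, OutputStream out) throws IOException {
  try {
    // Registry call made on every invocation:
    int registeredId = schemaRegistryClient.register(subject, schema);
    out.write(CONFLUENT_MAGIC_BYTE);
    byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
    out.write(schemaIdBytes);
  } catch (RestClientException e) {
    throw new IOException("Could not register schema in registry", e);
  }
}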

It's making an HTTP request to the Schema Registry for every single
message. Since the output schema does not change over the course of a
streaming job, it seems you should only need to register the schema once.
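To illustrate the "register once" idea, here is a minimal sketch of a coder that caches the id returned by the registry, so only the first message for a given schema pays for the remote call. It is not Flink's or Confluent's code: *RegistryClient* and *CachingSchemaCoder* are hypothetical names, and the schema is modelled as a plain String instead of an Avro Schema to keep the example self-contained. It assumes one coder instance per serializer (i.e. per parallel subtask), so it does no locking.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a schema registry client; register() is assumed to be a remote call. */
interface RegistryClient {
    int register(String subject, String schema) throws IOException;
}

/** Hypothetical coder that registers each distinct schema once and reuses the returned id. */
class CachingSchemaCoder {
    private static final byte MAGIC_BYTE = 0x0;

    private final RegistryClient client;
    private final String subject;
    // Schema text -> registered id. Not thread-safe: assumes one instance per subtask.
    private final Map<String, Integer> idsBySchema = new HashMap<>();

    CachingSchemaCoder(RegistryClient client, String subject) {
        this.client = client;
        this.subject = subject;
    }

    void writeSchema(String schema, OutputStream out) throws IOException {
        Integer id = idsBySchema.get(schema);
        if (id == null) {
            // Only the first call for this schema reaches the registry.
            id = client.register(subject, schema);
            idsBySchema.put(schema, id);
        }
        // Confluent header: magic byte followed by the 4-byte big-endian schema id.
        out.write(MAGIC_BYTE);
        out.write(ByteBuffer.allocate(4).putInt(id).array());
    }
}
```

Keying the cache by schema rather than storing a single id keeps the sketch correct even if a writer were ever handed a different schema, at the cost of one map lookup per message.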

I moved the schema registration call into
*RegistryAvroSerializationSchema.checkAvroInitialized()* and added a helper
function, called from *RegistryAvroSerializationSchema.serialize()*, that
writes the magic byte and schema id bytes. After this change, the job's
performance returned to levels comparable to using
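For reference, a helper of the kind described above might look roughly like this once the schema id is known from initialization. This is a hypothetical sketch, not the actual patch: *ConfluentFraming* and *frame()* are made-up names. It relies only on the Confluent wire format (a zero magic byte, then the schema id as a 4-byte big-endian int, then the Avro-encoded payload).

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: prepends the Confluent header to an already Avro-encoded payload. */
final class ConfluentFraming {
    static final byte MAGIC_BYTE = 0x0;

    private ConfluentFraming() {}

    /** Returns magic byte + 4-byte big-endian schema id + payload; no registry call involved. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        // A newly allocated ByteBuffer is big-endian by default, matching the wire format.
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId);
        buf.put(avroPayload);
        return buf.array();
    }
}
```

With the id resolved once during initialization, the per-message cost is just this copy plus the Avro encoding itself.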

Am I right in thinking this was perhaps a design flaw and not intentionally

