Hi, I'm running Flink v1.9. I backported the commit adding serialization support for Confluent's schema registry[1]. Using the code as is, I saw a nearly 50% drop in peak throughput for my job compared to using *AvroRowSerializationSchema*.
Looking at the code, *RegistryAvroSerializationSchema.serialize()* executes the following for every single message:

    public byte[] serialize(T object) {
        checkAvroInitialized();

        if (object == null) {
            return null;
        } else {
            try {
                Encoder encoder = getEncoder();
                schemaCoderProvider.get()
                    .writeSchema(getSchema(), getOutputStream());
                getDatumWriter().write(object, encoder);
                encoder.flush();
                byte[] bytes = getOutputStream().toByteArray();
                getOutputStream().reset();
                return bytes;
            } catch (IOException e) {
                throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
            }
        }
    }

*ConfluentSchemaRegistryCoder.writeSchema()* in turn attempts to register the schema every time it is called:

    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        try {
            int registeredId = schemaRegistryClient.register(subject, schema);
            out.write(CONFLUENT_MAGIC_BYTE);
            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
            out.write(schemaIdBytes);
        } catch (RestClientException e) {
            throw new IOException("Could not register schema in registry", e);
        }
    }

So the serializer is making an HTTP request to the Schema Registry for every single message. Since the output schema does not change over the course of a streaming job, it seems you should only need to register the schema once. I moved the schema registration call into *RegistryAvroSerializationSchema.checkAvroInitialized()* and added a helper, called from *RegistryAvroSerializationSchema.serialize()*, that writes the magic byte and schema id bytes (rough sketch below). After this change, the job's performance returned to levels comparable to using *AvroRowSerializationSchema*.

Am I right in thinking this was a design flaw rather than intentional?

[1] https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339
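For concreteness, here is roughly what my change looks like. This is only a sketch, not the exact patch: it assumes the logic lives inside *RegistryAvroSerializationSchema* and that *checkAvroInitialized()* can be extended there, and the field/helper names (registeredSchemaHeader, registerSchemaOnce) are my own, not anything from the Flink codebase.

    // Cached Confluent wire header: magic byte + 4-byte schema id.
    private transient byte[] registeredSchemaHeader;

    @Override
    protected void checkAvroInitialized() {
        super.checkAvroInitialized();
        if (registeredSchemaHeader == null) {
            registeredSchemaHeader = registerSchemaOnce();
        }
    }

    // Register the schema a single time and cache the header bytes that
    // writeSchema() would otherwise produce on every call.
    private byte[] registerSchemaOnce() {
        try {
            ByteArrayOutputStream header = new ByteArrayOutputStream();
            schemaCoderProvider.get().writeSchema(getSchema(), header);
            return header.toByteArray();
        } catch (IOException e) {
            throw new WrappingRuntimeException("Failed to register schema.", e);
        }
    }

    @Override
    public byte[] serialize(T object) {
        checkAvroInitialized();
        if (object == null) {
            return null;
        }
        try {
            Encoder encoder = getEncoder();
            // Write the cached header instead of calling the registry per record.
            getOutputStream().write(registeredSchemaHeader);
            getDatumWriter().write(object, encoder);
            encoder.flush();
            byte[] bytes = getOutputStream().toByteArray();
            getOutputStream().reset();
            return bytes;
        } catch (IOException e) {
            throw new WrappingRuntimeException("Failed to serialize record.", e);
        }
    }

The key point is just that the registry round trip happens once per subtask rather than once per record; the bytes written for each record are identical to before.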