Thanks a lot for looking into this more closely!

Let's discuss the resolution of the issue in the ticket Dawid has created:


On Thu, Feb 6, 2020 at 6:59 PM Steve Whelan <> wrote:

> Robert,
> You are correct that it is using a *CachedSchemaRegistryClient* object.
> Therefore, *schemaRegistryClient.register()* should check the cache before
> sending a request to the Registry. However, after turning on debug logging
> in my Registry, I can see a request being sent for every serialized
> message, which means the cache behind *schemaRegistryClient.register()*
> was empty on every call.
> By adding some more debug logging, I think I found the issue within
> *RegistryAvroSerializationSchema.serialize()*:
> public byte[] serialize(T object) {
>   checkAvroInitialized();
>   if (object == null) {
>     return null;
>   } else {
>     try {
>       Encoder encoder = getEncoder();
>       schemaCoderProvider.get()
>           .writeSchema(getSchema(), getOutputStream()); // get(): "Creates a new instance of {@link SchemaCoder}" [1]
>       getDatumWriter().write(object, encoder);
>       encoder.flush();
>       byte[] bytes = getOutputStream().toByteArray();
>       getOutputStream().reset();
>       return bytes;
>     } catch (IOException e) {
>       throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
>     }
>   }
> }
> This *schemaCoderProvider.get()* call creates a new instance of
> *SchemaCoder* every time, instead of reusing the one that was instantiated
> inside *RegistryAvroSerializationSchema.checkAvroInitialized()*. This
> means we get an object with a new, empty cache every time, so
> *schemaRegistryClient.register()* falls back to an HTTP request to the
> Registry.
> Simply changing the above line to:
> schemaCoder.writeSchema(getSchema(), getOutputStream());
> solved the issue. Since *RegistryAvroSerializationSchema.checkAvroInitialized()*
> is called first inside *RegistryAvroSerializationSchema.serialize()*, we
> do not have to worry about the *schemaCoder* object being null. Happy to
> open a PR for the ticket if this makes sense.
> [1]
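A minimal, self-contained sketch of the effect described above. It uses hypothetical stand-ins (`FakeCachedClient`, `FakeCoder`, the subject name and the ID 42) rather than the real Flink or Confluent classes, and assumes each coder owns its own caching client. A provider that builds a fresh coder for every message also builds a fresh, empty cache, so every message is a miss; a coder created once and reused only misses on the first message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class CoderReuseSketch {

    // Hypothetical stand-in for a caching registry client: it only counts a
    // simulated HTTP request when the subject is not yet in its own cache.
    static final class FakeCachedClient {
        static int remoteCalls = 0;
        private final Map<String, Integer> cache = new HashMap<>();

        int register(String subject) {
            return cache.computeIfAbsent(subject, s -> {
                remoteCalls++; // cache miss: simulated round trip to the registry
                return 42;     // hypothetical schema ID
            });
        }
    }

    // Hypothetical coder that creates its own client, so every new coder
    // starts with an empty cache.
    static final class FakeCoder {
        private final FakeCachedClient client = new FakeCachedClient();

        int writeSchema(String subject) {
            return client.register(subject);
        }
    }

    // Serializes `messages` records and returns the number of simulated requests.
    static int countCalls(boolean reuseCoder, int messages) {
        FakeCachedClient.remoteCalls = 0;
        Supplier<FakeCoder> provider = FakeCoder::new;
        FakeCoder initialized = provider.get(); // created once, like in checkAvroInitialized()
        for (int i = 0; i < messages; i++) {
            FakeCoder coder = reuseCoder ? initialized : provider.get();
            coder.writeSchema("my-topic-value");
        }
        return FakeCachedClient.remoteCalls;
    }

    public static void main(String[] args) {
        System.out.println("new coder per message: " + countCalls(false, 1000)); // 1000
        System.out.println("reused coder:          " + countCalls(true, 1000));  // 1
    }
}
```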
> On Thu, Feb 6, 2020 at 10:38 AM Dawid Wysakowicz <> wrote:
>> Hi Steve,
>> I think your observation is correct. If I am not mistaken, we should use
>> *schemaRegistryClient.getId(subject, schema)* instead of
>> *schemaRegistryClient.register(subject, schema)*. The former should
>> perform an HTTP request only if the schema is not in the cache.
>> I created an issue to track it.
>> Would you like to check it and prepare a fix for it? ;)
>> Best,
>> Dawid
>> On 06/02/2020 16:11, Robert Metzger wrote:
>> Hi,
>> thanks a lot for your message. It's certainly not intentional to do an
>> HTTP request for every single message :)
>> Isn't the *schemaRegistryClient* an instance of
>> CachedSchemaRegistryClient, which, as the name says, caches?
>> Can you check with a debugger at runtime which registry client is used,
>> and whether there are indeed no cache hits?
>> Alternatively, you could check the log of the schema registry service.
>> Best,
>> Robert
>> On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <> wrote:
>>> Hi,
>>> I'm running Flink v1.9. I backported the commit adding serialization
>>> support for Confluent's schema registry[1]. Using the code as is, I saw a
>>> nearly 50% drop in peak throughput for my job compared to using
>>> *AvroRowSerializationSchema*.
>>> Looking at the code, *RegistryAvroSerializationSchema.serialize()* executes:
>>> public byte[] serialize(T object) {
>>>   checkAvroInitialized();
>>>   if (object == null) {
>>>     return null;
>>>   } else {
>>>     try {
>>>       Encoder encoder = getEncoder();
>>>       schemaCoderProvider.get()
>>>           .writeSchema(getSchema(), getOutputStream());
>>>       getDatumWriter().write(object, encoder);
>>>       encoder.flush();
>>>       byte[] bytes = getOutputStream().toByteArray();
>>>       getOutputStream().reset();
>>>       return bytes;
>>>     } catch (IOException e) {
>>>       throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
>>>     }
>>>   }
>>> }
>>> This runs for every single message, and
>>> *ConfluentSchemaRegistryCoder.writeSchema()* attempts to register the
>>> schema on each call:
>>> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>>>   try {
>>>     int registeredId = schemaRegistryClient.register(subject, schema);
>>>     out.write(CONFLUENT_MAGIC_BYTE);
>>>     byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>>>     out.write(schemaIdBytes);
>>>   } catch (RestClientException e) {
>>>     throw new IOException("Could not register schema in registry", e);
>>>   }
>>> }
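The bytes `writeSchema()` emits depend only on the schema ID: one magic byte followed by the ID as a 4-byte int. A small sketch of that prefix, assuming the magic byte is 0 (the value Confluent's wire format uses) and using a hypothetical ID of 258. `ByteBuffer` is big-endian by default, so 258 (0x00000102) becomes `0, 0, 1, 2`. Once the ID is known, producing this header needs no registry call at all.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

class ConfluentHeaderSketch {

    // Assumed to match CONFLUENT_MAGIC_BYTE in the code above.
    static final byte MAGIC_BYTE = 0;

    // Writes the 5-byte prefix: magic byte, then the schema ID as a
    // big-endian 4-byte int (ByteBuffer's default byte order).
    static void writeHeader(int schemaId, ByteArrayOutputStream out) {
        out.write(MAGIC_BYTE);
        byte[] idBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
        out.write(idBytes, 0, idBytes.length);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeHeader(258, out);
        System.out.println(Arrays.toString(out.toByteArray())); // [0, 0, 0, 1, 2]
    }
}
```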
>>> It's making an HTTP request to the Schema Registry for every single
>>> message. Since the output schema does not change over the course of a
>>> streaming job, it seems you should only need to register the schema once.
>>> I moved the schema registration call into
>>> *RegistryAvroSerializationSchema.checkAvroInitialized()* and added a
>>> helper function, called from *RegistryAvroSerializationSchema.serialize()*,
>>> that writes the magic byte and schema ID bytes. After this change, the
>>> job's performance returned to levels comparable to using
>>> *AvroRowSerializationSchema*.
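A rough sketch of the register-once approach described above, not the actual patch: `RegisterOnceSketch`, the `ToIntFunction` standing in for the registry call, and the magic byte value 0 are all assumptions for illustration. The ID is resolved on first use (the role `checkAvroInitialized()` plays) and every later record only gets the 5-byte prefix written in front of its payload.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.ToIntFunction;

class RegisterOnceSketch {

    private final String subject;
    private final ToIntFunction<String> registry; // hypothetical: subject -> schema ID (the HTTP call)
    private Integer schemaId;                     // resolved once, then reused

    RegisterOnceSketch(String subject, ToIntFunction<String> registry) {
        this.subject = subject;
        this.registry = registry;
    }

    // Stands in for checkAvroInitialized(): registers only on first use.
    private void checkInitialized() {
        if (schemaId == null) {
            schemaId = registry.applyAsInt(subject);
        }
    }

    // Writes the assumed magic byte (0) and the cached 4-byte ID, then the payload.
    byte[] serialize(byte[] payload) {
        checkInitialized();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0);
        byte[] idBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
        out.write(idBytes, 0, idBytes.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        RegisterOnceSketch serializer =
                new RegisterOnceSketch("my-topic-value", s -> { calls[0]++; return 7; });
        for (int i = 0; i < 1000; i++) {
            serializer.serialize("record".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("registry calls: " + calls[0]); // 1
    }
}
```

Caching the ID in the serializer itself makes the per-record cost independent of how the registry client caches, which is the property the original change relied on.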
>>> Am I right in thinking this was perhaps a design flaw and not
>>> intentionally done?
>>> [1]
