Robert,

You are correct that it is using a *CachedSchemaRegistryClient* object.
Therefore, *schemaRegistryClient.register()* should check the cache
first before sending a request to the Registry. However, after turning on
debug logging on my Registry, I can see a request being sent for every
serialized message, which means the cache in
*schemaRegistryClient.register()* was empty each time.

By adding some more debug logging, I think I found the issue within
*RegistryAvroSerializationSchema.serialize():*

public byte[] serialize(T object) {
  checkAvroInitialized();

  if (object == null) {
    return null;
  } else {
    try {
      Encoder encoder = getEncoder();

      schemaCoderProvider.get()
          .writeSchema(getSchema(), getOutputStream()); // get() creates a new instance of SchemaCoder [1]
      getDatumWriter().write(object, encoder);
      encoder.flush();
      byte[] bytes = getOutputStream().toByteArray();
      getOutputStream().reset();
      return bytes;
    } catch (IOException e) {
      throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
    }
  }
}


This *schemaCoderProvider.get()* call creates a new instance of
*SchemaCoder* every time, instead of using the one that was instantiated
inside *RegistryAvroSerializationSchema.checkAvroInitialized()*. This
means we get an object with a new cache every time (i.e. it's empty, and
*schemaRegistryClient.register()* falls back to an HTTP request to the
Registry).
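
To illustrate the effect, here is a minimal, self-contained sketch. It does not use the Flink or Confluent classes: FakeCachedClient, FakeCoder and CoderCacheDemo are hypothetical stand-ins, and the "HTTP request" is only a counter. It assumes, as described above, that each coder instance owns its own client and therefore its own cache.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a caching registry client (not the Confluent API).
class FakeCachedClient {
    private final Map<String, Integer> cache = new HashMap<>();
    private final AtomicInteger httpRequests;

    FakeCachedClient(AtomicInteger httpRequests) {
        this.httpRequests = httpRequests;
    }

    int register(String subject, String schema) {
        return cache.computeIfAbsent(subject + "/" + schema, key -> {
            httpRequests.incrementAndGet(); // cache miss: simulated HTTP round trip
            return 1;                       // made-up schema id
        });
    }
}

// Hypothetical stand-in for SchemaCoder: every coder owns a fresh client and cache.
class FakeCoder {
    private final FakeCachedClient client;

    FakeCoder(AtomicInteger httpRequests) {
        this.client = new FakeCachedClient(httpRequests);
    }

    int writeSchema(String schema) {
        return client.register("my-subject", schema);
    }
}

class CoderCacheDemo {
    // Mirrors schemaCoderProvider.get().writeSchema(...): a new coder per message.
    static int requestsWithNewCoderPerMessage(int messages) {
        AtomicInteger httpRequests = new AtomicInteger();
        Supplier<FakeCoder> provider = () -> new FakeCoder(httpRequests);
        for (int i = 0; i < messages; i++) {
            provider.get().writeSchema("schema");
        }
        return httpRequests.get();
    }

    // Mirrors schemaCoder.writeSchema(...): one coder, and one cache, reused.
    static int requestsWithReusedCoder(int messages) {
        AtomicInteger httpRequests = new AtomicInteger();
        FakeCoder coder = new FakeCoder(httpRequests);
        for (int i = 0; i < messages; i++) {
            coder.writeSchema("schema");
        }
        return httpRequests.get();
    }

    public static void main(String[] args) {
        System.out.println(requestsWithNewCoderPerMessage(1000)); // prints 1000
        System.out.println(requestsWithReusedCoder(1000));        // prints 1
    }
}
```

In this toy model the cache itself works fine; it is simply thrown away together with the coder after every message.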

Simply changing the above line to:

schemaCoder.writeSchema(getSchema(), getOutputStream());


solved the issue. Since
*RegistryAvroSerializationSchema.checkAvroInitialized()*
is called first inside *RegistryAvroSerializationSchema.serialize()*, we
do not have to worry about the *schemaCoder* object being null. Happy to
open a PR for the ticket created if this makes sense.
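
For reference, the lazy-initialization pattern I am relying on looks roughly like the sketch below. The names (LazyInitSerializer, Coder, checkInitialized, coderInstances) are hypothetical and this is not the Flink class; it only assumes that the initialization method creates the coder once if the field is still null, and that serialize() calls it before touching the field.

```java
import java.util.function.Supplier;

class LazyInitSerializer {
    // Hypothetical stand-in for the SchemaCoder interface.
    interface Coder {
        byte[] header();
    }

    private final Supplier<Coder> coderProvider; // plays the role of schemaCoderProvider
    private Coder coder;                          // plays the role of the schemaCoder field
    int coderInstances = 0;                       // how many times the provider was invoked

    LazyInitSerializer(Supplier<Coder> coderProvider) {
        this.coderProvider = coderProvider;
    }

    // Plays the role of checkAvroInitialized(): creates the coder on first use only.
    private void checkInitialized() {
        if (coder == null) {
            coder = coderProvider.get();
            coderInstances++;
        }
    }

    byte[] serialize(byte[] payload) {
        checkInitialized();             // guarantees coder is non-null below
        byte[] header = coder.header(); // reuses the same coder on every call
        byte[] out = new byte[header.length + payload.length];
        System.arraycopy(header, 0, out, 0, header.length);
        System.arraycopy(payload, 0, out, header.length, payload.length);
        return out;
    }
}
```

With this shape the provider is invoked once per serializer instance, no matter how many messages are serialized.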


[1]
https://github.com/apache/flink/blob/37a818ce8714adf14153587bf99c0900e5af42b7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java


On Thu, Feb 6, 2020 at 10:38 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Steve,
>
> I think your observation is correct. If I am not mistaken we should use
> *schemaRegistryClient.getId(subject, schema);* instead of
> *schemaRegistryClient.register(subject, schema);*. The former should
> perform an HTTP request only if the schema is not in the cache.
>
> I created an issue to track it
> https://issues.apache.org/jira/browse/FLINK-15941
>
> Would you maybe like to check it and prepare a fix for it ;) ?
>
> Best,
>
> Dawid
>
>
> On 06/02/2020 16:11, Robert Metzger wrote:
>
> Hi,
> thanks a lot for your message. It's certainly not intentional to do an HTTP
> request for every single message :)
>
> Isn't the *schemaRegistryClient *an instance of CachedSchemaRegistryClient,
> which, as the name says, caches?
> Can you check with a debugger at runtime what registry client is used, and
> if there are indeed no cache hits?
> Alternatively, you could check the log of the schema registry service.
>
> Best,
> Robert
>
>
> On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <swhe...@jwplayer.com> wrote:
>
>> Hi,
>>
>> I'm running Flink v1.9. I backported the commit adding serialization
>> support for Confluent's schema registry[1]. Using the code as is, I saw a
>> nearly 50% drop in peak throughput for my job compared to using
>> *AvroRowSerializationSchema*.
>>
>> Looking at the code, *RegistryAvroSerializationSchema.serialize()*
>>  executes:
>>
>>
>> public byte[] serialize(T object) {
>>   checkAvroInitialized();
>>
>>   if (object == null) {
>>     return null;
>>   } else {
>>     try {
>>       Encoder encoder = getEncoder();
>>
>>       schemaCoderProvider.get()
>>           .writeSchema(getSchema(), getOutputStream());
>>       getDatumWriter().write(object, encoder);
>>       encoder.flush();
>>       byte[] bytes = getOutputStream().toByteArray();
>>       getOutputStream().reset();
>>       return bytes;
>>     } catch (IOException e) {
>>       throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
>>     }
>>   }
>> }
>>
>>
>> For every single message. *ConfluentSchemaRegistryCoder.writeSchema()*
>> attempts to register the schema.
>>
>>
>> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>>   try {
>>     int registeredId = schemaRegistryClient.register(subject, schema);
>>     out.write(CONFLUENT_MAGIC_BYTE);
>>     byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>>     out.write(schemaIdBytes);
>>   } catch (RestClientException e) {
>>     throw new IOException("Could not register schema in registry", e);
>>   }
>> }
>>
>>
>> It's making an HTTP request to the Schema Registry for every single
>> message. Since the output schema does not change over the course of a
>> streaming job, it seems you should only need to register the schema once.
>>
>> I moved the schema registration call into
>> *RegistryAvroSerializationSchema.checkAvroInitialized()* and added a
>> helper function to add the magic byte and schema id bytes to be called from
>> *RegistryAvroSerializationSchema.serialize()*. After this change, the
>> job's performance returned to levels comparable to using
>> *AvroRowSerializationSchema*.
>>
>> Am I right in thinking this was perhaps a design flaw and not
>> intentionally done?
>>
>>
>> [1]
>> https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339
>>
>
