Hi,
thanks a lot for your message. It's certainly not intentional to do a HTTP
request for every single message :)

Isn't the *schemaRegistryClient *an instance of CachedSchemaRegistryClient,
which, as the name says, caches?
Can you check with a debugger at runtime what registry client is used, and
if there are indeed no cache hits?
Alternatively, you could check the log of the schema registry service.

Best,
Robert


On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <swhe...@jwplayer.com> wrote:

> Hi,
>
> I'm running Flink v1.9. I backported the commit adding serialization
> support for Confluent's schema registry[1]. Using the code as is, I saw a
> nearly 50% drop in peak throughput for my job compared to using
> *AvroRowSerializationSchema*.
>
> Looking at the code, *RegistryAvroSerializationSchema.serialize()*
>  executes:
>
>
> public byte[] serialize(T object) {
>   checkAvroInitialized();
>
>   if (object == null) {
>     return null;
>   } else {
>     try {
>       Encoder encoder = getEncoder();
>
> *schemaCoderProvider.get()        .writeSchema(getSchema(),
> getOutputStream());*
>       getDatumWriter().write(object, encoder);
>       encoder.flush();
>       byte[] bytes = getOutputStream().toByteArray();
>       getOutputStream().reset();
>       return bytes;
>     } catch (IOException e) {
>       throw new WrappingRuntimeException("Failed to serialize schema
> registry.", e);
>     }
>   }
> }
>
>
> For every single message. *ConfluentSchemaRegistryCoder.writeSchema()* 
> attempts
> to register the schema.
>
>
> public void writeSchema(Schema schema, OutputStream out) throws
> IOException {
>   try {
> *    int registeredId = schemaRegistryClient.register(subject, schema);*
>     out.write(CONFLUENT_MAGIC_BYTE);
>     byte[] schemaIdBytes =
> ByteBuffer.allocate(4).putInt(registeredId).array();
>     out.write(schemaIdBytes);
>   } catch (RestClientException e) {
>     throw new IOException("Could not register schema in registry", e);
>   }
> }
>
>
> It's making an HTTP request to the Schema Registry for every single
> message. Since the output schema does not change over the course of a
> streaming job, it seems you should only need to register the schema once.
>
> I moved the schema registration call into
> *RegistryAvroSerializationSchema.checkAvroInitialized()* and added a
> helper function to add the magic byte and schema id bytes to be called from
> *RegistryAvroSerializationSchema.serialize()*. After this change, the
> jobs performance returned to comparable levels to using
> *AvroRowSerializationSchema.*
>
> Am I right in thinking this was perhaps a design flaw and not
> intentionally done?
>
>
> [1]
> https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339
>

Reply via email to