Robert,

You are correct that it is using a *CachedSchemaRegistryClient* object, so *schemaRegistryClient*.register() should be checking the cache first before sending a request to the Registry. However, after turning on debug logging on my Registry, I can see a request being sent for every serialized message, which means the cache consulted by *schemaRegistryClient*.register() was empty every time.
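For anyone following along, the cache-first lookup inside *CachedSchemaRegistryClient*.register() works roughly like this (a simplified sketch only, not the actual Confluent code; *registerViaHttp* is a hypothetical stand-in for the real REST call):

// Simplified sketch of a cache-first register(); illustrative only.
// The client keeps a per-subject map of schemas to ids, so the HTTP
// round trip should happen only on a cache miss.
private final Map<String, Map<Schema, Integer>> idCache = new HashMap<>();

public synchronized int register(String subject, Schema schema)
        throws IOException, RestClientException {
    Map<Schema, Integer> subjectCache =
        idCache.computeIfAbsent(subject, s -> new IdentityHashMap<>());

    Integer cachedId = subjectCache.get(schema);
    if (cachedId != null) {
        return cachedId;                       // cache hit: no request
    }
    int id = registerViaHttp(subject, schema); // cache miss: one request
    subjectCache.put(schema, id);
    return id;
}

The important point is that the cache lives on the client instance, so a brand-new client (or a brand-new *SchemaCoder* wrapping one) always starts cold.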
By adding some more debug logging, I think I found the issue within *RegistryAvroSerializationSchema*.serialize():

public byte[] serialize(T object) {
    checkAvroInitialized();

    if (object == null) {
        return null;
    } else {
        try {
            Encoder encoder = getEncoder();

            schemaCoderProvider.get()
                .writeSchema(getSchema(), getOutputStream()); // get() "Creates a new instance of {@link SchemaCoder}" [1]
            getDatumWriter().write(object, encoder);
            encoder.flush();
            byte[] bytes = getOutputStream().toByteArray();
            getOutputStream().reset();
            return bytes;
        } catch (IOException e) {
            throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
        }
    }
}

This *schemaCoderProvider*.get() call creates a new instance of *SchemaCoder* every time, instead of reusing the one that was instantiated inside *RegistryAvroSerializationSchema*.checkAvroInitialized(). This means we get an object with a new, empty cache on every call, so *schemaRegistryClient*.register() falls back to an HTTP request to the Registry. Simply changing the line above to:

schemaCoder.writeSchema(getSchema(), getOutputStream());

solved the issue. Since *RegistryAvroSerializationSchema*.checkAvroInitialized() is called first inside *RegistryAvroSerializationSchema*.serialize(), we do not have to worry about the *schemaCoder* object being null.
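Put together, the fix would look roughly like this (a sketch only; I'm assuming *schemaCoder* is the transient field that checkAvroInitialized() populates, as in the commit referenced in [1]):

// Sketch of the fix: create the SchemaCoder once in checkAvroInitialized()
// and reuse it in serialize(), so the registry client's cache persists
// across calls instead of starting empty every time.
private transient SchemaCoder schemaCoder;

@Override
protected void checkAvroInitialized() {
    super.checkAvroInitialized();
    if (schemaCoder == null) {
        this.schemaCoder = schemaCoderProvider.get(); // instantiated exactly once
    }
}

// ...and in serialize(), the only changed line:
schemaCoder.writeSchema(getSchema(), getOutputStream());

Either this, or switching to *schemaRegistryClient*.getId(subject, schema) as Dawid suggests below, should avoid the per-message round trip.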
Happy to open a PR for the ticket Dawid created if this makes sense.

[1]
https://github.com/apache/flink/blob/37a818ce8714adf14153587bf99c0900e5af42b7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java

On Thu, Feb 6, 2020 at 10:38 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Steve,
>
> I think your observation is correct. If I am not mistaken we should use
> *schemaRegistryClient*.getId(subject, schema); instead of
> *schemaRegistryClient*.register(subject, schema);. The former should
> perform an HTTP request only if the schema is not in the cache.
>
> I created an issue to track it:
> https://issues.apache.org/jira/browse/FLINK-15941
>
> Would you maybe like to check it and prepare a fix for it ;) ?
>
> Best,
>
> Dawid
>
> On 06/02/2020 16:11, Robert Metzger wrote:
>
> Hi,
> thanks a lot for your message. It's certainly not intentional to do an
> HTTP request for every single message :)
>
> Isn't the *schemaRegistryClient* an instance of CachedSchemaRegistryClient,
> which, as the name says, caches?
> Can you check with a debugger at runtime what registry client is used,
> and if there are indeed no cache hits?
> Alternatively, you could check the log of the schema registry service.
>
> Best,
> Robert
>
> On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <swhe...@jwplayer.com> wrote:
>
>> Hi,
>>
>> I'm running Flink v1.9. I backported the commit adding serialization
>> support for Confluent's schema registry [1]. Using the code as is, I saw
>> a nearly 50% drop in peak throughput for my job compared to using
>> *AvroRowSerializationSchema*.
>>
>> Looking at the code, *RegistryAvroSerializationSchema*.serialize()
>> executes for every single message:
>>
>> public byte[] serialize(T object) {
>>     checkAvroInitialized();
>>
>>     if (object == null) {
>>         return null;
>>     } else {
>>         try {
>>             Encoder encoder = getEncoder();
>>
>>             schemaCoderProvider.get()
>>                 .writeSchema(getSchema(), getOutputStream());
>>             getDatumWriter().write(object, encoder);
>>             encoder.flush();
>>             byte[] bytes = getOutputStream().toByteArray();
>>             getOutputStream().reset();
>>             return bytes;
>>         } catch (IOException e) {
>>             throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
>>         }
>>     }
>> }
>>
>> And *ConfluentSchemaRegistryCoder*.writeSchema() attempts to register
>> the schema:
>>
>> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>>     try {
>>         int registeredId = schemaRegistryClient.register(subject, schema);
>>         out.write(CONFLUENT_MAGIC_BYTE);
>>         byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>>         out.write(schemaIdBytes);
>>     } catch (RestClientException e) {
>>         throw new IOException("Could not register schema in registry", e);
>>     }
>> }
>>
>> It's making an HTTP request to the Schema Registry for every single
>> message. Since the output schema does not change over the course of a
>> streaming job, it seems you should only need to register the schema once.
>>
>> I moved the schema registration call into
>> *RegistryAvroSerializationSchema*.checkAvroInitialized() and added a
>> helper function, called from *RegistryAvroSerializationSchema*.serialize(),
>> to write the magic byte and schema id bytes. After this change, the
>> job's performance returned to levels comparable to using
>> *AvroRowSerializationSchema*.
>>
>> Am I right in thinking this was perhaps a design flaw and not
>> intentionally done?
>>
>> [1]
>> https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339