Steve, thanks a lot for looking into this more closely! Let's discuss the
resolution of the issue in the ticket Dawid has created:
https://issues.apache.org/jira/browse/FLINK-15941
Best,
Robert

On Thu, Feb 6, 2020 at 6:59 PM Steve Whelan <swhe...@jwplayer.com> wrote:

> Robert,
>
> You are correct that it is using a CachedSchemaRegistryClient object, so
> schemaRegistryClient.register() should be checking the cache before
> sending a request to the Registry. However, with debug logging turned on
> in my Registry, I can see a request being sent for every serialized
> message, which means the cache consulted by
> schemaRegistryClient.register() was empty every time.
>
> By adding some more debug logging, I think I found the issue within
> RegistryAvroSerializationSchema.serialize():
>
> public byte[] serialize(T object) {
>     checkAvroInitialized();
>
>     if (object == null) {
>         return null;
>     } else {
>         try {
>             Encoder encoder = getEncoder();
>             // get() creates a new instance of SchemaCoder [1]
>             schemaCoderProvider.get()
>                 .writeSchema(getSchema(), getOutputStream());
>             getDatumWriter().write(object, encoder);
>             encoder.flush();
>             byte[] bytes = getOutputStream().toByteArray();
>             getOutputStream().reset();
>             return bytes;
>         } catch (IOException e) {
>             throw new WrappingRuntimeException(
>                 "Failed to serialize schema registry.", e);
>         }
>     }
> }
>
> This schemaCoderProvider.get() call creates a new instance of SchemaCoder
> every time, instead of reusing the one that was instantiated inside
> RegistryAvroSerializationSchema.checkAvroInitialized(). As a result, we
> get an object with a fresh, empty cache on every call, and
> schemaRegistryClient.register() falls back to an HTTP request to the
> Registry.
>
> Simply changing the above line to:
>
> schemaCoder.writeSchema(getSchema(), getOutputStream());
>
> solved the issue. Since RegistryAvroSerializationSchema.checkAvroInitialized()
> is called first inside RegistryAvroSerializationSchema.serialize(), we do
> not have to worry about the schemaCoder object being null. Happy to open
> a PR for the ticket if this makes sense.
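>
> For completeness, after that one-line change serialize() would look
> roughly like this (a sketch, relying on the schemaCoder field that
> checkAvroInitialized() populates):
>
> public byte[] serialize(T object) {
>     checkAvroInitialized(); // sets up schemaCoder on the first call only
>
>     if (object == null) {
>         return null;
>     } else {
>         try {
>             Encoder encoder = getEncoder();
>             // Reuse the existing SchemaCoder so its registry client's
>             // cache survives across serialize() calls.
>             schemaCoder.writeSchema(getSchema(), getOutputStream());
>             getDatumWriter().write(object, encoder);
>             encoder.flush();
>             byte[] bytes = getOutputStream().toByteArray();
>             getOutputStream().reset();
>             return bytes;
>         } catch (IOException e) {
>             throw new WrappingRuntimeException(
>                 "Failed to serialize schema registry.", e);
>         }
>     }
> }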
>
> [1]
> https://github.com/apache/flink/blob/37a818ce8714adf14153587bf99c0900e5af42b7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java
>
> On Thu, Feb 6, 2020 at 10:38 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Steve,
>>
>> I think your observation is correct. If I am not mistaken, we should use
>> schemaRegistryClient.getId(subject, schema) instead of
>> schemaRegistryClient.register(subject, schema). The former should
>> perform an HTTP request only if the schema is not in the cache.
>>
>> I created an issue to track it:
>> https://issues.apache.org/jira/browse/FLINK-15941
>>
>> Would you maybe like to take a look and prepare a fix for it? ;)
>>
>> Best,
>>
>> Dawid
>>
>> On 06/02/2020 16:11, Robert Metzger wrote:
>>
>> Hi,
>> thanks a lot for your message. It's certainly not intentional to do an
>> HTTP request for every single message :)
>>
>> Isn't the schemaRegistryClient an instance of
>> CachedSchemaRegistryClient, which, as the name says, caches? Can you
>> check with a debugger at runtime which registry client is actually used,
>> and whether there are indeed no cache hits? Alternatively, you could
>> check the log of the schema registry service.
>>
>> Best,
>> Robert
>>
>> On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <swhe...@jwplayer.com> wrote:
>>
>>> Hi,
>>>
>>> I'm running Flink v1.9 and backported the commit adding serialization
>>> support for Confluent's Schema Registry[1]. Using the code as is, I saw
>>> a nearly 50% drop in peak throughput for my job compared to using
>>> AvroRowSerializationSchema.
>>>
>>> Looking at the code, RegistryAvroSerializationSchema.serialize()
>>> executes the following for every single message:
>>>
>>> public byte[] serialize(T object) {
>>>     checkAvroInitialized();
>>>
>>>     if (object == null) {
>>>         return null;
>>>     } else {
>>>         try {
>>>             Encoder encoder = getEncoder();
>>>             schemaCoderProvider.get()
>>>                 .writeSchema(getSchema(), getOutputStream());
>>>             getDatumWriter().write(object, encoder);
>>>             encoder.flush();
>>>             byte[] bytes = getOutputStream().toByteArray();
>>>             getOutputStream().reset();
>>>             return bytes;
>>>         } catch (IOException e) {
>>>             throw new WrappingRuntimeException(
>>>                 "Failed to serialize schema registry.", e);
>>>         }
>>>     }
>>> }
>>>
>>> And ConfluentSchemaRegistryCoder.writeSchema() attempts to register the
>>> schema:
>>>
>>> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>>>     try {
>>>         int registeredId = schemaRegistryClient.register(subject, schema);
>>>         out.write(CONFLUENT_MAGIC_BYTE);
>>>         byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>>>         out.write(schemaIdBytes);
>>>     } catch (RestClientException e) {
>>>         throw new IOException("Could not register schema in registry", e);
>>>     }
>>> }
>>>
>>> So it is making an HTTP request to the Schema Registry for every single
>>> message. Since the output schema does not change over the course of a
>>> streaming job, it seems you should only need to register the schema
>>> once.
>>>
>>> I moved the schema registration call into
>>> RegistryAvroSerializationSchema.checkAvroInitialized() and added a
>>> helper function, called from
>>> RegistryAvroSerializationSchema.serialize(), that writes the magic byte
>>> and schema id bytes (sketched below). After this change, the job's
>>> performance returned to levels comparable to
>>> AvroRowSerializationSchema.
>>>
>>> Am I right in thinking this is a design flaw rather than intentional
>>> behavior?
>>>
>>> [1]
>>> https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339
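>>>
>>> For illustration, the change is roughly the following (a sketch only:
>>> the helper names and the registeredSchemaId field are mine, and it
>>> glosses over the fact that the registry client and subject actually
>>> live in ConfluentSchemaRegistryCoder):
>>>
>>> private transient int registeredSchemaId;
>>>
>>> // Called once from checkAvroInitialized(): a single HTTP round trip
>>> // (or cache hit) per task start instead of one per record.
>>> private void registerSchema() throws IOException {
>>>     try {
>>>         registeredSchemaId = schemaRegistryClient.register(subject, getSchema());
>>>     } catch (RestClientException e) {
>>>         throw new IOException("Could not register schema in registry", e);
>>>     }
>>> }
>>>
>>> // Called from serialize() for every record: pure in-memory framing of
>>> // the Confluent wire format (magic byte + 4-byte schema id), no
>>> // network I/O.
>>> private void writeSchemaPrefix(OutputStream out) throws IOException {
>>>     out.write(CONFLUENT_MAGIC_BYTE);
>>>     out.write(ByteBuffer.allocate(4).putInt(registeredSchemaId).array());
>>> }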