Hi Steve,

I think your observation is correct. If I am not mistaken, we should use schemaRegistryClient.getId(subject, schema) instead of schemaRegistryClient.register(subject, schema). The former should perform an HTTP request only if the schema is not already in the cache.
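For illustration, writeSchema() would then look roughly like this (an untested sketch, assuming the getId(String subject, Schema schema) method of Confluent's SchemaRegistryClient):

    // Sketch only: look up the schema id instead of registering on every record.
    // getId() should consult the client-side cache first and only issue an
    // HTTP request to the registry on a cache miss.
    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        try {
            int schemaId = schemaRegistryClient.getId(subject, schema);
            out.write(CONFLUENT_MAGIC_BYTE);
            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
            out.write(schemaIdBytes);
        } catch (RestClientException e) {
            throw new IOException("Could not retrieve schema id from registry", e);
        }
    }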
I created an issue to track it:
https://issues.apache.org/jira/browse/FLINK-15941
Would you maybe like to check it and prepare a fix for it? ;)

Best,
Dawid

On 06/02/2020 16:11, Robert Metzger wrote:
> Hi,
> thanks a lot for your message. It's certainly not intentional to do an
> HTTP request for every single message :)
>
> Isn't the schemaRegistryClient an instance
> of CachedSchemaRegistryClient, which, as the name says, caches?
> Can you check with a debugger at runtime which registry client is used,
> and whether there are indeed no cache hits?
> Alternatively, you could check the log of the schema registry service.
>
> Best,
> Robert
>
>
> On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <swhe...@jwplayer.com> wrote:
>
> > Hi,
> >
> > I'm running Flink v1.9. I backported the commit adding
> > serialization support for Confluent's schema registry [1]. Using
> > the code as-is, I saw a nearly 50% drop in peak throughput for my
> > job compared to using AvroRowSerializationSchema.
> >
> > Looking at the code, RegistryAvroSerializationSchema.serialize()
> > executes the following for every single message:
> >
> >     public byte[] serialize(T object) {
> >         checkAvroInitialized();
> >
> >         if (object == null) {
> >             return null;
> >         } else {
> >             try {
> >                 Encoder encoder = getEncoder();
> >                 schemaCoderProvider.get()
> >                     .writeSchema(getSchema(), getOutputStream());
> >                 getDatumWriter().write(object, encoder);
> >                 encoder.flush();
> >                 byte[] bytes = getOutputStream().toByteArray();
> >                 getOutputStream().reset();
> >                 return bytes;
> >             } catch (IOException e) {
> >                 throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
> >             }
> >         }
> >     }
> >
> > ConfluentSchemaRegistryCoder.writeSchema() attempts to register the schema:
> >
> >     public void writeSchema(Schema schema, OutputStream out) throws IOException {
> >         try {
> >             int registeredId = schemaRegistryClient.register(subject, schema);
> >             out.write(CONFLUENT_MAGIC_BYTE);
> >             byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
> >             out.write(schemaIdBytes);
> >         } catch (RestClientException e) {
> >             throw new IOException("Could not register schema in registry", e);
> >         }
> >     }
> >
> > It's making an HTTP request to the Schema Registry for every
> > single message. Since the output schema does not change over the
> > course of a streaming job, it seems you should only need to
> > register the schema once.
> >
> > I moved the schema registration call
> > into RegistryAvroSerializationSchema.checkAvroInitialized() and
> > added a helper function, called from
> > RegistryAvroSerializationSchema.serialize(), to write the magic byte
> > and schema id bytes. After this change, the job's performance returned
> > to levels comparable to using AvroRowSerializationSchema.
> >
> > Am I right in thinking this was perhaps a design flaw and not
> > intentionally done?
> >
> >
> > [1] https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339
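For reference, the workaround Steve describes might look roughly like the sketch below (hypothetical field and helper layout, not the actual patch): the schema is registered once during initialization, the 5-byte Confluent header (magic byte plus 4-byte schema id) is cached, and serialize() writes it before each record without touching the registry.

    // Hypothetical sketch of the described workaround.
    private byte[] confluentHeader; // CONFLUENT_MAGIC_BYTE + 4-byte schema id

    protected void checkAvroInitialized() {
        if (confluentHeader != null) {
            return;
        }
        // ... existing datum writer / encoder initialization ...
        try {
            // Single registration (and HTTP round trip) for the whole job.
            int schemaId = schemaRegistryClient.register(subject, getSchema());
            ByteBuffer header = ByteBuffer.allocate(5);
            header.put((byte) CONFLUENT_MAGIC_BYTE);
            header.putInt(schemaId);
            confluentHeader = header.array();
        } catch (IOException | RestClientException e) {
            throw new WrappingRuntimeException("Failed to register schema.", e);
        }
    }

    public byte[] serialize(T object) {
        checkAvroInitialized();
        if (object == null) {
            return null;
        }
        try {
            Encoder encoder = getEncoder();
            getOutputStream().write(confluentHeader); // cached header, no HTTP call
            getDatumWriter().write(object, encoder);
            encoder.flush();
            byte[] bytes = getOutputStream().toByteArray();
            getOutputStream().reset();
            return bytes;
        } catch (IOException e) {
            throw new WrappingRuntimeException("Failed to serialize record.", e);
        }
    }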