Hi Steve,

I think your observation is correct. If I am not mistaken, we should use
schemaRegistryClient.getId(subject, schema); instead of
schemaRegistryClient.register(subject, schema);. The former should
perform an HTTP request only if the schema is not in the cache.
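
To illustrate the idea of contacting the registry only on a cache miss, here is a minimal sketch. It is not the Flink or Confluent code: SchemaIdLookup, CountingLookup and CachingSchemaHeaderWriter are hypothetical names, the schema is represented as a plain String, and the magic byte value 0 is assumed to correspond to CONFLUENT_MAGIC_BYTE in the quoted code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a schema registry client; not the Confluent API. */
interface SchemaIdLookup {
    int lookupId(String subject, String schema) throws IOException;
}

/** Fake registry that counts lookups; each call stands in for one HTTP request. */
final class CountingLookup implements SchemaIdLookup {
    int calls = 0;

    @Override
    public int lookupId(String subject, String schema) {
        calls++;
        return 42; // arbitrary schema id for the demo
    }
}

/** Writes the magic byte plus 4-byte schema id, asking the registry only on a cache miss. */
final class CachingSchemaHeaderWriter {
    private static final byte MAGIC_BYTE = 0; // assumed value of CONFLUENT_MAGIC_BYTE

    private final SchemaIdLookup lookup;
    private final Map<String, Integer> idCache = new HashMap<>();

    CachingSchemaHeaderWriter(SchemaIdLookup lookup) {
        this.lookup = lookup;
    }

    void writeSchema(String subject, String schema, OutputStream out) throws IOException {
        String key = subject + "\n" + schema;
        Integer id = idCache.get(key);
        if (id == null) {
            // Cache miss: this is the only place the registry is contacted.
            id = lookup.lookupId(subject, schema);
            idCache.put(key, id);
        }
        out.write(MAGIC_BYTE);
        out.write(ByteBuffer.allocate(4).putInt(id).array());
    }

    public static void main(String[] args) throws IOException {
        CountingLookup registry = new CountingLookup();
        CachingSchemaHeaderWriter writer = new CachingSchemaHeaderWriter(registry);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < 1000; i++) {
            out.reset();
            writer.writeSchema("my-topic-value", "{\"type\":\"string\"}", out);
        }
        System.out.println("lookups: " + registry.calls + ", header bytes: " + out.size());
    }
}
```

With this structure, serializing 1000 records against the fake registry results in a single lookup, and every record still gets the 5-byte header. Whether getId in the real client behaves this way for our usage would need to be confirmed as part of the fix.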

I created an issue to track it
https://issues.apache.org/jira/browse/FLINK-15941

Would you maybe like to check it and prepare a fix for it ;) ?

Best,

Dawid



On 06/02/2020 16:11, Robert Metzger wrote:
> Hi,
> thanks a lot for your message. It's certainly not intentional to do an
> HTTP request for every single message :)
>
> Isn't the schemaRegistryClient an instance
> of CachedSchemaRegistryClient, which, as the name says, caches?
> Can you check with a debugger at runtime what registry client is used,
> and if there are indeed no cache hits?
> Alternatively, you could check the log of the schema registry service.
>
> Best,
> Robert 
>
>
> On Tue, Feb 4, 2020 at 7:13 AM Steve Whelan <swhe...@jwplayer.com
> <mailto:swhe...@jwplayer.com>> wrote:
>
>     Hi,
>
>     I'm running Flink v1.9. I backported the commit adding
>     serialization support for Confluent's schema registry[1]. Using
>     the code as is, I saw a nearly 50% drop in peak throughput for my
>     job compared to using /AvroRowSerializationSchema/.
>
>     Looking at the
>     code, /RegistryAvroSerializationSchema.serialize()/ executes:
>
>
>     public byte[] serialize(T object) {
>       checkAvroInitialized();
>
>       if (object == null) {
>         return null;
>       } else {
>         try {
>           Encoder encoder = getEncoder();
>           schemaCoderProvider.get()
>             .writeSchema(getSchema(), getOutputStream());
>           getDatumWriter().write(object, encoder); 
>           encoder.flush();
>           byte[] bytes = getOutputStream().toByteArray();
>           getOutputStream().reset();
>           return bytes;
>         } catch (IOException e) {
>           throw new WrappingRuntimeException(
>             "Failed to serialize schema registry.", e);
>         }
>       }
>     }
>
>
>     This runs for every single message, and
>     /ConfluentSchemaRegistryCoder.writeSchema()/ attempts to
>     register the schema each time:
>
>
>     public void writeSchema(Schema schema, OutputStream out)
>         throws IOException {
>       try {
>         int registeredId =
>           schemaRegistryClient.register(subject, schema);
>         out.write(CONFLUENT_MAGIC_BYTE);
>         byte[] schemaIdBytes =
>           ByteBuffer.allocate(4).putInt(registeredId).array();
>         out.write(schemaIdBytes);
>       } catch (RestClientException e) {
>         throw new IOException("Could not register schema in registry", e);
>       }
>     }
>
>
>     It's making an HTTP request to the Schema Registry for every
>     single message. Since the output schema does not change over the
>     course of a streaming job, it seems you should only need to
>     register the schema once. 
>
>     I moved the schema registration call
>     into /RegistryAvroSerializationSchema.checkAvroInitialized()/ and
>     added a helper function to add the magic byte and schema id bytes
>     to be called from /RegistryAvroSerializationSchema.serialize()/.
>     After this change, the job's performance returned to levels
>     comparable to using /AvroRowSerializationSchema/.
>
>     Am I right in thinking this was perhaps a design flaw and not
>     intentionally done?
>
>
>     [1] https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339
>
