Hello Jannik,

Some things to consider (I had a similar problem a couple of years ago):

* The SchemaRegistryClient caches schema ids, so it will hit the schema registry only once per schema.
* The schema you write must be byte-equal to the one registered in the schema registry; otherwise the registry considers it a new schema (version).
* To the best of my knowledge, writing an already-registered schema to the schema registry does not fail, because nothing is actually written.
* This may not be entirely accurate, though: we had to replace the whole SchemaRegistryClient with our own implementation, because the existing one could not be reconfigured to accept compressed answers from our read-only proxy.
* If you manage to fill the cache of your SchemaRegistryClient with the exact schema (e.g. by querying it beforehand), you might never run into the problem (see the sketch below).
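To make the lookup-vs-register point concrete, here is a minimal standalone sketch (untested; it assumes a recent io.confluent:kafka-schema-registry-client on the classpath, and the registry URL is a placeholder) that pulls the registered schema for a subject and resolves its id through a read-only lookup, which also fills the client's id cache:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;

public class SchemaIdLookupDemo {
    public static void main(String[] args) throws Exception {
        String registryUrl = "https://my-registry.example.com"; // placeholder
        String subject = "my-topic-key";                        // subject from the error below

        // The second argument is the maximum number of cached schema ids.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient(registryUrl, 100);

        // Fetch the registered schema text and re-parse it, so the local
        // copy matches what the registry already holds byte for byte.
        Schema schema = new Schema.Parser().parse(
                client.getLatestSchemaMetadata(subject).getSchema());

        // getId() resolves the id of an already-registered schema and caches
        // it; unlike register(), it should need only read access.
        int id = client.getId(subject, new AvroSchema(schema));
        System.out.println("schema id for " + subject + ": " + id);
    }
}

Whether you can reach the client instance that the Flink format creates internally is a separate question; in our setup we could, because we had replaced the client with our own implementation anyway.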
Hope this helps … keep us posted 😊

Thias

From: Schmeier, Jannik <j.schme...@fraport.de>
Sent: Wednesday, May 31, 2023 12:44 PM
To: user@flink.apache.org
Subject: Using pre-registered schemas with avro-confluent-registry format is not possible

Hello,

I'm trying to use the avro-confluent-registry format with the Confluent Cloud Schema Registry in our company. Our schemas are managed via Terraform, and global write access is denied for all Kafka clients in our environments (or at least in production). Therefore, when using the avro-confluent-registry format, I'm getting an error when Flink is trying to serialize a row:

java.lang.RuntimeException: Failed to serialize row.
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0.jar:1.17.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to serialize schema registry.
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more
Caused by: java.io.IOException: Could not register schema in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42) ~[?:?]
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more

I've inspected the code of the avro-confluent-registry format, and it seems there is no way to disable this behavior. The format will always try to register the schema when serializing a row:

https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85

Is there a particular reason for this, or would you be interested in adding a configuration option to disable this behavior?
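For illustration, a registration-free variant of the coder's write path could look roughly like the sketch below. This is hypothetical and untested; the class name is mine, and only the wire format (magic byte followed by the 4-byte schema id) mirrors ConfluentSchemaRegistryCoder:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;

// Hypothetical class, not part of Flink; it only sketches the write path.
public class LookupOnlySchemaRegistryCoder {

    private static final byte CONFLUENT_MAGIC_BYTE = 0;

    private final SchemaRegistryClient schemaRegistryClient;
    private final String subject;

    public LookupOnlySchemaRegistryCoder(String subject, SchemaRegistryClient client) {
        this.subject = subject;
        this.schemaRegistryClient = client;
    }

    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        try {
            // getId() looks up the id of an already-registered schema (and
            // caches it); unlike register(), it should not require Write
            // permission on the subject.
            int schemaId = schemaRegistryClient.getId(subject, new AvroSchema(schema));
            out.write(CONFLUENT_MAGIC_BYTE);
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        } catch (RestClientException e) {
            throw new IOException("Could not look up schema in registry", e);
        }
    }
}

With read-only credentials, writeSchema() would then succeed whenever the exact schema has already been registered (via Terraform, in our case) and fail fast otherwise.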
Best regards,
Jannik