Hi Jannik,

By default, Kafka client applications automatically register new schemas [1]. You should be able to influence that by setting properties, e.g.:

'properties.auto.register.schemas' = 'false'
'properties.use.latest.version' = 'true'
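In a SQL job those options would go into the sink table's WITH clause. A minimal sketch, assuming a Kafka sink table created from Java (topic, servers, registry URL and columns are placeholders, not taken from this thread):

// Sketch: Kafka sink table with the two suggested properties; the
// 'properties.*' keys are forwarded to the underlying Kafka client.
// All connection details and columns are placeholders.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NoAutoRegisterExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.executeSql(
                "CREATE TABLE my_sink (\n"
                        + "  id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'my-topic',\n"
                        + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                        + "  'properties.auto.register.schemas' = 'false',\n"
                        + "  'properties.use.latest.version' = 'true',\n"
                        + "  'format' = 'avro-confluent',\n"
                        + "  'avro-confluent.url' = 'http://registry:8081'\n"
                        + ")");
    }
}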
Best regards,

Martijn

[1] https://docs.confluent.io/platform/current/schema-registry/security/index.html#disabling-auto-schema-registration

On Wed, May 31, 2023 at 1:35 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hello Jannik,
>
> Some things to consider (I had a similar problem a couple of years ago):
>
> - The schemaRegistryClient actually caches schema ids, so it will hit the schema registry only once.
> - The schema registered in the schema registry needs to be byte-equal; otherwise the schema registry considers it a new schema (version).
> - To the best of my knowledge, writing an existing schema to the schema registry does not fail, because it is not actually written.
> - That may not be entirely true, though: we had to replace the whole schemaRegistryClient with our own implementation, because the existing one could not be reconfigured to accept compressed answers from our r/o proxy.
> - If you manage to fill the cache of your schemaRegistryClient with the exact schema (e.g. by querying it beforehand), you might never run into trouble; see the sketch below.
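> To illustrate that warm-up idea, a minimal sketch against the Confluent client API (registry URL, subject name and schema are placeholders; note that the avro-confluent format constructs its own client internally, so this only demonstrates the mechanism):
>
> // Sketch: a read-only lookup that fills the client's schema-id cache
> // before any record is serialized. All names are placeholders.
> import io.confluent.kafka.schemaregistry.avro.AvroSchema;
> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
>
> public class WarmUpSchemaCache {
>     public static void main(String[] args) throws Exception {
>         CachedSchemaRegistryClient client =
>                 new CachedSchemaRegistryClient("http://registry:8081", 100);
>         AvroSchema schema = new AvroSchema(
>                 "{\"type\":\"record\",\"name\":\"MyRecord\","
>                         + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
>         // getId() only queries the registry; it fails if the exact
>         // (byte-equal) schema has not been registered beforehand.
>         int id = client.getId("my-topic-value", schema);
>         System.out.println("Cached schema id: " + id);
>     }
> }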
> Hope this helps … keep us posted 😊
>
> Thias
>
>
> From: Schmeier, Jannik <j.schme...@fraport.de>
> Sent: Wednesday, May 31, 2023 12:44 PM
> To: user@flink.apache.org
> Subject: Using pre-registered schemas with avro-confluent-registry format is not possible
>
> Hello,
>
> I'm trying to use the avro-confluent-registry format with the Confluent Cloud Schema Registry in our company.
>
> Our schemas are managed via Terraform, and global write access is denied for all Kafka clients in our environments (or at least in production).
>
> Therefore, when using the avro-confluent-registry format, I'm getting an error when Flink tries to serialize a row:
>
> java.lang.RuntimeException: Failed to serialize row.
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>     at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0.jar:1.17.0]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to serialize schema registry.
>     at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
>     ... 44 more
> Caused by: java.io.IOException: Could not register schema in registry
>     at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
>     at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
>     ... 44 more
> Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42) ~[?:?]
>     at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
>     at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
>     ... 44 more
>
> I've inspected the code of the avro-confluent-registry format, and it seems there is no way to disable this behavior. The format will always try to register the schema when serializing a row:
>
> https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
> https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
>
> Is there a particular reason for this, or would you be interested in adding a configuration option to disable this behavior?
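> For illustration, a hypothetical lookup-only variant of that coder could resolve the schema id with a read-only getId() call instead of register(), then write the Confluent wire format (magic byte plus 4-byte schema id). This is a sketch of the idea, not existing Flink code; everything except the Confluent client calls is made up:
>
> import java.io.DataOutputStream;
> import java.io.IOException;
> import java.io.OutputStream;
>
> import io.confluent.kafka.schemaregistry.avro.AvroSchema;
> import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
>
> public class LookupOnlySchemaCoder {
>     private final SchemaRegistryClient client;
>     private final String subject;
>
>     public LookupOnlySchemaCoder(SchemaRegistryClient client, String subject) {
>         this.client = client;
>         this.subject = subject;
>     }
>
>     // Hypothetical counterpart to ConfluentSchemaRegistryCoder#writeSchema:
>     // looks up the id of a pre-registered schema instead of registering it,
>     // so a write-denied registry would no longer fail the serializer.
>     public void writeSchema(org.apache.avro.Schema schema, OutputStream out)
>             throws IOException {
>         try {
>             int schemaId = client.getId(subject, new AvroSchema(schema));
>             DataOutputStream dataOut = new DataOutputStream(out);
>             dataOut.writeByte(0);       // Confluent magic byte
>             dataOut.writeInt(schemaId); // schema id, big-endian
>             dataOut.flush();
>         } catch (Exception e) {
>             throw new IOException("Could not look up schema in registry", e);
>         }
>     }
> }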
> Best regards,
>
> Jannik