Hello,

I'm trying to use the avro-confluent-registry format with the Confluent Cloud Schema Registry in our company. Our schemas are managed via Terraform, and global write access is denied for all Kafka clients in our environments (or at least in production). As a result, when using the avro-confluent-registry format, I get the following error when Flink tries to serialize a row:

java.lang.RuntimeException: Failed to serialize row.
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
    at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0.jar:1.17.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to serialize schema registry.
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more
Caused by: java.io.IOException: Could not register schema in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259) ~[?:?]
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42) ~[?:?]
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
    at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
    at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
    ... 44 more

I've inspected the code of the avro-confluent-registry format, and it seems there is no way to disable this behavior. The format always tries to register the schema when serializing a row:

https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85

Is there a particular reason for this, or would you be interested in adding a configuration option to disable this behavior?

Best regards,
Jannik
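
P.S. To make the proposal more concrete, here is a rough, self-contained sketch of what I have in mind. It is not Flink code: `RegistryClient`, `ReadOnlyRegistry`, `schemaHeader` and the `autoRegister` flag are all hypothetical names I made up for illustration, and schemas are plain strings to keep it dependency-free. The only part meant to mirror the real format is the header layout (magic byte 0 followed by the 4-byte schema ID), which is what ConfluentSchemaRegistryCoder appears to write. With the flag off, the coder would look up the ID of an already registered schema instead of calling register, so read access should be enough.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these types exist in Flink or the Confluent client. */
final class LookupOnlyCoderSketch {

    /** Stand-in for the two registry operations that matter here (assumed names). */
    interface RegistryClient {
        int register(String subject, String schema); // requires write access
        int lookupId(String subject, String schema); // read-only lookup
    }

    /** In-memory registry that denies writes, mimicking our production setup. */
    static final class ReadOnlyRegistry implements RegistryClient {
        private final Map<String, Integer> idsBySubjectAndSchema = new HashMap<>();

        ReadOnlyRegistry(String subject, String schema, int id) {
            idsBySubjectAndSchema.put(subject + "|" + schema, id);
        }

        @Override
        public int register(String subject, String schema) {
            throw new IllegalStateException(
                    "User is denied operation Write on Subject: " + subject);
        }

        @Override
        public int lookupId(String subject, String schema) {
            Integer id = idsBySubjectAndSchema.get(subject + "|" + schema);
            if (id == null) {
                throw new IllegalStateException("Schema not registered for subject: " + subject);
            }
            return id;
        }
    }

    /**
     * Builds the 5-byte header: magic byte 0, then the schema ID as a big-endian int.
     * autoRegister is the proposed (hypothetical) option; false means lookup only.
     */
    static byte[] schemaHeader(
            RegistryClient client, String subject, String schema, boolean autoRegister) {
        int id = autoRegister
                ? client.register(subject, schema)
                : client.lookupId(subject, schema);
        return ByteBuffer.allocate(5).put((byte) 0).putInt(id).array();
    }

    public static void main(String[] args) {
        RegistryClient registry = new ReadOnlyRegistry("my-topic-key", "\"string\"", 42);
        byte[] header = schemaHeader(registry, "my-topic-key", "\"string\"", false);
        System.out.println(Arrays.toString(header)); // prints [0, 0, 0, 0, 42]
    }
}
```

With `autoRegister` set to true, the same call fails with the write-denied error, which is the behavior I'm seeing today. Whether such a flag belongs in the format options or in the coder is of course up for discussion.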