[ https://issues.apache.org/jira/browse/FLINK-22763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-22763:
----------------------------
    Component/s: (was: Table SQL / API)
                 Table SQL / Ecosystem

> avro-confluent format does not allow for authorization credentials to be supplied to Confluent Schema Registry
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22763
>                 URL: https://issues.apache.org/jira/browse/FLINK-22763
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python, Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.13.0
>            Reporter: Samuel Fiddis
>            Priority: Major
>
> In PyFlink, attempting to connect to an avro-confluent Kafka stream where the Confluent Schema Registry requires authorization does not work.
>
> Table API definition:
> {code:java}
> ddl_kafka_avro_confluent_source = f"""
>     CREATE TABLE gtt_records(
>         **table columns**
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'topic.avro-v1',
>         'properties.bootstrap.servers' = 'pkc-ldvj1.ap-southeast-2.aws.confluent.cloud:9092',
>         'properties.security.protocol' = 'SASL_SSL',
>         'properties.sasl.mechanism' = 'PLAIN',
>         'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{KAFKA_API_KEY}" password="{KAFKA_API_SECRET}";',
>         'properties.basic.auth.credentials.source' = 'USER_INFO',
>         'properties.basic.auth.user.info' = '{SR_API_KEY}:{SR_API_SECRET}',
>         'key.format' = 'avro-confluent',
>         'key.avro-confluent.schema-registry.url' = 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
>         'key.fields' = '**key fields**',
>         'value.format' = 'avro-confluent',
>         'value.avro-confluent.schema-registry.url' = 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
>         'value.fields-include' = 'ALL',
>         'key.avro-confluent.schema-registry.subject' = 'topic.avro-v1-key',
>         'value.avro-confluent.schema-registry.subject' = 'topic.avro-v1-value'
>     ) """
> {code}
>
> Attempting to run a job with this table as a source results in a 401 error from the Confluent Schema Registry:
>
> {code:java}
> 2021-05-19 04:50:21,830 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, gtt_records]], fields=[unique, direction, window_ts, road_number, link_number, carriageway, version_no, window_local_date, window_local_time, poll_ts, duration, traffic_duration, distance, link_length]) -> Sink: Sink(table=[default_catalog.default_database.kafka_messages], fields=[unique, direction, window_ts, road_number, link_number, carriageway, version_no, window_local_date, window_local_time, poll_ts, duration, traffic_duration, distance, link_length]) (1/1)#0 (7eddc3a42dbcad0fc313bb6bdfa2c922) switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Failed to deserialize Avro record.
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:119)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.io.IOException: Could not find schema with id 100001 in registry
>     at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
>     at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:73)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>     ... 9 more
> Caused by: org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
>     at org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
>     at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
>     ... 11 more
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
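For reference, the `properties.basic.auth.*` keys in the DDL above are Kafka consumer properties and are not forwarded to the shaded Schema Registry client, and the 1.13 avro-confluent format ignores userinfo embedded in the registry URL, which is why the lookup goes out unauthenticated. Later Flink releases added dedicated format options for this (e.g. `basic-auth.credentials-source` and `basic-auth.user-info`); a sketch of the intended configuration, with option names to be verified against the avro-confluent format documentation for the version in use:

{code:sql}
-- Sketch only: these format-level auth options were added in Flink
-- releases after 1.13; check the avro-confluent docs for exact names.
CREATE TABLE gtt_records (
    **table columns**
) WITH (
    'connector' = 'kafka',
    'value.format' = 'avro-confluent',
    -- plain registry URL, no userinfo embedded
    'value.avro-confluent.schema-registry.url' = 'https://psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = '{SR_API_KEY}:{SR_API_SECRET}'
)
{code}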
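What Confluent Cloud's registry expects is standard HTTP Basic auth: with `basic.auth.credentials.source=USER_INFO`, the client sends `Authorization: Basic base64(key:secret)` on each request. A minimal Python sketch (function name is illustrative, not part of any Flink API) of the header that is missing from the failing `GET /schemas/ids/100001` request:

{code:python}
import base64

def basic_auth_header(api_key: str, api_secret: str) -> str:
    """Build the HTTP Basic auth header value the Schema Registry
    expects for USER_INFO credentials (RFC 7617 encoding)."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"

# Without this header the registry answers 401 Unauthorized, which the
# deserializer surfaces as the misleading "Could not find schema with
# id 100001 in registry" IOException seen in the stack trace above.
{code}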