Samuel Fiddis created FLINK-22763:
-------------------------------------
Summary: avro-confluent format does not allow for authorization
credentials to be supplied to Confluent Schema Registry
Key: FLINK-22763
URL: https://issues.apache.org/jira/browse/FLINK-22763
Project: Flink
Issue Type: Bug
Components: API / Python, Formats (JSON, Avro, Parquet, ORC,
SequenceFile), Table SQL / API
Affects Versions: 1.13.0
Reporter: Samuel Fiddis
In PyFlink, attempting to connect to an avro-confluent Kafka stream where the
Confluent Schema Registry requires authorization does not work.
Table API definition:
{code:java}
ddl_kafka_avro_confluent_source = f"""
CREATE TABLE gtt_records(
**table columns**
) WITH (
'connector' = 'kafka',
'topic' = 'topic.avro-v1',
'properties.bootstrap.servers' =
'pkc-ldvj1.ap-southeast-2.aws.confluent.cloud:9092',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' =
'org.apache.kafka.common.security.plain.PlainLoginModule required
username="{KAFKA_API_KEY}" password="{KAFKA_API_SECRET}";',
'properties.basic.auth.credentials.source' = 'USER_INFO',
'properties.basic.auth.user.info' = '{SR_API_KEY}:{SR_API_SECRET}',
'key.format' = 'avro-confluent',
'key.avro-confluent.schema-registry.url' =
'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
'key.fields' = '**key fields**',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' =
'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
'value.fields-include' = 'ALL',
'key.avro-confluent.schema-registry.subject' =
'data.google-travel-time.avro-v1-key',
'value.avro-confluent.schema-registry.subject' =
'data.google-travel-time.avro-v1-value'
) """{code}
Attempting to run a job with this table as a source results in a 401 error from
the Confluent Schema Registry:
{code:java}
2021-05-19 04:50:21,830 WARN org.apache.flink.runtime.taskmanager.Task
[] - Source: TableSourceScan(table=[[default_catalog,
default_database, gtt_records]], fields=[unique, direction, window_ts,
road_number, link_number, carriageway, version_no, window_local_date,
window_local_time, poll_ts, duration, traffic_duration, distance, link_length])
-> Sink: Sink(table=[default_catalog.default_database.kafka_messages],
fields=[unique, direction, window_ts, road_number, link_number, carriageway,
version_no, window_local_date, window_local_time, poll_ts, duration,
traffic_duration, distance, link_length]) (1/1)#0
(7eddc3a42dbcad0fc313bb6bdfa2c922) switched from RUNNING to FAILED with failure
cause: java.io.IOException: Failed to deserialize Avro record.
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:119)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.io.IOException: Could not find schema with id 100001 in registry
at
org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
at
org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:73)
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
... 9 more
Caused by: org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
at
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
at
org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
... 11 more
{code}
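To help separate a credentials problem from a Flink configuration problem, here is a minimal sketch of the request the registry expects. It only illustrates how a `key:secret` pair (the same value passed as `basic.auth.user.info` above) becomes an HTTP Basic `Authorization` header on a `GET /schemas/ids/<id>` call. The helper names `basic_auth_header`, `split_registry_url` and `build_schema_request` are hypothetical and are not part of Flink or the Confluent client. The host `registry.example.com` is a placeholder.

```python
import base64
import urllib.request
from urllib.parse import urlsplit


def basic_auth_header(user_info: str) -> dict:
    """Turn 'key:secret' into an HTTP Basic Authorization header."""
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def split_registry_url(url: str):
    """Separate credentials embedded as https://key:secret@host from the URL.

    Returns (url_without_credentials, 'key:secret' or None).
    """
    parts = urlsplit(url)
    user_info = None
    if parts.username is not None:
        user_info = f"{parts.username}:{parts.password or ''}"
    host = parts.hostname + (f":{parts.port}" if parts.port else "")
    return f"{parts.scheme}://{host}{parts.path}", user_info


def build_schema_request(base_url: str, schema_id: int, user_info: str):
    """Build (but do not send) an authenticated schema lookup request."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/schemas/ids/{schema_id}",
        headers=basic_auth_header(user_info),
    )


if __name__ == "__main__":
    # Placeholder host and credentials; substitute real values to try it.
    url, creds = split_registry_url("https://key:secret@registry.example.com")
    request = build_schema_request(url, 100001, creds)
    print(request.full_url)
    print(request.get_header("Authorization"))
```

Sending the request built here with `urllib.request.urlopen` against the real registry is one way to confirm that the API key and secret are accepted outside Flink. If that call succeeds while the job still fails with a 401, the likely gap is in how the credentials reach the registry client used by the format.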