Hi:
flink版本1.12.0:

我想在sql-client-defaults.yaml中配置一张表,配置如下:

tables:

  - name: t_users

    type: source-table

    connector:

        property-version: 1

        type: kafka

        version: universal

        topic: ods.userAnalysis.user_profile

        startup-mode: latest-offset

        properties:

            bootstrap.servers: hostname:9092

            group.id: flink-analysis

    format:

        type: debezium-avro-confluent

        property-version: 1

        debezium-avro-confluent.schema-registry.url: http://hostname:8081

        #schema-registry.url: http://hostname:8081

    schema:

        - name: userId

          data-type: STRING

        - name: province

          data-type: STRING

        - name: city

          data-type: STRING

        - name: age

          data-type: INT

        - name: education

          data-type: STRING

        - name: jobType

          data-type: STRING

        - name: marriage

          data-type: STRING

        - name: sex

          data-type: STRING

        - name: interest

          data-type: STRING




我把相关的包都已经放到了lib目录下,启动sql cli时报错如下:

Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.

at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)

at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in

the classpath.


Reason: Required context properties mismatch.


The following properties are requested:

connector.properties.bootstrap.servers=henghe66:9092

connector.properties.group.id=flink-analysis

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=ods.userAnalysis.user_profile

connector.type=kafka

connector.version=universal

format.debezium-avro-confluent.schema-registry.url=
http://192.168.101.43:8081

format.property-version=1

format.type=debezium-avro-confluent

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=userId

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=province

schema.2.data-type=VARCHAR(2147483647)

schema.2.name=city

schema.3.data-type=INT

schema.3.name=age

schema.4.data-type=VARCHAR(2147483647)

schema.4.name=education

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=jobType

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=marriage

schema.7.data-type=VARCHAR(2147483647)

schema.7.name=sex

schema.8.data-type=VARCHAR(2147483647)

schema.8.name=interest


The following factories have been considered:

org.apache.flink.formats.avro.AvroRowFormatFactory

org.apache.flink.formats.csv.CsvRowFormatFactory

org.apache.flink.formats.json.JsonRowFormatFactory

at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)

at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)

at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)

at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)

at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)

at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)

at
org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)

at
org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)

at
org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)

at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)

... 3 more


此过程我不在sql-client-defaults.yaml中配置,而是启动sql cli后用DDL创建表是可以正常启动的。

所以难道是我在sql-client-defaults.yaml中配置错了吗?

请知道的大佬告知。


祝好!

回复