Hi Everyone,

I am trying to fix the flink-playground for version 1.14.4 and was working
on fixing pyflink-walkthrough and I getting following error

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=kafka:9092
connector.properties.group.id=test_3
connector.startup-mode=latest-offset
connector.topic=payment_msg
connector.type=kafka
connector.version=universal
format=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=createTime
schema.1.data-type=BIGINT
schema.1.name=orderId
schema.2.data-type=DOUBLE
schema.2.name=payAmount
schema.3.data-type=INT
schema.3.name=payPlatform
schema.4.data-type=INT
schema.4.name=provinceId

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:315)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:193)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:154)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:108)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
... 59 more

while executing the following DDL query

create_kafka_source_ddl = """
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT
        ) WITH (
          'connector.type' = 'kafka',
          'connector.version' = 'universal',
          'connector.topic' = 'payment_msg',
          'connector.properties.bootstrap.servers' = 'kafka:9092',
          'connector.properties.group.id' = 'test_3',
          'connector.startup-mode' = 'latest-offset',
          'format.type' = 'json'
        )
        """

Not sure, why its not looking for JSON parsing factory and going toward
deprecated csv parse factory. Anybody who can help with this?
I would really appreciate it.

Thanks,
Shubham

Reply via email to