Hi everyone, I am trying to fix the flink-playground for version 1.14.4. While working on pyflink-walkthrough, I am getting the following error:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=kafka:9092
connector.properties.group.id=test_3
connector.startup-mode=latest-offset
connector.topic=payment_msg
connector.type=kafka
connector.version=universal
format=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=createTime
schema.1.data-type=BIGINT
schema.1.name=orderId
schema.2.data-type=DOUBLE
schema.2.name=payAmount
schema.3.data-type=INT
schema.3.name=payPlatform
schema.4.data-type=INT
schema.4.name=provinceId

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:315)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:193)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:154)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:108)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
    ... 59 more

This happens while executing the following DDL query:

create_kafka_source_ddl = """
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT
        ) WITH (
            'connector.type' = 'kafka',
            'connector.version' = 'universal',
            'connector.topic' = 'payment_msg',
            'connector.properties.bootstrap.servers' = 'kafka:9092',
            'connector.properties.group.id' = 'test_3',
            'connector.startup-mode' = 'latest-offset',
            'format.type' = 'json'
        )
        """

I am not sure why it is not looking for a JSON parsing factory and is instead going toward the deprecated CSV parse factories.

Can anybody help with this? I would really appreciate it.

Thanks,
Shubham
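
One thing that may be worth checking, offered as a sketch and not a confirmed fix: the list of considered factories contains only the two CSV factories, which suggests that no legacy Kafka table factory was found on the classpath. That could mean the Kafka SQL connector jar is missing, or that the jar in use no longer supports the legacy `connector.type` option keys. The example below rewrites the same table with the newer option keys (`connector`, `topic`, `properties.*`, `scan.startup.mode`, `format`). The helper name `build_kafka_source_ddl` is hypothetical and exists only for illustration; the table schema and values are copied from the DDL above.

```python
def build_kafka_source_ddl(topic, bootstrap_servers, group_id):
    """Return a CREATE TABLE statement that uses the newer connector option keys.

    Hypothetical helper for illustration; it only builds a string and does not
    require PyFlink to be installed.
    """
    return f"""
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{topic}',
            'properties.bootstrap.servers' = '{bootstrap_servers}',
            'properties.group.id' = '{group_id}',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """


# Same values as in the original DDL.
ddl = build_kafka_source_ddl("payment_msg", "kafka:9092", "test_3")
print(ddl)

# Assumption: with an existing TableEnvironment named t_env, the statement
# would be registered with t_env.execute_sql(ddl).
```

Even with these option keys, the Kafka SQL connector jar still has to be available to the job, so the classpath of the playground image is worth verifying as well.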