Hi, I am trying to deploy a pyFlink application on a local cluster. The application runs without any problems if I execute it as a normal Python program:

```
python myApplication.py
```

My pyFlink version is `__version__ = "1.11.0"`. I had installed this pyFlink through conda/pip (don't remember which).
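For reference, this is how I checked the version of that install; a trivial snippet, assuming only the `pyflink.version` module that the 1.11 package ships with:

```python
import pyflink
from pyflink.version import __version__

print(__version__)       # prints: 1.11.0
print(pyflink.__file__)  # shows which installed pyflink package gets picked up
```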
Per the instructions given in [1], I have ensured that running the command `python` gets me a Python 3.7 shell with pyFlink installed. I have also ensured that my local Flink cluster version is 1.11.0 (same as above). However, if I execute the application with:

```
bin/flink run -py myApplication.py
```

I get the error:

```
Traceback (most recent call last):
  File "basic_streaming_job.py", line 65, in <module>
    main()
  File "basic_streaming_job.py", line 43, in main
    """)
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.raw_message'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'topic'='basic_features_normalized'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
    .....
```

The offending table schema in question:

```sql
CREATE TABLE {INPUT_TABLE} (
    monitorId STRING,
    deviceId STRING,
    state INT,
    feature_1 DOUBLE,
    feature_2 DOUBLE,
    feature_3 DOUBLE,
    feature_4 DOUBLE,
    feature_5 DOUBLE,
    feature_6 DOUBLE,
    time_str TIMESTAMP(3),
    WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '{INPUT_TOPIC}',
    'properties.bootstrap.servers' = '{KAFKA}',
    'format' = 'json'
)
```

Clearly, even though my standalone pyFlink version and my cluster's Flink version are the same, something is different about the cluster runtime. What could that be?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
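For completeness, this is roughly how the placeholders in the DDL above get filled in and passed to `execute_sql`; a simplified sketch of my `main()`, where only `main()` and the `execute_sql` call are confirmed by the traceback, the environment setup is my usual boilerplate, the concrete values come from the error message, and the `WATERMARK_DELAY` value is illustrative:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Concrete values as they appear in the error above; WATERMARK_DELAY is illustrative.
INPUT_TABLE = "raw_message"
INPUT_TOPIC = "basic_features_normalized"
KAFKA = "localhost:9092"
WATERMARK_DELAY = "5"


def main():
    # Streaming table environment with the Blink planner (the 1.11 default).
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # The DDL from above, with placeholders substituted via str.format().
    t_env.execute_sql("""
        CREATE TABLE {INPUT_TABLE} (
            monitorId STRING,
            deviceId STRING,
            state INT,
            feature_1 DOUBLE,
            feature_2 DOUBLE,
            feature_3 DOUBLE,
            feature_4 DOUBLE,
            feature_5 DOUBLE,
            feature_6 DOUBLE,
            time_str TIMESTAMP(3),
            WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{INPUT_TOPIC}',
            'properties.bootstrap.servers' = '{KAFKA}',
            'format' = 'json'
        )
    """.format(INPUT_TABLE=INPUT_TABLE, INPUT_TOPIC=INPUT_TOPIC,
               KAFKA=KAFKA, WATERMARK_DELAY=WATERMARK_DELAY))


if __name__ == "__main__":
    main()
```

The `""")` at line 43 in the traceback is the closing of exactly this triple-quoted DDL string.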