Hi Manas,

I think you forgot to add the Kafka connector jar[1] as a dependency. You can use the -j argument of the command line[2] or the Python Table API to specify the jar. For details about the APIs for adding Java dependencies, you can refer to the documentation[3].
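For example, a minimal sketch of both options (the jar path and the Scala/Flink versions in the file name are assumptions; adjust them to the connector jar you actually downloaded). On the command line:

    bin/flink run -py myApplication.py -j /path/to/flink-sql-connector-kafka_2.11-1.11.0.jar

or inside the Python program, before any SQL is executed (here "t_env" stands for the TableEnvironment your script creates):

    # hypothetical local path; point this at the Kafka connector jar
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")

A likely explanation for the difference you saw is that the pip-installed PyFlink and the standalone cluster each load jars from their own lib directory, so the connector jar has to be made visible to whichever runtime actually executes the job.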
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency

Best,
Xingbo

On Fri, Aug 28, 2020 at 9:06 PM Manas Kale <manaskal...@gmail.com> wrote:

> Hi,
> I am trying to deploy a PyFlink application on a local cluster. I am able
> to run my application without any problems if I execute it as a normal
> Python program using the command:
> python myApplication.py
> My PyFlink version is __version__ = "1.11.0".
> I installed this PyFlink through conda/pip (I don't remember which).
>
> Per the instructions given in [1], I have ensured that running the command
> "python" gets me to a Python 3.7 shell with PyFlink installed.
> I have also ensured that my local Flink cluster version is 1.11.0 (the same
> as above).
> However, if I execute the application using the command:
> bin/flink run -py myApplication.py
>
> I get the error:
>
> Traceback (most recent call last):
>   File "basic_streaming_job.py", line 65, in <module>
>     main()
>   File "basic_streaming_job.py", line 43, in main
>     """)
>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: Unable to create a
> source for reading table 'default_catalog.default_database.raw_message'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'topic'='basic_features_normalized'
>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> .....
>
> The offending table schema in question:
>
> CREATE TABLE {INPUT_TABLE} (
>     monitorId STRING,
>     deviceId STRING,
>     state INT,
>     feature_1 DOUBLE,
>     feature_2 DOUBLE,
>     feature_3 DOUBLE,
>     feature_4 DOUBLE,
>     feature_5 DOUBLE,
>     feature_6 DOUBLE,
>     time_str TIMESTAMP(3),
>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = '{INPUT_TOPIC}',
>     'properties.bootstrap.servers' = '{KAFKA}',
>     'format' = 'json'
> )
>
> Clearly, even though my standalone PyFlink and cluster Flink versions are
> the same, something is different about the cluster runtime. What could
> that be?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples