Hi Xingbo,

Thanks, that worked. Just to make sure: is the command line currently the only way to submit a pyFlink job, or can I also do it through the GUI?
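For reference, the invocation that now works for me, with the connector jar attached via -j (the exact jar file name below is an example and has to match your Flink/Scala version):

    bin/flink run -py myApplication.py -j flink-sql-connector-kafka_2.11-1.11.0.jar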
On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Manas,
>
> I think you forgot to add the Kafka jar[1] dependency. You can use the -j
> argument of the command line[2] or the Python Table API to specify the
> jar. For details about the APIs for adding Java dependencies, you can
> refer to the relevant documentation[3].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>
> Best,
> Xingbo
>
> Manas Kale <manaskal...@gmail.com> 于2020年8月28日周五 下午9:06写道:
>
>> Hi,
>> I am trying to deploy a pyFlink application on a local cluster. I am
>> able to run my application without any problems if I execute it as a
>> normal Python program using the command:
>>
>> python myApplication.py
>>
>> My pyFlink version is __version__ = "1.11.0". I installed this pyFlink
>> through conda/pip (I don't remember which).
>>
>> Per the instructions given in [1], I have ensured that running the
>> command "python" gets me a Python 3.7 shell with pyFlink installed.
>> I have also ensured that my local Flink cluster version is 1.11.0 (the
>> same as above). However, if I execute the application using the command:
>>
>> bin/flink run -py myApplication.py
>>
>> I get the error:
>>
>> Traceback (most recent call last):
>>   File "basic_streaming_job.py", line 65, in <module>
>>     main()
>>   File "basic_streaming_job.py", line 43, in main
>>     """)
>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
>> : org.apache.flink.table.api.ValidationException: Unable to create a
>> source for reading table 'default_catalog.default_database.raw_message'.
>>
>> Table options are:
>>
>> 'connector'='kafka'
>> 'format'='json'
>> 'properties.bootstrap.servers'='localhost:9092'
>> 'topic'='basic_features_normalized'
>>
>>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
>>     .....
>>
>> The offending table schema in question:
>>
>> CREATE TABLE {INPUT_TABLE} (
>>     monitorId STRING,
>>     deviceId STRING,
>>     state INT,
>>     feature_1 DOUBLE,
>>     feature_2 DOUBLE,
>>     feature_3 DOUBLE,
>>     feature_4 DOUBLE,
>>     feature_5 DOUBLE,
>>     feature_6 DOUBLE,
>>     time_str TIMESTAMP(3),
>>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = '{INPUT_TOPIC}',
>>     'properties.bootstrap.servers' = '{KAFKA}',
>>     'format' = 'json'
>> )
>>
>> Clearly, even though my standalone pyFlink version and the cluster Flink
>> version are the same, something is different in the cluster runtime.
>> What could that be?
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
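For completeness, a minimal sketch of the Table API route Xingbo mentions in [3], assuming the connector jar sits at a local path (both the path and the jar file name below are placeholders):

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # Blink planner in streaming mode, as used by the Table API in Flink 1.11.
    env_settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .use_blink_planner() \
        .build()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)

    # Register the Kafka connector jar with the job; separate multiple jars
    # with ';'. Placeholder path: point this at the jar that matches your
    # Flink/Scala version.
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")

With this set in the program itself, the same CREATE TABLE ... WITH ('connector' = 'kafka', ...) DDL should find the connector factory without passing -j on the command line.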