Ok, thank you!

On Sat, 29 Aug, 2020, 4:07 pm Xingbo Huang, <hxbks...@gmail.com> wrote:

> Hi Manas,
>
> We can't submit a PyFlink job through the Flink web UI currently. The only
> way to submit a PyFlink job at the moment is through the command line.
>
> Best,
> Xingbo
>
> Manas Kale <manaskal...@gmail.com> wrote on Sat, Aug 29, 2020 at 12:51 PM:
>
>> Hi Xingbo,
>> Thanks, that worked. Just to make sure, the only way currently to submit
>> a pyFlink job is through the command line, right? Can I do that through
>> the GUI?
>>
>> On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>>> Hi Manas,
>>>
>>> I think you forgot to add the Kafka jar[1] dependency. You can use the
>>> -j argument of the command line[2] or the Python Table API to specify
>>> the jar. For details about the APIs for adding Java dependencies, you
>>> can refer to the relevant documentation[3].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale <manaskal...@gmail.com> wrote on Fri, Aug 28, 2020 at 9:06 PM:
>>>
>>>> Hi,
>>>> I am trying to deploy a pyFlink application on a local cluster. I am
>>>> able to run my application without any problems if I execute it as a
>>>> normal python program using the command:
>>>> python myApplication.py
>>>> My pyFlink version is __version__ = "1.11.0".
>>>> I had installed this pyFlink through conda/pip (don't remember which).
>>>>
>>>> Per instructions given in [1] I have ensured that running the command
>>>> "python" gets me to a python 3.7 shell with pyFlink installed.
>>>> I have also ensured my local Flink cluster version is 1.11.0 (same as
>>>> above).
>>>> However, if I execute the application using the command:
>>>> bin/flink run -py myApplication.py
>>>>
>>>> I get the error:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "basic_streaming_job.py", line 65, in <module>
>>>>     main()
>>>>   File "basic_streaming_job.py", line 43, in main
>>>>     """)
>>>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
>>>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>>>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>>>>   File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
>>>> : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.raw_message'.
>>>>
>>>> Table options are:
>>>>
>>>> 'connector'='kafka'
>>>> 'format'='json'
>>>> 'properties.bootstrap.servers'='localhost:9092'
>>>> 'topic'='basic_features_normalized'
>>>>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>>>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
>>>> .....
>>>>
>>>> The offending table schema in question:
>>>>
>>>> CREATE TABLE {INPUT_TABLE} (
>>>>     monitorId STRING,
>>>>     deviceId STRING,
>>>>     state INT,
>>>>     feature_1 DOUBLE,
>>>>     feature_2 DOUBLE,
>>>>     feature_3 DOUBLE,
>>>>     feature_4 DOUBLE,
>>>>     feature_5 DOUBLE,
>>>>     feature_6 DOUBLE,
>>>>     time_str TIMESTAMP(3),
>>>>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'topic' = '{INPUT_TOPIC}',
>>>>     'properties.bootstrap.servers' = '{KAFKA}',
>>>>     'format' = 'json'
>>>> )
>>>>
>>>> Clearly, even though my standalone pyFlink version and cluster Flink
>>>> versions are the same, something is different with the cluster runtime.
>>>> What could that be?
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>>>>
>>>
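
For anyone finding this thread later, here is a rough sketch of the two options Xingbo mentions (the -j argument, or setting the jar from the Python Table API). The helper names and the jar path are hypothetical and only for illustration; the "pipeline.jars" key is the one described in the Java dependency section of the 1.11 dependency management docs, and the PyFlink calls are left as comments so the snippet runs without a Flink installation.

```python
from pathlib import Path


def pipeline_jars_value(jar_paths):
    """Join local jar paths into a semicolon-separated list of file:// URLs.

    This is the value format the 1.11 docs show for 'pipeline.jars'.
    (Hypothetical helper, not part of PyFlink.)
    """
    return ";".join(Path(p).absolute().as_uri() for p in jar_paths)


def flink_run_command(script, jar_path, flink_bin="bin/flink"):
    """Build the argument list for submitting a PyFlink job with one extra jar.

    (Hypothetical helper; it only assembles the command, it does not run it.)
    """
    return [flink_bin, "run", "-py", script, "-j", str(jar_path)]


# Hypothetical location of the Kafka SQL connector jar downloaded from [1].
KAFKA_JAR = "/opt/jars/flink-sql-connector-kafka.jar"

# Option 1: pass the jar on the command line.
print(" ".join(flink_run_command("myApplication.py", KAFKA_JAR)))

# Option 2: set it inside the job, before calling execute_sql
# (assumes t_env is an existing TableEnvironment):
#   t_env.get_config().get_configuration().set_string(
#       "pipeline.jars", pipeline_jars_value([KAFKA_JAR]))
print(pipeline_jars_value([KAFKA_JAR]))
```

Either way, the point is that the cluster runtime needs the Kafka connector jar available; it is not bundled with the Flink distribution, which would explain the ValidationException above.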