Hi Manas,

I think you forgot to add the Kafka connector jar[1] as a dependency. You can
specify the jar with the -j argument of the command line[2] or through the
Python Table API. For details about the APIs for adding Java dependencies,
refer to the relevant documentation[3].
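
As a rough sketch of the Python Table API route: the documentation in [3]
describes setting the 'pipeline.jars' option to a semicolon-separated list of
file:// URLs. The helper names and the jar path below are hypothetical and
only for illustration; adjust them to wherever your Kafka connector jar lives.

```python
from pathlib import Path


def build_pipeline_jars(*jar_paths):
    """Build a 'pipeline.jars' value: file:// URLs joined by ';'."""
    # resolve() makes each path absolute so that as_uri() can produce a file:// URL
    return ";".join(Path(p).resolve().as_uri() for p in jar_paths)


def add_jars(table_env, *jar_paths):
    """Register the jars on an existing PyFlink TableEnvironment."""
    # get_config().get_configuration().set_string(...) is the call shown in [3]
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars", build_pipeline_jars(*jar_paths)
    )


# Usage inside myApplication.py, before the CREATE TABLE statement is executed
# (hypothetical jar location):
#   add_jars(t_env, "/path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")
```

On the command line, the equivalent would be roughly
bin/flink run -py myApplication.py -j <path to the Kafka connector jar>.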

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency

Best,
Xingbo

On Fri, Aug 28, 2020 at 9:06 PM, Manas Kale <manaskal...@gmail.com> wrote:

> Hi,
> I am trying to deploy a pyFlink application on a local cluster. I am able
> to run my application without any problems if I execute it as a normal
> python program using the command:
> python myApplication.py
> My pyFlink version is __version__ = "1.11.0".
> I had installed this pyFlink through conda/pip (don't remember which).
>
> Per instructions given in [1] I have ensured that running the command
> "python" gets me to a python 3.7 shell with pyFlink installed.
> I have also ensured my local Flink cluster version is 1.11.0 (same as
> above).
> However, if I execute the application using the command:
> bin/flink run -py myApplication.py
>
> I get the error:
>
> Traceback (most recent call last):
>  File "basic_streaming_job.py", line 65, in <module>
>    main()
>  File "basic_streaming_job.py", line 43, in main
>    """)
>  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
>  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.raw_message'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'topic'='basic_features_normalized'
>        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> .....
>
> The table schema in question:
>
> CREATE TABLE {INPUT_TABLE} (
>     monitorId STRING,
>     deviceId STRING,
>     state INT,
>     feature_1 DOUBLE,
>     feature_2 DOUBLE,
>     feature_3 DOUBLE,
>     feature_4 DOUBLE,
>     feature_5 DOUBLE,
>     feature_6 DOUBLE,
>     time_str TIMESTAMP(3),
>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = '{INPUT_TOPIC}',
>     'properties.bootstrap.servers' = '{KAFKA}',
>     'format' = 'json'
> )
>
> Clearly, even though my standalone pyFlink version and cluster Flink
> versions are the same, something is different with the cluster runtime.
> What could that be?
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>
