Thank you for your answer.

I have replaced that .jar with Kafka version universal - the links to
other versions are extinct.

After the attempt of deploying:
bin/flink run -py
/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile
/home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar

there another error occurs:
Traceback (most recent call last):
  File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line
20, in <module>
    .field("tbd", DataTypes.INT())) \
AttributeError: 'StreamTableDescriptor' object has no attribute
'register_table_source'
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Maybe the way the python program is written is incorrect. Can it be
deprecated taking into account that the installed flink version is 1.11?

Best regards,
Wojtek

czw., 23 lip 2020 o 12:01 Xingbo Huang <hxbks...@gmail.com> napisał(a):

> Hi Wojtek,
> you need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar'
> which you can download in the doc[1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> Best,
> Xingbo
>
> Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> 于2020年7月23日周四
> 下午4:57写道:
>
>> Hello,
>>
>> I am trying to deploy a Python job with Kafka connector:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment,
>> StreamTableEnvironment
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka,
>> Json, Csv
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> t_config = TableConfig()
>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>
>> t_env.connect(Kafka()
>>               .version("0.11")
>>               .topic("my-topic")
>>               .property("bootstrap.servers",
>> 'my-cluster-kafka-bootstrap:9092')
>>               ) \
>>     .in_append_mode() \
>>     .with_format(Csv()
>>                  .line_delimiter("\r\n")      \
>>                  .derive_schema()) \
>>     .with_schema(Schema()
>>                  .field("tbd", DataTypes.INT())) \
>>     .register_table_source('mySource')
>>
>> t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
>>     .with_format(OldCsv()
>>                  .field('tbd', DataTypes.INT())) \
>>     .with_schema(Schema()
>>                  .field("tbd", DataTypes.INT())) \
>>     .register_table_sink('mySink')
>>
>> t_env.scan('mySource') \
>>     .select('tbd') \
>>     .where("tbd = 1") \
>>     .insert_into('mySink')
>>
>> t_env.execute("tutorial_job")
>>
>> When I run a deploying command:
>> bin/flink run -py
>> /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile
>> /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
>>
>> I get an error:
>> Traceback (most recent call last):
>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line
>> 9, in <module>
>>     t_env = StreamTableEnvironment.create(exec_env, t_config)
>>   File
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>> line 1478, in create
>>   File
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>> line 1286, in __call__
>>   File
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>> line 147, in deco
>>   File
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
>> : java.lang.NoClassDefFoundError:
>> org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
>> at java.base/java.lang.ClassLoader.defineClass1(Native Method)
>> at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
>> at
>> java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
>> at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
>> at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
>> at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
>> at java.base/java.security.AccessController.doPrivileged(Native Method)
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> at java.base/java.lang.Class.forName0(Native Method)
>> at java.base/java.lang.Class.forName(Class.java:398)
>> at
>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
>> at
>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
>> at
>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
>> at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
>> at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
>> at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
>> at
>> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
>> at
>> org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
>> at
>> org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
>> at
>> org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
>> at
>> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
>> at
>> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
>> at
>> org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> ... 39 more
>>
>> org.apache.flink.client.program.ProgramAbortException
>> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>
>> What is the correct way to deploy python job on Flink which uses Kafka?
>> It seems like it cannot get a correct dependency of Kafka.
>>
>> I wonder if there is some more simply solution and if it matters that i
>> would like deploy a job on the K8s cluster.
>>
>> Thanks,
>> Wojtek
>>
>

-- 
UWAGA - Wiadomość oraz załączone do niej dokumenty zawierają informacje 
poufne, które mogą być również objęte tajemnicą handlową lub służbową. 
Jeśli nie jesteś zamierzonym odbiorcą wiadomości, proszę bezzwłocznie 
skontaktuj się z nadawcą oraz usuń wiadomość ze swojego systemu. 
Ujawnianie, kopiowanie, rozpowszechnianie czy publikacja tej wiadomości 
oraz zawartych w niej informacji jest zabronione.


Alphamoon Sp. z o.o., 
ul. Pawła Włodkowica 21/3, 50-072 Wrocław,
wpisana pod numerem KRS 
0000621513 do Krajowego Rejestru Sądowego, prowadzonego przez Sąd Rejonowy 
dla Wrocławia-Fabrycznej VI Wydział Gospodarczy Krajowego Rejestru 
Sądowego, NIP: 8943079568, REGON 364634116.; Kapitał zakładowy: 5.000 PLN w 
pełni opłacony.


NOTE - Message and the documents attached thereto contain 
confidential information, which may also be a trade secret or confidential. 
If you are not the intended recipient of the message, please contact the 
sender without delay and delete the message from your system. Disclosure, 
copying, dissemination or publication of this message and information 
contained therein is prohibited.


Alphamoon Sp. z o.o. (Ltd.), ul. Pawła 
Włodkowica 21/3, 50-072 Wrocław, Poland;
Registered under the KRS number 
0000621513 to the National Court Register, kept by the District Court for 
Wrocław-Fabryczna VI Economic Department of the National Court Register, 
VAT-ID: PL8943079568, REGON 364634116; Share capital: PLN 5.000 fully 
paid-up.

Reply via email to