Hi Wojtek,

You need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar', which you can download from the documentation [1]. The slim flink-connector-kafka-0.11 jar you passed does not bundle its transitive dependencies (such as flink-connector-kafka-base, which contains KafkaTableSourceSinkFactoryBase), hence the NoClassDefFoundError.
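For completeness, here is a minimal sketch of what the submission could look like with the fat jar. The Maven Central download URL and local paths are assumptions, not taken from this thread; adjust them to your setup:

```shell
# Fetch the self-contained ("fat") Kafka SQL connector jar from Maven Central
# (coordinates assumed to match the Flink 1.11 docs linked below)
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.11.0/flink-sql-connector-kafka_2.11-1.11.0.jar

# Submit the PyFlink job with the fat connector jar attached, instead of the
# slim flink-connector-kafka-0.11 jar that lacks its transitive dependencies
bin/flink run \
  -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py \
  --jarfile ./flink-sql-connector-kafka_2.11-1.11.0.jar
```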
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Xingbo

Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Thu, Jul 23, 2020 at 4:57 PM:

> Hello,
>
> I am trying to deploy a Python job with a Kafka connector:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> t_env.connect(Kafka()
>               .version("0.11")
>               .topic("my-topic")
>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')) \
>     .in_append_mode() \
>     .with_format(Csv()
>                  .line_delimiter("\r\n")
>                  .derive_schema()) \
>     .with_schema(Schema()
>                  .field("tbd", DataTypes.INT())) \
>     .register_table_source('mySource')
>
> t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
>     .with_format(OldCsv()
>                  .field('tbd', DataTypes.INT())) \
>     .with_schema(Schema()
>                  .field("tbd", DataTypes.INT())) \
>     .register_table_sink('mySink')
>
> t_env.scan('mySource') \
>     .select('tbd') \
>     .where("tbd = 1") \
>     .insert_into('mySink')
>
> t_env.execute("tutorial_job")
>
> When I run the deployment command:
>
> bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
>
> I get an error:
>
> Traceback (most recent call last):
>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 9, in <module>
>     t_env = StreamTableEnvironment.create(exec_env, t_config)
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1478, in create
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
> : java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
>     at java.base/java.lang.ClassLoader.defineClass1(Native Method)
>     at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
>     at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
>     at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
>     at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
>     at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
>     at java.base/java.security.AccessController.doPrivileged(Native Method)
>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Class.java:398)
>     at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
>     at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
>     at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
>     at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
>     at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
>     at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
>     at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
>     at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
>     at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
>     at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
>     at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     ... 39 more
>
> org.apache.flink.client.program.ProgramAbortException
>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
> What is the correct way to deploy a Python job on Flink that uses Kafka? It seems it cannot resolve the correct Kafka dependency.
>
> I also wonder if there is a simpler solution, and whether it matters that I would like to deploy the job on a K8s cluster.
>
> Thanks,
> Wojtek
> NOTE - This message and the documents attached thereto contain confidential information, which may also be covered by trade or professional secrecy. If you are not the intended recipient of the message, please contact the sender without delay and delete the message from your system. Disclosure, copying, dissemination or publication of this message and the information contained therein is prohibited.
>
> Alphamoon Sp. z o.o. (Ltd.), ul. Pawła Włodkowica 21/3, 50-072 Wrocław, Poland;
> Registered under KRS number 0000621513 in the National Court Register, kept by the District Court for Wrocław-Fabryczna, VI Economic Department of the National Court Register; VAT-ID: PL8943079568, REGON: 364634116; Share capital: PLN 5,000, fully paid-up.