Hi Wojtek,
You need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar',
which you can download from the Kafka connector documentation [1].
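
For illustration, here is a minimal sketch of how the deploy command from
your message could be assembled with the fat jar swapped in. The helper name
build_flink_run_command and the jar location /path/to/ are assumptions made
for this example; only the -py and --jarfile options and the script path come
from your message.

```python
import shlex


def build_flink_run_command(script_path, jar_path, flink_bin="bin/flink"):
    """Return the argv list for `flink run` with a Python script and one jar.

    Hypothetical helper: it only assembles the arguments and does not
    start Flink.
    """
    return [flink_bin, "run", "-py", script_path, "--jarfile", jar_path]


if __name__ == "__main__":
    cmd = build_flink_run_command(
        "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py",
        # Assumed location of the downloaded fat jar; adjust to your setup.
        "/path/to/flink-sql-connector-kafka_2.11-1.11.0.jar",
    )
    # Print a shell-safe, copy-pasteable form of the command.
    print(" ".join(shlex.quote(part) for part in cmd))
```

The list form can be passed to subprocess.run as-is if you prefer to launch
the job from a script rather than a shell.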

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
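
If you want to check whether a particular jar bundles the class named in the
NoClassDefFoundError, a jar is an ordinary zip archive, so its entries can be
listed from Python. This is only a rough sketch; the helper name
jar_contains_class and the example jar path are assumptions for illustration.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path has a .class entry for class_name.

    class_name is a dotted Java name; it is converted to the path form
    used inside the archive (dots become slashes, plus ".class").
    """
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


if __name__ == "__main__":
    # Class name taken from the stack trace; the jar path is a placeholder.
    missing = (
        "org.apache.flink.streaming.connectors.kafka."
        "KafkaTableSourceSinkFactoryBase"
    )
    print(jar_contains_class("/path/to/some-connector.jar", missing))
```

Running it against both the jar you passed to --jarfile and the fat jar
should show whether the missing class is present in each.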

Best,
Xingbo

Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Thursday,
July 23, 2020 at 4:57 PM:

> Hello,
>
> I am trying to deploy a Python job that uses the Kafka connector:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> t_env.connect(Kafka()
>               .version("0.11")
>               .topic("my-topic")
>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
>               ) \
>     .in_append_mode() \
>     .with_format(Csv()
>                  .line_delimiter("\r\n")      \
>                  .derive_schema()) \
>     .with_schema(Schema()
>                  .field("tbd", DataTypes.INT())) \
>     .register_table_source('mySource')
>
> t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
>     .with_format(OldCsv()
>                  .field('tbd', DataTypes.INT())) \
>     .with_schema(Schema()
>                  .field("tbd", DataTypes.INT())) \
>     .register_table_sink('mySink')
>
> t_env.scan('mySource') \
>     .select('tbd') \
>     .where("tbd = 1") \
>     .insert_into('mySink')
>
> t_env.execute("tutorial_job")
>
> When I run the deploy command:
> bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py \
>     --jarfile /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
>
> I get an error:
> Traceback (most recent call last):
>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 9, in <module>
>     t_env = StreamTableEnvironment.create(exec_env, t_config)
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1478, in create
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
> : java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
> at java.base/java.lang.ClassLoader.defineClass1(Native Method)
> at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
> at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
> at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
> at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
> at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
> at java.base/java.security.AccessController.doPrivileged(Native Method)
> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> at java.base/java.lang.Class.forName0(Native Method)
> at java.base/java.lang.Class.forName(Class.java:398)
> at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
> at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
> at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
> at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
> at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
> at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
> at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
> at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
> at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
> at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
> at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
> at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
> at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> ... 39 more
>
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
> What is the correct way to deploy a Python job that uses Kafka on Flink? It
> seems that the Kafka dependency cannot be resolved correctly.
>
> I also wonder whether there is a simpler solution, and whether it matters
> that I would like to deploy the job on a K8s cluster.
>
> Thanks,
> Wojtek
>
>
> NOTE - Message and the documents attached thereto contain confidential
> information, which may also be a trade secret or confidential. If you are
> not the intended recipient of the message, please contact the sender
> without delay and delete the message from your system. Disclosure, copying,
> dissemination or publication of this message and information contained
> therein is prohibited.
>
> Alphamoon Sp. z o.o. (Ltd.), ul. Pawła Włodkowica 21/3, 50-072 Wrocław,
> Poland;
> Registered under the KRS number 0000621513 to the National Court Register,
> kept by the District Court for Wrocław-Fabryczna VI Economic Department of
> the National Court Register, VAT-ID: PL8943079568, REGON 364634116; Share
> capital: PLN 5.000 fully paid-up.
>
