Hi Wojtek,

Here are my personal recommendations for using PyFlink:

1. Use DDL[1] to create your sources and sinks instead of the descriptor API, because as of Flink 1.11 the descriptor API still has some bugs.
2. Use `execute_sql` for a single statement and `create_statement_set` for multiple DML statements.[2]
3. Use `execute_insert` for a single sink and `TableEnvironment#create_statement_set` for multiple sinks.
4. Use the `from_path` method instead of the `scan` method.
5. After calling an `execute_*` method (`execute_insert` or `execute_sql`), call `get_job_client().get_job_execution_result().result()` on the returned TableResult to wait for the job to finish.
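To make this concrete, here is a minimal sketch of your pipeline rewritten in the 1.11 style. The topics, bootstrap servers and CSV format are taken from your program; the CONCAT projection and the 'scan.startup.mode' option are only illustrative assumptions:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)  # in 1.11 the blink planner is used by default

# 1. create source and sink with DDL instead of descriptors
t_env.execute_sql("""
    CREATE TABLE mySource (
        `value` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my-topic',
        'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'csv'
    )
""")

t_env.execute_sql("""
    CREATE TABLE mySink (
        `value` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my-topic-out',
        'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
        'format' = 'csv'
    )
""")

# 2./3. a single DML statement writing to a single sink
table_result = t_env.execute_sql(
    "INSERT INTO mySink SELECT CONCAT('flink_job_', `value`) FROM mySource")

# 5. wait for the job to finish
table_result.get_job_client().get_job_execution_result().result()

With multiple sinks you would build a statement set instead (mySink2 here is a hypothetical second table):

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql("INSERT INTO mySink SELECT `value` FROM mySource")
stmt_set.add_insert_sql("INSERT INTO mySink2 SELECT `value` FROM mySource")
stmt_set.execute()

And `t_env.from_path('mySource')` replaces the old `scan` call if you prefer to stay in the Table API.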
For other common PyFlink questions, you can refer to the doc[3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html

Best,
Xingbo

On Fri, Jul 24, 2020 at 4:44 PM Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:

> Hi,
> thank you for your answer, it is very helpful.
>
> Currently my python program looks like:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> t_env.connect(Kafka()
>               .version("universal")
>               .topic("my-topic")
>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')) \
>     .in_append_mode() \
>     .with_format(Csv()
>                  .line_delimiter("\r\n")
>                  .derive_schema()) \
>     .with_schema(Schema()
>                  .field("value", DataTypes.STRING())) \
>     .create_temporary_table('mySource')
>
> t_env.connect(Kafka()
>               .version("universal")
>               .topic("my-topic-out")
>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')) \
>     .with_format(Csv()
>                  .line_delimiter("\r\n")
>                  .derive_schema()) \
>     .with_schema(Schema()
>                  .field("value", DataTypes.STRING())) \
>     .in_append_mode() \
>     .create_temporary_table('mySink')
>
> t_env.scan('mySource') \
>     .select('"flink_job_" + value') \
>     .insert_into('mySink')
>
> t_env.execute("tutorial_job")
>
> I have installed PyFlink 1.11, and the IDE points out to me that the methods connect, scan, insert_into and execute are deprecated. How should the program look under version 1.11 of PyFlink?
>
> Kind regards,
> Wojtek
>
> On Fri, Jul 24, 2020 at 4:21 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Wojtek,
>> In Flink 1.11, the methods register_table_source and register_table_sink of ConnectTableDescriptor have been removed. You need to use create_temporary_table instead of these two methods. Besides, it seems that the version of your pyflink is 1.10 while the corresponding flink is 1.11.
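>>
>> For example, a minimal sketch of the change, based on the source table in your program (with the universal connector version; only the final call differs):
>>
>> t_env.connect(Kafka()
>>               .version("universal")
>>               .topic("my-topic")
>>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')) \
>>     .in_append_mode() \
>>     .with_format(Csv()
>>                  .line_delimiter("\r\n")
>>                  .derive_schema()) \
>>     .with_schema(Schema()
>>                  .field("tbd", DataTypes.INT())) \
>>     .create_temporary_table('mySource')  # instead of .register_table_source('mySource')
>>
>> The table can then be read back with t_env.from_path('mySource').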
>>
>> Best,
>> Xingbo
>>
>> On Thu, Jul 23, 2020 at 9:01 PM Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:
>>
>>> Thank you for your answer.
>>>
>>> I have replaced that .jar with the Kafka version "universal" - the links to the other versions are dead.
>>>
>>> After attempting to deploy:
>>> bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar
>>>
>>> another error occurs:
>>> Traceback (most recent call last):
>>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 20, in <module>
>>>     .field("tbd", DataTypes.INT())) \
>>> AttributeError: 'StreamTableDescriptor' object has no attribute 'register_table_source'
>>> org.apache.flink.client.program.ProgramAbortException
>>>   at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>>   at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>>   at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>>   at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>>   at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>>
>>> Maybe the way the python program is written is incorrect. Could it be deprecated, given that the installed Flink version is 1.11?
>>>
>>> Best regards,
>>> Wojtek
>>>
>>> On Thu, Jul 23, 2020 at 12:01 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Wojtek,
>>>> you need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar', which you can download from the doc[1].
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Thu, Jul 23, 2020 at 4:57 PM Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am trying to deploy a Python job with a Kafka connector:
>>>>>
>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>> from pyflink.dataset import ExecutionEnvironment
>>>>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
>>>>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>>>>>
>>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>>> t_config = TableConfig()
>>>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>>>
>>>>> t_env.connect(Kafka()
>>>>>               .version("0.11")
>>>>>               .topic("my-topic")
>>>>>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')) \
>>>>>     .in_append_mode() \
>>>>>     .with_format(Csv()
>>>>>                  .line_delimiter("\r\n")
>>>>>                  .derive_schema()) \
>>>>>     .with_schema(Schema()
>>>>>                  .field("tbd", DataTypes.INT())) \
>>>>>     .register_table_source('mySource')
>>>>>
>>>>> t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
>>>>>     .with_format(OldCsv()
>>>>>                  .field('tbd', DataTypes.INT())) \
>>>>>     .with_schema(Schema()
>>>>>                  .field("tbd", DataTypes.INT())) \
>>>>>     .register_table_sink('mySink')
>>>>>
>>>>> t_env.scan('mySource') \
>>>>>     .select('tbd') \
>>>>>     .where("tbd = 1") \
>>>>>     .insert_into('mySink')
>>>>>
>>>>> t_env.execute("tutorial_job")
>>>>>
>>>>> When I run the deploy command:
>>>>> bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
>>>>>
>>>>> I get an error:
>>>>> Traceback (most recent call last):
>>>>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 9, in <module>
>>>>>     t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1478, in create
>>>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>>>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>>>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
>>>>> : java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
>>>>>   at java.base/java.lang.ClassLoader.defineClass1(Native Method)
>>>>>   at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
>>>>>   at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
>>>>>   at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
>>>>>   at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
>>>>>   at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
>>>>>   at java.base/java.security.AccessController.doPrivileged(Native Method)
>>>>>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
>>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>   at java.base/java.lang.Class.forName0(Native Method)
>>>>>   at java.base/java.lang.Class.forName(Class.java:398)
>>>>>   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
>>>>>   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
>>>>>   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
>>>>>   at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
>>>>>   at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
>>>>>   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
>>>>>   at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
>>>>>   at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
>>>>>   at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
>>>>>   at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
>>>>>   at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
>>>>>   at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
>>>>>   at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
>>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>>   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>>   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>>   at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>>   at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>>   at java.base/java.lang.Thread.run(Thread.java:834)
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
>>>>>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>   ... 39 more
>>>>>
>>>>> org.apache.flink.client.program.ProgramAbortException
>>>>>   at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>>   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>>>>   at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>>>>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>>>>   at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>>>>   at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>>>>   at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>>>>
>>>>> What is the correct way to deploy a Python job on Flink that uses Kafka? It seems it cannot resolve the Kafka dependency correctly.
>>>>>
>>>>> I wonder if there is a simpler solution, and whether it matters that I would like to deploy the job on a K8s cluster.
>>>>>
>>>>> Thanks,
>>>>> Wojtek