Hi Wojtek,
Here are my personal recommendations for using PyFlink:

1. Use DDL[1] to create your sources and sinks instead of the descriptor
API, because as of Flink 1.11 the descriptor API still has some bugs.

2. Use `execute_sql` for a single statement and `create_statement_set` for
multiple DML statements.[2]

3. Use `execute_insert` for a single sink and
`TableEnvironment#create_statement_set` for multiple sinks.

4. Use the `from_path` method instead of the deprecated `scan` method.

5. After calling `execute_sql` or `execute_insert`, call
`get_job_client().get_job_execution_result().result()` on the TableResult
they return, so the client waits for the job to finish.
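To make the five tips concrete, here is a minimal sketch of your program rewritten against the 1.11 APIs. The topic names, bootstrap servers and CSV format are copied from your snippet; treat the exact DDL option names as an assumption to double-check against the Kafka connector page[1]:

```python
# Sketch only: DDL-based source/sink (tip 1), from_path (tip 4),
# execute_insert (tip 3), and blocking on the result (tip 5).

SOURCE_DDL = """
CREATE TABLE mySource (
    `value` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'format' = 'csv'
)
"""

SINK_DDL = """
CREATE TABLE mySink (
    `value` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic-out',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'format' = 'csv'
)
"""


def run_job():
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Tip 1: DDL instead of the descriptor API
    t_env.execute_sql(SOURCE_DDL)
    t_env.execute_sql(SINK_DDL)

    # Tip 4: from_path instead of the deprecated scan.
    # Tip 3: execute_insert instead of insert_into + execute.
    # Note: string literals use single quotes in expression strings.
    result = t_env.from_path('mySource') \
        .select("'flink_job_' + value") \
        .execute_insert('mySink')

    # Tip 2: for several sinks, instead build a statement set:
    #   stmt_set = t_env.create_statement_set()
    #   stmt_set.add_insert('mySink', table)
    #   stmt_set.execute()

    # Tip 5: block until the job finishes, so the client does not
    # exit early when submitted with `bin/flink run -py`.
    result.get_job_client().get_job_execution_result().result()


# Call run_job() at module level in the file you submit with
# `bin/flink run -py kafka2flink.py`.
```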


For common PyFlink questions, you can refer to the doc[3].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html

Best,
Xingbo

Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Fri, Jul 24,
2020 at 4:44 PM:

> Hi,
> thank you for your answer, it is very helpful.
>
> Currently my python program looks like:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_config = TableConfig()
> t_env = StreamTableEnvironment.create(exec_env, t_config)
>
> t_env.connect(Kafka()
>               .version("universal")
>               .topic("my-topic")
>               .property("bootstrap.servers", 
> 'my-cluster-kafka-bootstrap:9092')
>               ) \
>     .in_append_mode() \
>     .with_format(Csv()
>                  .line_delimiter("\r\n") \
>                  .derive_schema()) \
>     .with_schema(Schema()
>                  .field("value", DataTypes.STRING())) \
>     .create_temporary_table('mySource')
>
> t_env.connect(Kafka()
>               .version("universal")
>               .topic("my-topic-out")
>               .property("bootstrap.servers", 
> 'my-cluster-kafka-bootstrap:9092')
>               ) \
>     .with_format(Csv()
>              .line_delimiter("\r\n") \
>              .derive_schema()) \
>     .with_schema(Schema()
>                  .field("value", DataTypes.STRING())) \
>     .in_append_mode() \
>     .create_temporary_table('mySink')
>
>
> t_env.scan('mySource') \
>     .select('"flink_job_" + value') \
>     .insert_into('mySink')
>
> t_env.execute("tutorial_job")
>
> I have installed PyFlink 1.11, and the IDE points out that the methods
> connect, scan, insert_into and execute are deprecated. What is the correct
> way to write this program for version 1.11 of PyFlink?
>
> Kind regards,
> Wojtek
>
>
> On Fri, Jul 24, 2020 at 4:21 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Wojtek,
>> In Flink 1.11, the methods register_table_source and register_table_sink
>> of ConnectTableDescriptor have been removed. You need to use
>> create_temporary_table instead of these two methods. Besides, it seems
>> that your PyFlink version is 1.10, while the corresponding Flink is 1.11.
>>
>> Best,
>> Xingbo
>>
>> Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Thu, Jul
>> 23, 2020 at 9:01 PM:
>>
>>> Thank you for your answer.
>>>
>>> I have replaced that .jar with the Kafka version "universal" - the links
>>> to the other versions are dead.
>>>
>>> After the attempt of deploying:
>>> bin/flink run -py
>>> /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile
>>> /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar
>>>
>>> there another error occurs:
>>> Traceback (most recent call last):
>>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py",
>>> line 20, in <module>
>>>     .field("tbd", DataTypes.INT())) \
>>> AttributeError: 'StreamTableDescriptor' object has no attribute
>>> 'register_table_source'
>>> org.apache.flink.client.program.ProgramAbortException
>>> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>> at
>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>>
>>> Maybe the way the Python program is written is incorrect. Could it be
>>> using deprecated APIs, given that the installed Flink version is 1.11?
>>>
>>> Best regards,
>>> Wojtek
>>>
>>> On Thu, Jul 23, 2020 at 12:01 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Wojtek,
>>>> you need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar'
>>>> which you can download in the doc[1]
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Thu,
>>>> Jul 23, 2020 at 4:57 PM:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am trying to deploy a Python job with Kafka connector:
>>>>>
>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>> from pyflink.dataset import ExecutionEnvironment
>>>>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
>>>>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>>>>>
>>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>>> t_config = TableConfig()
>>>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>>>
>>>>> t_env.connect(Kafka()
>>>>>               .version("0.11")
>>>>>               .topic("my-topic")
>>>>>               .property("bootstrap.servers",
>>>>> 'my-cluster-kafka-bootstrap:9092')
>>>>>               ) \
>>>>>     .in_append_mode() \
>>>>>     .with_format(Csv()
>>>>>                  .line_delimiter("\r\n")      \
>>>>>                  .derive_schema()) \
>>>>>     .with_schema(Schema()
>>>>>                  .field("tbd", DataTypes.INT())) \
>>>>>     .register_table_source('mySource')
>>>>>
>>>>> t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
>>>>>     .with_format(OldCsv()
>>>>>                  .field('tbd', DataTypes.INT())) \
>>>>>     .with_schema(Schema()
>>>>>                  .field("tbd", DataTypes.INT())) \
>>>>>     .register_table_sink('mySink')
>>>>>
>>>>> t_env.scan('mySource') \
>>>>>     .select('tbd') \
>>>>>     .where("tbd = 1") \
>>>>>     .insert_into('mySink')
>>>>>
>>>>> t_env.execute("tutorial_job")
>>>>>
>>>>> When I run a deploying command:
>>>>> bin/flink run -py
>>>>> /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile
>>>>> /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
>>>>>
>>>>> I get an error:
>>>>> Traceback (most recent call last):
>>>>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py",
>>>>> line 9, in <module>
>>>>>     t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>>>   File
>>>>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>>>>> line 1478, in create
>>>>>   File
>>>>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>>>>> line 1286, in __call__
>>>>>   File
>>>>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>>>>> line 147, in deco
>>>>>   File
>>>>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>>>>> line 328, in get_return_value
>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>>> z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
>>>>> : java.lang.NoClassDefFoundError:
>>>>> org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
>>>>> at java.base/java.lang.ClassLoader.defineClass1(Native Method)
>>>>> at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
>>>>> at
>>>>> java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
>>>>> at
>>>>> java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
>>>>> at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
>>>>> at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
>>>>> at java.base/java.security.AccessController.doPrivileged(Native Method)
>>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>> at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>> at
>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>>>>> at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>> at java.base/java.lang.Class.forName0(Native Method)
>>>>> at java.base/java.lang.Class.forName(Class.java:398)
>>>>> at
>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
>>>>> at
>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
>>>>> at
>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
>>>>> at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
>>>>> at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
>>>>> at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
>>>>> at
>>>>> org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
>>>>> at
>>>>> org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
>>>>> at
>>>>> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
>>>>> at
>>>>> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
>>>>> at
>>>>> org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
>>>>> at
>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>> Method)
>>>>> at
>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at
>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at
>>>>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>> at
>>>>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>> at
>>>>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>> at
>>>>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>> at
>>>>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>> at
>>>>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
>>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>> at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>> at
>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>>>>> at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>> ... 39 more
>>>>>
>>>>> org.apache.flink.client.program.ProgramAbortException
>>>>> at
>>>>> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>>>>> at
>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>> Method)
>>>>> at
>>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at
>>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>> at
>>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>>>> at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>>>> at
>>>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>>>>
>>>>> What is the correct way to deploy a Python job that uses Kafka on
>>>>> Flink? It seems like it cannot find the correct Kafka dependency.
>>>>>
>>>>> I also wonder if there is a simpler solution, and whether it matters
>>>>> that I would like to deploy the job on a K8s cluster.
>>>>>
>>>>> Thanks,
>>>>> Wojtek
>>>>>
>>>>
>>>
>>> NOTE - Message and the documents attached thereto contain confidential
>>> information, which may also be a trade secret or confidential. If you are
>>> not the intended recipient of the message, please contact the sender
>>> without delay and delete the message from your system. Disclosure, copying,
>>> dissemination or publication of this message and information contained
>>> therein is prohibited.
>>>
>>> Alphamoon Sp. z o.o. (Ltd.), ul. Pawła Włodkowica 21/3, 50-072 Wrocław,
>>> Poland;
>>> Registered under the KRS number 0000621513 to the National Court
>>> Register, kept by the District Court for Wrocław-Fabryczna VI Economic
>>> Department of the National Court Register, VAT-ID: PL8943079568, REGON
>>> 364634116; Share capital: PLN 5.000 fully paid-up.
>>>
>>
