Hi,

the InvalidClassException indicates that two sides are using different versions of the same class. Are you sure you are using the same Flink minor version (including the Scala suffix) for all dependencies and for the image running on Kubernetes?
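As a quick sanity check (a sketch with hypothetical jar and image names, not a Flink API), the version and Scala suffix encoded in a connector jar name can be compared against the Docker image tag:

```python
import re

# Hypothetical example values; substitute your actual jar name and image tag.
connector_jar = "flink-sql-connector-kafka_2.11-1.11.0.jar"
docker_image = "flink:1.11.0-scala_2.11"

# Connector jar names follow <artifact>_<scala-suffix>-<flink-version>.jar
jar_match = re.search(r"_(\d+\.\d+)-(\d+\.\d+)", connector_jar)
scala_suffix, flink_minor = jar_match.groups()

# Official image tags follow flink:<version>-scala_<scala-suffix>
img_match = re.search(r"flink:(\d+\.\d+)[^-]*-scala_(\d+\.\d+)", docker_image)
img_minor, img_scala = img_match.groups()

assert flink_minor == img_minor, "Flink minor versions differ"
assert scala_suffix == img_scala, "Scala suffixes differ"
print("jar and image agree:", flink_minor, "scala", scala_suffix)
```

Note that the client logs below show a locally built flink-1.11-SNAPSHOT while the cluster image is flink:1.11.0-scala_2.11; a SNAPSHOT build and a release build can easily disagree on serialVersionUIDs.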

Regards,
Timo


On 27.07.20 09:51, Wojciech Korczyński wrote:
Hi,

when I try it locally it runs well. The problem is when I run it using Kubernetes. I don't know how to make Flink and Kubernetes go well together in that case.

Best, Wojtek

On Fri, 24 Jul 2020 at 17:51, Xingbo Huang <hxbks...@gmail.com <mailto:hxbks...@gmail.com>> wrote:

    Hi Wojciech,
    In many cases, you can first make sure that your code runs correctly
    in local mode, and then submit the job to the cluster for testing.
    For how to add jar packages in local mode, you can refer to the
    doc[1]. Besides, you'd better use the Blink planner, which is the
    default planner. For how to use the Blink planner, you can refer to
    the doc[2].

    [1]
    
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
    [2]
    
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
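    As a concrete illustration of the approach in doc[1] (a sketch; the
    jar paths are made up): in local mode, extra jars are passed through
    the `pipeline.jars` option, which takes a semicolon-separated list of
    URLs. Only the string handling is shown here, since the actual call
    needs a TableEnvironment:

```python
# Hypothetical local jar paths for illustration.
jars = [
    "/opt/jars/flink-sql-connector-kafka_2.11-1.11.0.jar",
    "/opt/jars/my-udf-deps.jar",
]

# "pipeline.jars" expects a semicolon-separated list of URLs; local files
# use the file:// scheme. With a real TableEnvironment this value would be
# set via:
#   t_env.get_config().get_configuration().set_string("pipeline.jars", value)
value = ";".join("file://" + path for path in jars)
print(value)
```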

    Best,
    Xingbo

    Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai
    <mailto:wojciech.korczyn...@alphamoon.ai>> wrote on Friday, July 24,
    2020 at 9:40 PM:

        Hi,

        I've done like you recommended:

        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.dataset import ExecutionEnvironment
        from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
        from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
        from pyflink.table.udf import udf

        exec_env = StreamExecutionEnvironment.get_execution_environment()
        t_config = TableConfig()
        t_env = StreamTableEnvironment.create(exec_env, t_config)

        INPUT_TABLE = "my_topic"
        INPUT_TOPIC = "my-topic"
        LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
        OUTPUT_TABLE = "my_topic_output"
        OUTPUT_TOPIC = "my-topic-output"

        ddl_source = f"""
        CREATE TABLE {INPUT_TABLE} (
            message STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{INPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'format' = 'json'
        )
        """

        ddl_sink = f"""
        CREATE TABLE {OUTPUT_TABLE} (
            message STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{OUTPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'format' = 'json'
        )
        """

        t_env.execute_sql(ddl_source)
        t_env.execute_sql(ddl_sink)

        result = t_env.execute_sql(f"""
            INSERT INTO {OUTPUT_TABLE}
            SELECT message
            FROM {INPUT_TABLE}
        """)

        result.get_job_client().get_job_execution_result().result()

        I think it is correctly written.

        However after deploying that job I'm getting an error:

        wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ 
/home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py 
kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
        WARNING: An illegal reflective access operation has occurred
        WARNING: Illegal reflective access by 
org.apache.flink.api.java.ClosureCleaner 
(file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
 to field java.util.Properties.serialVersionUID
        WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
        WARNING: Use --illegal-access=warn to enable warnings of further 
illegal reflective access operations
        WARNING: All illegal access operations will be denied in a future 
release
        Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
        Traceback (most recent call last):
           File "kafka2flink.py", line 62, in <module>
             result.get_job_client().get_job_execution_result().result()
           File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
 line 78, in result
           File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
           File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
           File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
        py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
        : java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
d23fe59415e9c9d79d15a1cf7e5409aa)
                at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
                at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
                at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
                at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
                at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
                at java.base/java.lang.Thread.run(Thread.java:834)
        Caused by: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
                at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)
                at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
                at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
                at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
                at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
                at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
                at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
                at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
                at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
                at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
                at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
                at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
                at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
                at 
java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
                at 
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
                at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
                at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                ... 1 more
        Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
execution failed.
                at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
                at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114)
                ... 18 more
        Caused by: org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy
                at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
                at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
                at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
                at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
                at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
                at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
                at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
                at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
                at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
                at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
                at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
                at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
                at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
                at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
                at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
                at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
                at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
                at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
                at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
                at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
                at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
                at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
                at akka.actor.ActorCell.invoke(ActorCell.scala:561)
                at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
                at akka.dispatch.Mailbox.run(Mailbox.scala:225)
                at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
                at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        Caused by: 
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
                at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
                at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
                at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
                at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
                at java.lang.Thread.run(Thread.java:748)
        *Caused by: java.io.InvalidClassException:
        org.apache.flink.table.types.logical.RowType$RowField; local
        class incompatible: stream classdesc serialVersionUID =
        3988094341871744603, local class serialVersionUID =
        -7902169369767750595*
                at 
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
                at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)
                at 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2121)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
                at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
                at java.util.ArrayList.readObject(ArrayList.java:799)
                at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
                at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
                at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:613)
                at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1290)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
                at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
                at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
                at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
                at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
                at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
                at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
                at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
                at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
                at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
                at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
                ... 8 more

        org.apache.flink.client.program.ProgramAbortException
                at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
                at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
                at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
                at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
                at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
                at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
                at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
                at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
                at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
                at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

        I presume that something is incorrect between the Flink
        versions, because I want to deploy that job on Kubernetes. To
        create a cluster on K8s I used the image *flink:1.11.0-scala_2.11*.
        The local version of Flink is *release-1.11*.

        What can cause that problem?

        Thanks,
        Wojtek

        On Fri, 24 Jul 2020 at 11:32, Xingbo Huang <hxbks...@gmail.com
        <mailto:hxbks...@gmail.com>> wrote:

            Hi Wojtek,
            The following ways of using PyFlink are my personal
            recommendations:

            1. Use DDL[1] to create your source and sink instead of the
            descriptor way, because as of Flink 1.11 there are some
            bugs in the descriptor way.

            2. Use `execute_sql` for a single statement and
            `create_statement_set` for multiple DML statements.[2]

            3. Use `execute_insert` for a single sink and
            `TableEnvironment#create_statement_set` for multiple sinks.

            4. Use the `from_path` method instead of the `scan` method.

            5. After calling `execute_sql` or `execute_insert`, call
            `get_job_client().get_job_execution_result().result()` on the
            returned TableResult to wait for the job to finish.


            All PyFlink related common questions are covered in the doc[3].

            [1]
            
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
            [2]
            
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
            
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
            
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
            [3]
            
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html
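            Points 2 and 3 above can be sketched as follows (the table
            names are hypothetical; the statement-set calls are shown as
            comments because they need a running TableEnvironment):

```python
# Hypothetical table names for illustration.
SOURCE = "my_source"
SINK_A = "sink_a"
SINK_B = "sink_b"

# One INSERT per sink. With multiple sinks these should go into a single
# statement set rather than separate execute_sql calls, so Flink compiles
# them into one job:
#
#   stmt_set = t_env.create_statement_set()
#   stmt_set.add_insert_sql(insert_a)
#   stmt_set.add_insert_sql(insert_b)
#   result = stmt_set.execute()
#   result.get_job_client().get_job_execution_result().result()
insert_a = f"INSERT INTO {SINK_A} SELECT message FROM {SOURCE}"
insert_b = f"INSERT INTO {SINK_B} SELECT UPPER(message) FROM {SOURCE}"
print(insert_a)
print(insert_b)
```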

            Best,
            Xingbo

            Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai
            <mailto:wojciech.korczyn...@alphamoon.ai>> wrote on Friday,
            July 24, 2020 at 4:44 PM:

                Hi,
                thank you for your answer, it is very helpful.

                Currently my python program looks like:

                from pyflink.datastream import StreamExecutionEnvironment
                from pyflink.dataset import ExecutionEnvironment
                from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
                from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv

                exec_env = StreamExecutionEnvironment.get_execution_environment()
                t_config = TableConfig()
                t_env = StreamTableEnvironment.create(exec_env, t_config)

                t_env.connect(Kafka()
                              .version("universal")
                              .topic("my-topic")
                              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                              ) \
                     .in_append_mode() \
                     .with_format(Csv()
                                  .line_delimiter("\r\n")
                                  .derive_schema()) \
                     .with_schema(Schema()
                                  .field("value", DataTypes.STRING())) \
                     .create_temporary_table('mySource')

                t_env.connect(Kafka()
                              .version("universal")
                              .topic("my-topic-out")
                              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                              ) \
                     .with_format(Csv()
                                  .line_delimiter("\r\n")
                                  .derive_schema()) \
                     .with_schema(Schema()
                                  .field("value", DataTypes.STRING())) \
                     .in_append_mode() \
                     .create_temporary_table('mySink')


                t_env.scan('mySource') \
                     .select('"flink_job_" + value') \
                     .insert_into('mySink')

                t_env.execute("tutorial_job")

                I have installed PyFlink 1.11, so the IDE is pointing out
                that the methods connect, scan, insert_into and execute
                are deprecated. How should the program be written
                differently for the 1.11 version of PyFlink?

                Kind regards,
                Wojtek


                On Fri, 24 Jul 2020 at 04:21, Xingbo Huang
                <hxbks...@gmail.com <mailto:hxbks...@gmail.com>> wrote:

                    Hi Wojtek,
                    In Flink 1.11, the methods register_table_source and
                    register_table_sink of ConnectTableDescriptor have
                    been removed. You need to use create_temporary_table
                    instead of these two methods. Besides, it seems that
                    the version of your PyFlink is 1.10, but the
                    corresponding Flink is 1.11.

                    Best,
                    Xingbo

                    Wojciech Korczyński
                    <wojciech.korczyn...@alphamoon.ai
                    <mailto:wojciech.korczyn...@alphamoon.ai>> wrote on
                    Thursday, July 23, 2020 at 9:01 PM:

                        Thank you for your answer.

                        I have replaced that .jar with the universal
                        Kafka connector version - the links to the other
                        versions are dead.

                        After the attempt of deploying:
                        bin/flink run -py
                        
/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py
                        --jarfile
                        
/home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar

                        there another error occurs:
                        Traceback (most recent call last):
                           File
                        
"/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py",
                        line 20, in <module>
                             .field("tbd", DataTypes.INT())) \
                        AttributeError: 'StreamTableDescriptor' object
                        has no attribute 'register_table_source'
                        org.apache.flink.client.program.ProgramAbortException
                        at
                        
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
                        at
                        
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
                        Method)
                        at
                        
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                        at
                        
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                        at
                        
java.base/java.lang.reflect.Method.invoke(Method.java:566)
                        at
                        
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
                        at
                        
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
                        at
                        
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
                        at
                        
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
                        at
                        
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
                        at
                        
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
                        at
                        
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
                        at
                        
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
                        at
                        
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

                        Maybe the way the Python program is written is
                        incorrect. Could it be using deprecated APIs,
                        given that the installed Flink version is 1.11?

                        Best regards,
                        Wojtek

                        On Thu, 23 Jul 2020 at 12:01, Xingbo Huang
                        <hxbks...@gmail.com <mailto:hxbks...@gmail.com>>
                        wrote:

                            Hi Wojtek,
                            you need to use the fat jar
                            'flink-sql-connector-kafka_2.11-1.11.0.jar',
                            which you can download from the doc[1].

                            [1]
                            
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

                            Best,
                            Xingbo

                            Wojciech Korczyński
                            <wojciech.korczyn...@alphamoon.ai
                            <mailto:wojciech.korczyn...@alphamoon.ai>>
                            wrote on Thursday, July 23, 2020 at 4:57 PM:

                                Hello,

                                I am trying to deploy a Python job with
                                Kafka connector:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv

exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

t_env.connect(Kafka()
              .version("0.11")
              .topic("my-topic")
              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
              ) \
     .in_append_mode() \
     .with_format(Csv()
                  .line_delimiter("\r\n")
                  .derive_schema()) \
     .with_schema(Schema()
                  .field("tbd", DataTypes.INT())) \
     .register_table_source('mySource')

t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
     .with_format(OldCsv()
                  .field('tbd', DataTypes.INT())) \
     .with_schema(Schema()
                  .field("tbd", DataTypes.INT())) \
     .register_table_sink('mySink')

t_env.scan('mySource') \
     .select('tbd') \
     .where("tbd = 1") \
     .insert_into('mySink')

t_env.execute("tutorial_job")
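(As an aside: the descriptor-based `connect()` API above is the legacy style in Flink 1.11. A sketch of the DDL-based alternative, assuming the 1.11 SQL DDL syntax and a matching Kafka connector jar on the classpath; table name, topic, and bootstrap servers mirror the program above:)

```python
# DDL-based source definition (sketch; connector options are assumptions
# for the universal Kafka connector, mirroring the descriptor code above).
source_ddl = """
CREATE TABLE mySource (
    tbd INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'format' = 'csv'
)
"""

# With a TableEnvironment `t_env`, this would be registered via:
# t_env.execute_sql(source_ddl)
```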

When I run the deploy command:

bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
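(An alternative to passing `--jarfile` on the command line is to declare the connector jar inside the job via the `pipeline.jars` configuration, which expects `file://` URIs rather than bare paths. A minimal sketch, with a hypothetical jar location standing in for the real connector jar:)

```python
from pathlib import Path

def jar_uri(path: str) -> str:
    # pipeline.jars entries must be URIs (file:///abs/path/connector.jar),
    # not plain filesystem paths.
    return Path(path).resolve().as_uri()

# Hypothetical location; substitute the actual Kafka connector jar.
jars = ";".join(jar_uri(p) for p in ["/opt/flink/usrlib/flink-connector-kafka.jar"])

# With a TableEnvironment `t_env`, the jar can then be attached to the job:
# t_env.get_config().get_configuration().set_string("pipeline.jars", jars)
```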


I get an error:

Traceback (most recent call last):
  File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 9, in <module>
    t_env = StreamTableEnvironment.create(exec_env, t_config)
  File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1478, in create
  File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
	at java.base/java.lang.ClassLoader.defineClass1(Native Method)
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
	at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
	at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
	at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
	at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
	at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
	at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
	at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
	at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	... 39 more

org.apache.flink.client.program.ProgramAbortException
	at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

What is the correct way to deploy a Python job on Flink that uses Kafka? It seems like it cannot resolve the Kafka connector dependency.

I also wonder whether there is a simpler solution, and whether it matters that I would like to deploy the job on a K8s cluster.

Thanks,
Wojtek


NOTE - Message and the documents attached thereto contain confidential information, which may also be a trade secret or confidential. If you are not the intended recipient of the message, please contact the sender without delay and delete the message from your system. Disclosure, copying, dissemination or publication of this message and information contained therein is prohibited.

Alphamoon Sp. z o.o. (Ltd.), ul. Pawła Włodkowica 21/3, 50-072 Wrocław, Poland; Registered under the KRS number 0000621513 to the National Court Register, kept by the District Court for Wrocław-Fabryczna VI Economic Department of the National Court Register, VAT-ID: PL8943079568, REGON 364634116; Share capital: PLN 5.000 fully paid-up.


