Hi,
when I try it locally, it runs well. The problem appears when I run it
using Kubernetes. I don't know how to make Flink and Kubernetes work well
together in that case.
Best, Wojtek
On Fri, 24 Jul 2020 at 17:51, Xingbo Huang <hxbks...@gmail.com> wrote:
Hi Wojciech,
In many cases, you can first make sure that your code runs correctly in
local mode, and then submit the job to the cluster for testing. For
how to add jar packages in local mode, you can refer to the doc[1].
Besides, you'd better use the blink planner, which is the default
planner; for how to use it, you can refer to the doc[2].
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
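As a side note on [1]: in local mode the connector jars are usually passed via the `pipeline.jars` option, which expects `file://` URLs separated by `;`. A minimal sketch of building such a value (the jar paths here are hypothetical):

```python
from pathlib import Path

def pipeline_jars_value(jar_paths):
    """Render local jar paths as the ';'-separated list of file:// URLs
    expected by Flink's 'pipeline.jars' configuration option."""
    return ";".join(Path(p).absolute().as_uri() for p in jar_paths)

# The rendered value would then be applied to the table environment, e.g.:
# t_env.get_config().get_configuration().set_string(
#     "pipeline.jars",
#     pipeline_jars_value(["jar/flink-sql-connector-kafka_2.11-1.11.0.jar"]))
print(pipeline_jars_value(["/tmp/flink-sql-connector-kafka_2.11-1.11.0.jar"]))
```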
Best,
Xingbo
Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Fri, 24 Jul 2020 at 21:40:
Hi,
I've done as you recommended:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
from pyflink.table.udf import udf
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)
INPUT_TABLE = "my_topic"
INPUT_TOPIC = "my-topic"
LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
OUTPUT_TABLE = "my_topic_output"
OUTPUT_TOPIC = "my-topic-output"
ddl_source = f"""
CREATE TABLE {INPUT_TABLE}(
message STRING
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""
ddl_sink = f"""
CREATE TABLE {OUTPUT_TABLE}(
message STRING
) WITH (
'connector' = 'kafka',
'topic' = '{OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""
t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)
result = t_env.execute_sql(f"""
INSERT INTO {OUTPUT_TABLE}
SELECT message
FROM {INPUT_TABLE}
""")
result.get_job_client().get_job_execution_result().result()
I think it is correctly written.
However, after deploying the job I get an error:
wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.apache.flink.api.java.ClosureCleaner
(file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future
release
Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
Traceback (most recent call last):
File "kafka2flink.py", line 62, in <module>
result.get_job_client().get_job_execution_result().result()
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
line 78, in result
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID:
d23fe59415e9c9d79d15a1cf7e5409aa)
at
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.client.program.ProgramInvocationException:
Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
at
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114)
... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is
suppressed by NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
instantiate user function.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException:
org.apache.flink.table.types.logical.RowType$RowField; local
class incompatible: stream classdesc serialVersionUID =
3988094341871744603, local class serialVersionUID = -7902169369767750595
at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2121)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
at java.util.ArrayList.readObject(ArrayList.java:799)
at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:613)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1290)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
... 8 more
org.apache.flink.client.program.ProgramAbortException
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
I presume something is incompatible between the Flink versions, because I
want to deploy that job on Kubernetes. To create a cluster on K8s I used
the image flink:1.11.0-scala_2.11. The local version of Flink is
release-1.11.
What can cause that problem?
Thanks,
Wojtek
On Fri, 24 Jul 2020 at 11:32, Xingbo Huang <hxbks...@gmail.com> wrote:
Hi Wojtek,
The following ways of using PyFlink are my personal recommendations:
1. Use DDL[1] to create your sources and sinks instead of the
descriptor way, because as of Flink 1.11 there are some
bugs in the descriptor way.
2. Use `execute_sql` for a single statement and
`create_statement_set` for multiple DML statements.[2]
3. Use `execute_insert` for a single sink and
`TableEnvironment#create_statement_set` for multiple sinks.
4. Use the `from_path` method instead of the `scan` method.
5. After calling `execute_insert` or `execute_sql`, call
`get_job_client().get_job_execution_result().result()` on the
returned TableResult.
For common PyFlink questions, you can refer to the doc[3].
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html
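A minimal sketch of points 1 and 2 above (table, topic, and server names are placeholders): render the Kafka DDL as a string, pass it to `t_env.execute_sql(...)`, and group several INSERTs via a statement set. The PyFlink calls are shown in comments; the testable part is plain string building.

```python
def kafka_table_ddl(table, topic, servers, fmt="json"):
    """Render a CREATE TABLE statement for the Kafka SQL connector
    using the Flink 1.11 option keys."""
    return f"""
CREATE TABLE {table} (
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '{topic}',
    'properties.bootstrap.servers' = '{servers}',
    'format' = '{fmt}'
)"""

# The rendered DDL would be executed as:
# t_env.execute_sql(kafka_table_ddl("my_topic", "my-topic",
#                                   "my-cluster-kafka-bootstrap:9092"))
# For multiple sinks, collect the INSERTs instead of executing one by one:
# stmt_set = t_env.create_statement_set()
# stmt_set.add_insert_sql("INSERT INTO sink1 SELECT message FROM my_topic")
# stmt_set.add_insert_sql("INSERT INTO sink2 SELECT message FROM my_topic")
# stmt_set.execute()
print(kafka_table_ddl("my_topic", "my-topic", "my-cluster-kafka-bootstrap:9092"))
```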
Best,
Xingbo
Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Fri, 24 Jul 2020 at 16:44:
Hi,
thank you for your answer, it is very helpful.
Currently my python program looks like:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)
t_env.connect(Kafka()
              .version("universal")
              .topic("my-topic")
              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
              ) \
    .in_append_mode() \
    .with_format(Csv()
                 .line_delimiter("\r\n")
                 .derive_schema()) \
    .with_schema(Schema()
                 .field("value", DataTypes.STRING())) \
    .create_temporary_table('mySource')
t_env.connect(Kafka()
              .version("universal")
              .topic("my-topic-out")
              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
              ) \
    .with_format(Csv()
                 .line_delimiter("\r\n")
                 .derive_schema()) \
    .with_schema(Schema()
                 .field("value", DataTypes.STRING())) \
    .in_append_mode() \
    .create_temporary_table('mySink')
t_env.scan('mySource') \
    .select('"flink_job_" + value') \
    .insert_into('mySink')
t_env.execute("tutorial_job")
I have installed PyFlink 1.11, and the IDE points out that the
methods connect, scan, insert_into, and execute are deprecated.
What is the correct way to write this program for version 1.11
of PyFlink?
Kind regards,
Wojtek
On Fri, 24 Jul 2020 at 04:21, Xingbo Huang <hxbks...@gmail.com> wrote:
Hi Wojtek,
In Flink 1.11, the methods register_table_source and
register_table_sink of ConnectTableDescriptor have
been removed. You need to use create_temporary_table
instead of these two methods. Besides, it seems that
the version of your PyFlink is 1.10, but the
corresponding Flink is 1.11.
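Summarizing the renames discussed in this thread (1.10-style calls to their PyFlink 1.11 replacements), as a plain Python lookup table:

```python
# Deprecated TableEnvironment / descriptor calls (1.10 style) mapped to
# their Flink 1.11 replacements, as mentioned in this thread.
MIGRATION = {
    "connect(...).register_table_source": "connect(...).create_temporary_table",
    "connect(...).register_table_sink": "connect(...).create_temporary_table",
    "scan": "from_path",
    "insert_into + execute": "execute_insert / execute_sql",
}

for old, new in MIGRATION.items():
    print(f"{old:40s} -> {new}")
```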
Best,
Xingbo
Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Thu, 23 Jul 2020 at 21:01:
Thank you for your answer.
I have replaced that .jar with the Kafka version
universal - the links to the other versions are dead.
After the attempt of deploying:
bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar
another error occurs:
Traceback (most recent call last):
File
"/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py",
line 20, in <module>
.field("tbd", DataTypes.INT())) \
AttributeError: 'StreamTableDescriptor' object
has no attribute 'register_table_source'
org.apache.flink.client.program.ProgramAbortException
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at
java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Maybe the way the Python program is written is
incorrect. Could it be using deprecated APIs, taking
into account that the installed Flink version is 1.11?
Best regards,
Wojtek
On Thu, 23 Jul 2020 at 12:01, Xingbo Huang <hxbks...@gmail.com> wrote:
Hi Wojtek,
you need to use the fat jar
'flink-sql-connector-kafka_2.11-1.11.0.jar',
which you can download from the doc[1].
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
Best,
Xingbo
Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Thu, 23 Jul 2020 at 16:57:
Hello,
I am trying to deploy a Python job with
Kafka connector:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv

exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

t_env.connect(Kafka()
              .version("0.11")
              .topic("my-topic")
              .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
              ) \
    .in_append_mode() \
    .with_format(Csv()
                 .line_delimiter("\r\n")
                 .derive_schema()) \
    .with_schema(Schema()
                 .field("tbd", DataTypes.INT())) \
    .register_table_source('mySource')

t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
    .with_format(OldCsv()
                 .field('tbd', DataTypes.INT())) \
    .with_schema(Schema()
                 .field("tbd", DataTypes.INT())) \
    .register_table_sink('mySink')

t_env.scan('mySource') \
    .select('tbd') \
    .where("tbd = 1") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
When I run the deploy command:
bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar
I get an error:
Traceback (most recent call last):
File
"/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py",
line 9, in <module>
t_env =
StreamTableEnvironment.create(exec_env,
t_config)
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 1478, in create
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
File
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error
occurred while calling
z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
: java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
at
java.base/java.lang.ClassLoader.defineClass1(Native
Method)
at
java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
at
java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at
java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
at
java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at
java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at
java.base/java.security.AccessController.doPrivileged(Native
Method)
at
java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at
java.base/java.lang.Class.forName0(Native
Method)
at
java.base/java.lang.Class.forName(Class.java:398)
at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
at
java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
at
java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
at
java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
at
org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
at
org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
at
org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
at
org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
at
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at
java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at
java.base/java.lang.Thread.run(Thread.java:834)
Caused by:
java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
at
java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at
java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 39 more
org.apache.flink.client.program.ProgramAbortException
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at
java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
What is the correct way to deploy a Python
job on Flink which uses Kafka? It seems
it cannot get the correct Kafka dependency.
I wonder if there is a simpler solution,
and whether it matters that I would like
to deploy the job on a K8s cluster.
Thanks,
Wojtek
NOTE - Message and the documents attached
thereto contain confidential information, which
may also be a trade secret or confidential. If
you are not the intended recipient of the
message, please contact the sender without delay
and delete the message from your system.
Disclosure, copying, dissemination or
publication of this message and information
contained therein is prohibited.
Alphamoon Sp. z o.o. (Ltd.), ul. Pawła
Włodkowica 21/3, 50-072 Wrocław, Poland;
Registered under the KRS number 0000621513 to
the National Court Register, kept by the
District Court for Wrocław-Fabryczna VI Economic
Department of the National Court Register,
VAT-ID: PL8943079568, REGON 364634116; Share
capital: PLN 5.000 fully paid-up.