Hi Kostas,

As far as I know, you cannot simply use Java classes from within the Python API, and I don't think the Python API provides a wrapper for the Kafka connector. I am adding Chesnay in cc to correct me if I am wrong.

Best,
Dawid

On 11/10/18 12:18, Kostas Evangelou wrote:
> Hey all,
>
> Thank you so much for your efforts. I've already posted this question
> on stack overflow, but thought I should ask here as well.
>
> I am trying out Flink's new Python streaming API and attempting to run
> my script with |./flink-1.6.1/bin/pyflink-stream.sh
> examples/read_from_kafka.py|. The python script is fairly
> straightforward, I am just trying to consume from an existing topic
> and send everything to stdout (or the *.out file in the log directory
> where the output method emits data by default).
>
> import glob
> import os
> import sys
> from java.util import Properties
> from org.apache.flink.streaming.api.functions.source import SourceFunction
> from org.apache.flink.streaming.api.collector.selector import OutputSelector
> from org.apache.flink.api.common.serialization import SimpleStringSchema
>
> directories=['/home/user/flink/flink-1.6.1/lib']
> for directory in directories:
>     for jar in glob.glob(os.path.join(directory,'*.jar')):
>         sys.path.append(jar)
>
> from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09
>
> props = Properties()
> config = {"bootstrap_servers": "localhost:9092",
>           "group_id": "flink_test",
>           "topics": ["TopicCategory-TopicName"]}
> props.setProperty("bootstrap.servers", config['bootstrap_servers'])
> props.setProperty("group_id", config['group_id'])
> props.setProperty("zookeeper.connect", "localhost:2181")
>
> def main(factory):
>     consumer = FlinkKafkaConsumer09([config["topics"]],
>                                     SimpleStringSchema(), props)
>
>     env = factory.get_execution_environment()
>     env.add_java_source(consumer) \
>         .output()
>     env.execute()
>
> I grabbed a handful of jar files from the maven repos,
> namely |flink-connector-kafka-0.9_2.11-1.6.1.jar|,
> |flink-connector-kafka-base_2.11-1.6.1.jar| and
> |kafka-clients-0.9.0.1.jar| and copied them in Flink's |lib| directory.
> Unless I misunderstood the documentation, this should suffice for Flink
> to load the kafka connector. Indeed, if I remove any of these jars the
> import fails, but this doesn't seem to be enough to actually invoke the
> plan. Adding a for loop to dynamically add these to |sys.path| didn't
> work either. Here's what gets printed in the console:
>
> Starting execution of program
> Failed to run plan: null
> Traceback (most recent call last):
>   File "<string>", line 1, in <module>
>   File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>     at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
> org.apache.flink.client.program.ProgramInvocationException:
> org.apache.flink.client.program.ProgramInvocationException: Job
> failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
>
> The program didn't contain a Flink job. Perhaps you forgot to call
> execute() on the execution environment.
>
> This is what I see in the logs:
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> load user class:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
>     file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
> Class not resolvable through given classloader.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
> Is there a way to fix this and make the connector available to Python?
>
> Many thanks,
> Kostas
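
The log above names the class that the task could not load. Since a jar is a zip archive, one quick sanity check is to confirm which jars in Flink's lib directory actually bundle that class. The following is a minimal sketch for plain CPython 3, run outside Flink (not through pyflink-stream.sh). The helper name jars_containing is hypothetical, and the directory and class name are the ones quoted in the message above. It only reports where the class lives; it does not change how Flink's user-code classloader resolves it, so it will not by itself fix the error.

```python
import glob
import os
import zipfile


def jars_containing(class_name, directory):
    """Return the sorted file names of jars in `directory` that bundle `class_name`.

    `class_name` is a fully qualified Java class name; inside a jar it is
    stored as a path with '/' separators and a '.class' suffix.
    """
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar_path in glob.glob(os.path.join(directory, "*.jar")):
        try:
            with zipfile.ZipFile(jar_path) as jar:
                if entry in jar.namelist():
                    matches.append(os.path.basename(jar_path))
        except zipfile.BadZipFile:
            # Skip files that have a .jar suffix but are not valid archives.
            continue
    return sorted(matches)


if __name__ == "__main__":
    # Assumption: paths and class name taken from the message above.
    lib_dir = "/home/user/flink/flink-1.6.1/lib"
    cls = "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09"
    print(jars_containing(cls, lib_dir))
```

If the connector jar shows up in the result, the class is present on disk and the failure is more likely about which classloader sees it at runtime than about a missing file.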