I am unable to read from Kafka and getting the following warnings & errors when calling kafka.ReadFromKafka() (Python SDK):
WARNING:root:severity: WARN
timestamp {
seconds: 1591370012
nanos: 523000000
}
message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1
could not be established. Broker may not be available."
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "18"
Finally the pipeline fails with:
RuntimeError: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata
See more complete log attached.
The relevant code snippet:
consumer_conf = {"bootstrap.servers": 'localhost:9092'}
...
kafka.ReadFromKafka(
consumer_config=consumer_conf,
topics=[args.topic],
)
...
Also see full python script attached.
I am using Beam version 2.21.0 and the DirectRunner. With the Flink Runner I
am also not able to read from the topic.
I am using kafka 2.5.0 and started the broker by following
https://kafka.apache.org/quickstart - using default
config/server.properties.
Everything runs locally, and I verified that I can publish & consume from
that topic using the confluent_kafka library.
--
Best regards,
Piotr
beam.log
Description: Binary data
"""
Example:
# DirectRunner
python pipeline.py --bootstrap_servers=localhost:9092 --topic=inputs
# FlinkRunner
python pipeline.py --bootstrap_servers=localhost:9092 --topic=inputs \
--runner=FlinkRunner --flink_version=1.9 \
--flink_master=localhost:8081 --environment_type=LOOPBACK
"""
import argparse
import logging
import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
def format_results(elem):
    """Render a ``(message, sum)`` pair as a human-readable line."""
    message, total = elem
    return f"message: {message}, sum: {total}"
def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the Kafka read/sum pipeline.

    Args:
      argv: Command-line arguments to parse; ``None`` means ``sys.argv``.
      save_main_session: If True, pickle the ``__main__`` session so that
        module-level context (e.g. imports) is available to DoFns that may
        execute on remote workers.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bootstrap_servers", type=str,
                        help="Kafka Broker address")
    parser.add_argument("--topic", type=str, help="Kafka topic to read from")
    parser.add_argument("--output", type=str, default="/tmp/kafka-output",
                        help="Output filepath")
    args, pipeline_args = parser.parse_known_args(argv)

    # Bug fix: the original branch called sys.argv/sys.exit without
    # importing sys, so a missing argument raised NameError instead of
    # exiting cleanly. parser.error() prints the usage string plus the
    # message to stderr and exits with a non-zero status.
    if args.topic is None or args.bootstrap_servers is None:
        parser.error("both --topic and --bootstrap_servers are required")

    options = PipelineOptions(pipeline_args)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    options.view_as(SetupOptions).save_main_session = save_main_session
    # Kafka is an unbounded source, so enforce streaming mode.
    options.view_as(StandardOptions).streaming = True

    consumer_conf = {"bootstrap.servers": args.bootstrap_servers}
    print(f"Starting pipeline "
          f"kafka = {args.bootstrap_servers}, topic = {args.topic}")

    with beam.Pipeline(options=options) as p:
        # Read records, window them into fixed 60s windows, count
        # occurrences per message, and write formatted sums to text files.
        (
            p
            | "ReadFromKafka" >> kafka.ReadFromKafka(
                consumer_config=consumer_conf,
                topics=[args.topic],
            )
            | "WindowIntoFixedWindows" >> beam.WindowInto(
                beam.window.FixedWindows(60))
            | "AddOnes" >> beam.Map(lambda msg: (msg, 1))
            | "SumByKey" >> beam.CombinePerKey(sum)
            | "FormatResults" >> beam.Map(format_results)
            | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
        )
if __name__ == "__main__":
    # Raise root-logger verbosity to INFO before launching the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
