Is Kafka itself running inside another container? If so, inspect that container 
to see whether it has a network alias, then add that alias to your /etc/hosts 
file, mapped to 127.0.0.1. 
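
One way to confirm that the /etc/hosts mapping took effect is to resolve the alias from Python. This is only a minimal sketch: the helper name resolves_to_loopback and the alias "kafka" in the usage comment are hypothetical, so substitute whatever alias the container inspection reports.

```python
import ipaddress
import socket


def resolves_to_loopback(hostname: str) -> bool:
    """Return True if hostname resolves to a loopback IPv4 address (127.0.0.0/8)."""
    try:
        address = socket.gethostbyname(hostname)
    except OSError:
        # Covers socket.gaierror: the name does not resolve at all.
        return False
    return ipaddress.ip_address(address).is_loopback


# Hypothetical usage: "kafka" stands in for the container's network alias.
# print(resolves_to_loopback("kafka"))
```

If this returns False for the alias, the /etc/hosts entry is probably missing or mistyped.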
 

From: Chamikara Jayalath <chamik...@google.com> 
Sent: Friday, June 5, 2020 2:58 PM
To: Luke Cwik <lc...@google.com>
Cc: user <user@beam.apache.org>; dev <d...@beam.apache.org>; Heejong Lee 
<heej...@google.com>
Subject: Re: Python SDK ReadFromKafka: Timeout expired while fetching topic 
metadata

 

Is it possible that 'localhost:9092' is not reachable from the Docker 
environment where the Flink step is executed? Can you try specifying the 
actual IP address of the node running the Kafka broker?
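
A quick way to test this is to attempt a plain TCP connection to each bootstrap address from the environment that runs the step. The following is a rough sketch using only the Python standard library; the helper names split_bootstrap_servers and can_connect are made up for illustration and are not part of Beam or Kafka.

```python
import socket
from typing import List, Tuple


def split_bootstrap_servers(servers: str) -> List[Tuple[str, int]]:
    """Split a 'host1:port1,host2:port2' string into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, or name resolution failure.
        return False


# Example usage with the address from the original report:
# for host, port in split_bootstrap_servers("localhost:9092"):
#     print(host, port, can_connect(host, port))
```

Run it inside the same container or host as the failing step. A successful connection only shows that the bootstrap address is reachable; the broker's advertised listeners must also be reachable from there.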

 

On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:

+dev <d...@beam.apache.org> +Chamikara Jayalath <chamik...@google.com> 
+Heejong Lee <heej...@google.com>

 

On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:

I am unable to read from Kafka and am getting the following warnings and errors 
when calling kafka.ReadFromKafka() (Python SDK):

 

WARNING:root:severity: WARN
timestamp {
  seconds: 1591370012
  nanos: 523000000
}
message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could 
not be established. Broker may not be available."
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "18"

 

Finally the pipeline fails with:

 

RuntimeError: org.apache.beam.sdk.util.UserCodeException: 
java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: 
Timeout expired while fetching topic metadata

 

See more complete log attached.

 

The relevant code snippet:

 

consumer_conf = {"bootstrap.servers": 'localhost:9092'}

...

kafka.ReadFromKafka(
    consumer_config=consumer_conf,
    topics=[args.topic],
)

...

 

Also see full python script attached.

 

I am using Beam version 2.21.0 with the DirectRunner. With the Flink runner I 
am also unable to read from the topic.

 

I am using Kafka 2.5.0 and started the broker by following 
https://kafka.apache.org/quickstart with the default config/server.properties.


 

Everything runs locally, and I verified that I can publish to and consume from 
that topic using the confluent_kafka library.

 

-- 

Best regards,
Piotr
