Hi,

Don't worry – this is quite a low-priority question. It's definitely not a
production issue, and as a workaround it can be fixed rather easily with a
suitable network setup. This kind of network scenario is probably quite rare,
too.

But I think that it might be possible to improve how FlinkKafkaConsumer
handles the host names. I'll leave it for you to consider :)

You wrote:
"Kafka uses the bootstrap servers to connect to the brokers to consume
messages."
- but it seems that it's actually using the Kafka broker hosts that
ZooKeeper reports to it?

Let me first clarify the particular network setup where this becomes a
problem:

- The ZooKeeper & Kafka hosts are within the same internal network.
- The client running FlinkKafkaConsumer is outside that network.
- The ZooKeeper & Kafka hosts are available to the external client via public
IPs; for example, one of the Kafka nodes is 10.0.3.85.
- ZooKeeper refers to the Kafka brokers by hostnames that only resolve within
that network, not by the public IPs; for example kafka8v-internal-10-0-3-85.

Now, FlinkKafkaConsumer connects to
- the Kafka public IPs as configured (works)
- the ZooKeeper public IPs as configured (works)
- the Kafka internal hostnames as given by ZooKeeper (fails, because the
hostnames don't resolve outside the internal network)
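To illustrate the failing step: outside the internal network, the broker hostname that ZooKeeper hands out simply doesn't resolve. A minimal check (using the example hostname from my setup above):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ResolveCheck {
    public static void main(String[] args) {
        // Broker hostname as handed out by ZooKeeper in the setup above:
        String broker = "kafka8v-internal-10-0-3-85";
        try {
            InetAddress addr = InetAddress.getByName(broker);
            System.out.println(broker + " resolves to " + addr.getHostAddress());
        } catch (UnknownHostException e) {
            // This is what happens outside the internal network; Kafka's
            // SimpleConsumer then fails when it tries to open the connection.
            System.out.println(broker + " does not resolve");
        }
    }
}
```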

You can find some log output and a stacktrace at the end of this message.

Obviously this can be fixed by network configuration in two ways:
a) add the internal hostnames to /etc/hosts on the client node, mapping the
internal Kafka hostnames to their public IPs
b) change the configuration so that ZooKeeper hands out the public IPs for
the Kafka brokers instead (FlinkKafkaConsumer gets those from ZooKeeper)
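For reference, here are sketches of both workarounds (the advertised.host.name approach assumes a Kafka 0.8.x broker config, since that setting controls what gets registered in ZooKeeper; adjust the names and IPs to your own setup):

```
# (a) on the client node, map internal broker hostnames to public IPs in /etc/hosts:
10.0.3.85   kafka8v-internal-10-0-3-85
10.0.3.93   kafka8v-internal-10-0-3-93

# (b) in each broker's server.properties (Kafka 0.8.x), advertise the public
#     address, which is what the broker registers in ZooKeeper and what clients
#     therefore receive:
advertised.host.name=10.0.3.85
advertised.port=9092
```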

To me it seems that FlinkKafkaConsumer could still use the IPs (or
hostnames) that are given as arguments, rather than the broker hostnames
retrieved from ZooKeeper. But I may be wrong about that, if the concepts of
a Kafka bootstrap server and a broker are entirely different.

I'm just trying to understand how this works and whether it would even make
sense for FlinkKafkaConsumer to use the provided bootstrap servers for
consuming. If yes, I would say my original conclusion holds, i.e. either the
bootstrap server hosts wouldn't be needed as an argument, or
FlinkKafkaConsumer wouldn't need to ask ZooKeeper for the Kafka hosts.
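For concreteness, here is roughly the shape of the properties a FlinkKafkaConsumer in Flink 0.9.x gets (the addresses are the ones from my setup; the consumer construction itself is left as a comment since it needs the Flink Kafka connector on the classpath):

```java
import java.util.Properties;

public class ConsumerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Public IPs of the Kafka brokers, reachable from the external client:
        props.setProperty("bootstrap.servers", "10.0.3.85:9092,10.0.3.93:9092");
        // Public IPs of the ZooKeeper ensemble; the consumer asks ZooKeeper for
        // the broker list, and ZooKeeper answers with the internal hostnames
        // that fail to resolve:
        props.setProperty("zookeeper.connect",
                "10.0.3.38:2181,10.0.3.48:2181,10.0.3.69:2181");
        props.setProperty("group.id", "test-group");
        System.out.println(props.getProperty("bootstrap.servers"));
        // new FlinkKafkaConsumer082<>("test.topic", new SimpleStringSchema(), props);
    }
}
```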

Cheers,
Juho


The log output & stacktrace:

10:24:43.306 [main] INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to
get topic metadata from broker 10.0.3.85:9092 in try 0/3
10:24:48.270 [Custom Source (2/4)] INFO  org.apache.zookeeper.ZooKeeper -
Initiating client connection, connectString=10.0.3.38,10.0.3.48,10.0.3.69
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@a05936f
10:24:48.270 [Custom Source (1/4)] INFO  org.apache.zookeeper.ZooKeeper -
Initiating client connection, connectString=10.0.3.38,10.0.3.48,10.0.3.69
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@58f47fb0
10:24:48.302 [Custom Source (10.0.3.48:2181)] INFO 
org.apache.zookeeper.ClientCnxn - Opening socket connection to server
10.0.3.48/10.0.3.48:2181. Will not attempt to authenticate using SASL
(unknown error)
10:24:48.303 [Custom Source (10.0.3.38:2181)] INFO 
org.apache.zookeeper.ClientCnxn - Opening socket connection to server
10.0.3.38/10.0.3.38:2181. Will not attempt to authenticate using SASL
(unknown error)
10:24:48.444 [Custom Source (10.0.3.48:2181)] INFO 
org.apache.zookeeper.ClientCnxn - Socket connection established to
10.0.3.48/10.0.3.48:2181, initiating session
10:24:48.445 [Custom Source (10.0.3.38:2181)] INFO 
org.apache.zookeeper.ClientCnxn - Socket connection established to
10.0.3.38/10.0.3.38:2181, initiating session
10:24:48.595 [Custom Source (10.0.3.48:2181)] INFO 
org.apache.zookeeper.ClientCnxn - Session establishment complete on server
10.0.3.48/10.0.3.48:2181, sessionid = 0x24f8e60da6141d9, negotiated timeout
= 6000
10:24:48.595 [Custom Source (10.0.3.38:2181)] INFO 
org.apache.zookeeper.ClientCnxn - Session establishment complete on server
10.0.3.38/10.0.3.38:2181, sessionid = 0x15004816d432073, negotiated timeout
= 6000
10:24:48.763 [Custom Source (1/4)] INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Reading
from partitions {test.topic-0=-915623761776} using the legacy fetcher
10:24:48.764 [Custom Source (2/4)] INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Reading
from partitions {test.topic-1=-915623761776} using the legacy fetcher
10:24:48.764 [Thread-16] INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to
get topic metadata from broker 10.0.3.93:9092 in try 0/3
10:24:48.764 [Thread-17] INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to
get topic metadata from broker 10.0.3.85:9092 in try 0/3
10:24:49.052 [Custom Source (2/4)] INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Starting
thread SimpleConsumer - StreamSource - broker-85
(kafka8v-internal-10-0-3-85:9092)
10:24:49.052 [Custom Source (1/4)] INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Starting
thread SimpleConsumer - StreamSource - broker-93
(kafka8v-internal-10-0-3-93:9092)
10:24:49.150 [Custom Source (1/4)] ERROR
org.apache.flink.streaming.runtime.tasks.SourceStreamTask - Custom Source
(1/4) failed
java.lang.Exception: null
        at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:241)
~[flink-connector-kafka-0.9.1.jar:0.9.1]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
~[flink-connector-kafka-0.9.1.jar:0.9.1]
        at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
~[flink-streaming-core-0.9.1.jar:0.9.1]
        at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
~[flink-streaming-core-0.9.1.jar:0.9.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
[flink-runtime-0.9.1.jar:0.9.1]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
~[kafka_2.10-0.8.2.1.jar:na]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
~[kafka_2.10-0.8.2.1.jar:na]
        at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
~[kafka_2.10-0.8.2.1.jar:na]
        at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
~[kafka_2.10-0.8.2.1.jar:na]
        at
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
~[kafka_2.10-0.8.2.1.jar:na]
        at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:498)
~[flink-connector-kafka-0.9.1.jar:0.9.1]
        at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:375)
~[flink-connector-kafka-0.9.1.jar:0.9.1]



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer-bootstrap-servers-vs-broker-hosts-tp3109p3132.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.