Hi all,

I'm working on a DC-to-DC setup with two Kafka 0.8-beta1 clusters and want to
mirror the messages between the two DCs (DC A and DC B) using MirrorMaker.
This should not be very difficult, but I am running into results I cannot
explain, as well as inconsistencies in the available documentation (some of it
only covers 0.7.x).

According to 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+%28MirrorMaker%29
I should point MirrorMaker to the source cluster's ZooKeeper (DC A,
localhost:2181) and the producer to the remote (DC B) ZooKeeper, unless I use
the metadata.broker.list parameter, which I do. The metadata.broker.list
parameter points to the remote IP address and the three brokers running there.
I cannot find an option to point MirrorMaker to the remote ZooKeeper, and
besides that, metadata.broker.list turns out to be mandatory in
producer.properties, so it seems I am stuck with metadata.broker.list.

When I start MirrorMaker, it seems to end up using only the broker list from
the local ZooKeeper instance (the localhost:909[234] brokers) instead of the
values I put in the mirror-producer.properties file (shown below). I also do
not see any connections arriving at the remote end. (remote-kafka is listed in
/etc/hosts with its public IP.)
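
One way to rule out plain network problems would be a quick TCP check from the
DC A host against each entry in the broker list. A minimal sketch in Python,
assuming the host:port format used in the properties file; the function names
parse_broker_list and check_brokers are made up for illustration, and a
successful connect only shows that the port is open, not that a Kafka broker is
answering on it:

```python
import socket


def parse_broker_list(value):
    """Split a 'host1:port1,host2:port2' string into (host, port) tuples."""
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas or blanks
        host, _, port = entry.rpartition(":")
        brokers.append((host, int(port)))
    return brokers


def check_brokers(brokers, timeout=5.0):
    """Attempt a plain TCP connect to each broker.

    Returns a dict mapping (host, port) to True if the connect succeeded
    and False on any resolution or connection error.
    """
    results = {}
    for host, port in brokers:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results[(host, port)] = True
        except OSError:
            # Covers DNS failures, refused connections and timeouts.
            results[(host, port)] = False
    return results
```

Running check_brokers(parse_broker_list("remote-kafka:9092,remote-kafka:9093,remote-kafka:9094"))
on the DC A host would then show, per broker, whether the port can be reached
at all.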

So my main questions are:

- What is the best way to define the location of the remote brokers: through
the remote ZK or through metadata.broker.list?
- Are there any other 0.8-beta1 things I need to be aware of, or is there any
documentation that is accurate for 0.8? :)

 
Thanks for any pointers; I may well be overlooking something.

Roel.


My mirror-producer.properties file :

############################# Producer Basics #############################

# list of brokers used for bootstrapping
# format: host1:port1,host2:port2 ...
metadata.broker.list=remote-kafka:9092,remote-kafka:9093,remote-kafka:9094

# name of the partitioner class for partitioning events; default partition
# spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or
# synchronously (sync)
producer.type=async

# specify the compression codec for all data generated: none , gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy,
# respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
queue.buffering.max.ms=11000000

# the maximum size of the blocking queue for buffering on the producer
queue.buffering.max.messages=2000000

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
queue.enqueue.timeout.ms=-1

# the number of messages batched at the producer
#batch.num.messages=


The log mentions:
[playertrackingmobile is the DC A host, remote-kafka is the DC B host]


[2013-07-30 13:40:57,437] INFO Starting mirror maker (kafka.tools.MirrorMaker$)
[2013-07-30 13:40:57,596] INFO Verifying properties 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,649] INFO Property compression.codec is overridden to none 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,650] INFO Property metadata.broker.list is overridden to 
remote-kafka:9092,remote-kafka:9093,remote-kafka:9094 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,650] INFO Property producer.type is overridden to async 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,650] INFO Property queue.buffering.max.messages is 
overridden to 2000000 (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,650] INFO Property queue.buffering.max.ms is overridden to 
11000000 (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,650] INFO Property queue.enqueue.timeout.ms is overridden 
to -1 (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,651] INFO Property serializer.class is overridden to 
kafka.serializer.DefaultEncoder (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,704] INFO Verifying properties 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,704] INFO Property group.id is overridden to 
playertracking (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,704] INFO Property zookeeper.connect is overridden to 
127.0.0.1:2181 (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,705] INFO Property zookeeper.connection.timeout.ms is 
overridden to 1000000 (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:57,713] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Connecting to 
zookeeper instance at 127.0.0.1:2181 (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:57,722] INFO Starting ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread)
<snip app. versions etc.>
[2013-07-30 13:40:57,730] INFO Initiating client connection, 
connectString=127.0.0.1:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@2ca60593 (org.apache.zookeeper.ZooKeeper)
[2013-07-30 13:40:57,744] INFO Opening socket connection to server 
/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2013-07-30 13:40:57,757] INFO Socket connection established to 
localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-07-30 13:40:57,764] INFO Session establishment complete on server 
localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0040, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2013-07-30 13:40:57,766] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2013-07-30 13:40:57,781] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], starting auto 
committer every 60000 ms (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:57,808] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], begin registering 
consumer playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:57,829] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], end registering 
consumer playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:57,832] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], starting watcher 
executor thread for consumer 
playertracking_playertrackingmobile-1375184457711-6b050d36 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:57,877] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], begin rebalancing 
consumer playertracking_playertrackingmobile-1375184457711-6b050d36 try #0 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,154] INFO Verifying properties 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,155] INFO Property client.id is overridden to 
playertracking (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,155] INFO Property metadata.broker.list is overridden to 
playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,155] INFO Property request.timeout.ms is overridden to 
30000 (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,182] INFO Fetching metadata from broker 
id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1 topic(s) 
Set(playertracking) (kafka.client.ClientUtils$)
[2013-07-30 13:40:58,184] INFO Connected to playertrackingmobile:9094 for 
producing (kafka.producer.SyncProducer)
[2013-07-30 13:40:58,208] INFO Disconnecting from playertrackingmobile:9094 
(kafka.producer.SyncProducer)
[2013-07-30 13:40:58,215] INFO [ConsumerFetcherManager-1375184457779] Stopping 
leader finder thread (kafka.consumer.ConsumerFetcherManager)
[2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779] Stopping 
all fetchers (kafka.consumer.ConsumerFetcherManager)
[2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779] All 
connections stopped (kafka.consumer.ConsumerFetcherManager)
[2013-07-30 13:40:58,217] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared all 
relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,219] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared the data 
chunks in all the consumer message iterators 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,219] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Committing all 
offsets after clearing the fetcher queues 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,220] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Releasing 
partition ownership (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,222] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer 
playertracking_playertrackingmobile-1375184457711-6b050d36 rebalancing the 
following partitions: ArrayBuffer(0) for topic playertracking with consumers: 
List(playertracking_playertrackingmobile-1375184457711-6b050d36-0, 
playertracking_playertrackingmobile-1375184457711-6b050d36-1, 
playertracking_playertrackingmobile-1375184457711-6b050d36-2) 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,223] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], 
playertracking_playertrackingmobile-1375184457711-6b050d36-0 attempting to 
claim partition 0 (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,227] WARN 
[playertracking_playertrackingmobile-1375184457711-6b050d36], No broker 
partitions consumed by consumer thread 
playertracking_playertrackingmobile-1375184457711-6b050d36-1 for topic 
playertracking (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,227] WARN 
[playertracking_playertrackingmobile-1375184457711-6b050d36], No broker 
partitions consumed by consumer thread 
playertracking_playertrackingmobile-1375184457711-6b050d36-2 for topic 
playertracking (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,231] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], 
playertracking_playertrackingmobile-1375184457711-6b050d36-0 successfully owned 
partition 0 for topic playertracking (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,232] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Updating the 
cache (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,235] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer 
playertracking_playertrackingmobile-1375184457711-6b050d36 selected partitions 
: playertracking:0: fetched offset = 11686: consumed offset = 11686 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,236] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36-leader-finder-thread],
 Starting  (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
[2013-07-30 13:40:58,237] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], end rebalancing 
consumer playertracking_playertrackingmobile-1375184457711-6b050d36 try #0 
(kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,240] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Creating topic 
event watcher for whitelist .* (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,241] INFO Initiating client connection, 
connectString=127.0.0.1:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@4628a18a (org.apache.zookeeper.ZooKeeper)
[2013-07-30 13:40:58,241] INFO Starting ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread)
[2013-07-30 13:40:58,245] INFO Opening socket connection to server 
/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2013-07-30 13:40:58,246] INFO Socket connection established to 
localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-07-30 13:40:58,253] INFO Session establishment complete on server 
localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0041, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2013-07-30 13:40:58,254] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2013-07-30 13:40:58,263] INFO 
[playertracking_playertrackingmobile-1375184457711-6b050d36], Topics to consume 
= List(playertracking) (kafka.consumer.ZookeeperConsumerConnector)
[2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-0 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-1 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2013-07-30 13:40:58,270] INFO Starting mirror maker thread mirrormaker-2 
(kafka.tools.MirrorMaker$MirrorMakerThread)
[2013-07-30 13:40:58,273] INFO Verifying properties 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,273] INFO Property client.id is overridden to 
playertracking (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,274] INFO Property metadata.broker.list is overridden to 
playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094 
(kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,274] INFO Property request.timeout.ms is overridden to 
30000 (kafka.utils.VerifiableProperties)
[2013-07-30 13:40:58,274] INFO Fetching metadata from broker 
id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1 topic(s) 
Set(playertracking) (kafka.client.ClientUtils$)
[2013-07-30 13:40:58,275] INFO Connected to playertrackingmobile:9094 for 
producing (kafka.producer.SyncProducer)
[2013-07-30 13:40:58,278] INFO Disconnecting from playertrackingmobile:9094 
(kafka.producer.SyncProducer)
[2013-07-30 13:40:58,291] INFO 
[ConsumerFetcherThread-playertracking_playertrackingmobile-1375184457711-6b050d36-0-20],
 Starting  (kafka.consumer.ConsumerFetcherThread)
[2013-07-30 13:40:58,292] INFO [ConsumerFetcherManager-1375184457779] Adding 
fetcher for partition [playertracking,0], initOffset 11686 to broker 20 with 
fetcherId 0 (kafka.consumer.ConsumerFetcherManager)
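
To compare the broker lists reported in a log like the one above without
reading through it by hand, the logged metadata.broker.list values can be
pulled out with a few lines of Python. This is only a sketch: the function name
broker_lists_from_log is hypothetical, and it assumes the "Property
metadata.broker.list is overridden to ..." wording seen in this log, possibly
wrapped across lines by the mail client:

```python
import re


def broker_lists_from_log(text):
    """Return every value logged for metadata.broker.list, in log order."""
    # Mail clients wrap long log lines, so collapse all whitespace
    # (including newlines) into single spaces before matching.
    flat = " ".join(text.split())
    pattern = r"Property metadata\.broker\.list is overridden to (\S+)"
    return re.findall(pattern, flat)
```

Applied to the log above, this returns the remote-kafka list once, followed by
the playertrackingmobile list for each later occurrence.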


