Hi al, I'm working on a DC-DC setup with two Kafka 0.8Beta1 setups and want to mirror the messages between two DC's (DC A and DC B) using MirrorMaker. The idea should not be very difficult though I encounter some, for me, unexplainable results and inconsistencies in the available documentation (some is only 0.7.x based).
According to https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+%28MirrorMaker%29 I should point the mirrormaker to the source cluster's ZK (DC A localhost:2181) and the producer to the remote (DC B) zookeeper, unless I use the meta.broker.list parameter, which I do. The meta.broker.list parameter points to the remote IP address and the three brokers running there. I cannot find an option on how I can point MirrorMaker to the remote zookeeper and besides that, the meta.broker.list parameter has shown to be mandatory for the producer.properties, so seems I am stuck with meta.broker.list. The moment I start MirrorMaker it seems in the end to only use the broker-list values of the local Zookeeper instance (the localhost:929[234] brokers) in stead of the values I put in the mirror-producer.properties file (shown below). I also do not see any connections ending up on the remote end. (remote-kafka is listed in /etc/hosts with it's public ip). So my main questions basically are; - What is the best way to define the location of the remote brokers, through remote ZK or through the meta.broker.list - Are there any other 0.8-Beta1 things i need to be aware of, or is there any 0.8 accurate documentation ? :) Thanks for any pointers, while I might well be overlooking things. Roel. My mirror-producer.properties file : ############################# Producer Basics ############################# # list of brokers used for bootstrapping # format: host1:port1,host2:port2 ... metadata.broker.list=remote-kafka:9092,remote-kafka:9093,remote-kafka:9094 # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=async # specify the compression codec for all data generated: none , gzip, snappy. # the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally compression.codec=none # message encoder serializer.class=kafka.serializer.DefaultEncoder # allow topic level compression #compressed.topics= ############################# Async Producer ############################# # maximum time, in milliseconds, for buffering data on the producer queue queue.buffering.max.ms=11000000 # the maximum size of the blocking queue for buffering on the producer queue.buffering.max.messages=2000000 # Timeout for event enqueue: # 0: events will be enqueued immediately or dropped if the queue is full # -ve: enqueue will block indefinitely if the queue is full # +ve: enqueue will block up to this many milliseconds if the queue is full queue.enqueue.timeout.ms=-1 # the number of messages batched at the producer #batch.num.messages= The log mentions: [playertrackingmobile is the DC A host, remote-kafka is the DC B host] [2013-07-30 13:40:57,437] INFO Starting mirror maker (kafka.tools.MirrorMaker$) [2013-07-30 13:40:57,596] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,649] INFO Property compression.codec is overridden to none (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,650] INFO Property metadata.broker.list is overridden to remote-kafka:9092,remote-kafka:9093,remote-kafka:9094 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,650] INFO Property producer.type is overridden to async (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,650] INFO Property queue.buffering.max.messages is overridden to 2000000 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,650] INFO Property queue.buffering.max.ms is overridden to 11000000 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,650] INFO Property queue.enqueue.timeout.ms is overridden to -1 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,651] INFO Property serializer.class is overridden to kafka.serializer.DefaultEncoder (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,704] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,704] INFO Property group.id is overridden to playertracking (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,704] INFO Property zookeeper.connect is overridden to 127.0.0.1:2181 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,705] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:57,713] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Connecting to zookeeper instance at 127.0.0.1:2181 (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:57,722] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) <snip app. versions etc.> [2013-07-30 13:40:57,730] INFO Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2ca60593 (org.apache.zookeeper.ZooKeeper) [2013-07-30 13:40:57,744] INFO Opening socket connection to server /127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) [2013-07-30 13:40:57,757] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2013-07-30 13:40:57,764] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0040, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2013-07-30 13:40:57,766] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2013-07-30 13:40:57,781] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], starting auto committer every 60000 ms (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:57,808] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], begin registering consumer playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:57,829] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], end registering consumer playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:57,832] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], starting watcher executor thread for consumer playertracking_playertrackingmobile-1375184457711-6b050d36 (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:57,877] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], begin rebalancing consumer playertracking_playertrackingmobile-1375184457711-6b050d36 try #0 (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,154] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,155] INFO Property client.id is overridden to playertracking (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,155] INFO Property metadata.broker.list is overridden to playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,155] INFO Property request.timeout.ms is overridden to 30000 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,182] INFO Fetching metadata from broker id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1 topic(s) Set(playertracking) (kafka.client.ClientUtils$) [2013-07-30 13:40:58,184] INFO Connected to playertrackingmobile:9094 for producing (kafka.producer.SyncProducer) [2013-07-30 13:40:58,208] INFO Disconnecting from playertrackingmobile:9094 (kafka.producer.SyncProducer) [2013-07-30 13:40:58,215] INFO [ConsumerFetcherManager-1375184457779] Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager) [2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779] Stopping all fetchers (kafka.consumer.ConsumerFetcherManager) [2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779] All connections stopped (kafka.consumer.ConsumerFetcherManager) [2013-07-30 13:40:58,217] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared all relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,219] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared the data chunks in all the consumer message iterators (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,219] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Committing all offsets after clearing the fetcher queues (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,220] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Releasing partition ownership (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,222] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer playertracking_playertrackingmobile-1375184457711-6b050d36 rebalancing the following partitions: ArrayBuffer(0) for topic playertracking with consumers: List(playertracking_playertrackingmobile-1375184457711-6b050d36-0, playertracking_playertrackingmobile-1375184457711-6b050d36-1, playertracking_playertrackingmobile-1375184457711-6b050d36-2) (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,223] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], playertracking_playertrackingmobile-1375184457711-6b050d36-0 attempting to claim partition 0 (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,227] WARN [playertracking_playertrackingmobile-1375184457711-6b050d36], No broker partitions consumed by consumer thread playertracking_playertrackingmobile-1375184457711-6b050d36-1 for topic playertracking (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,227] WARN [playertracking_playertrackingmobile-1375184457711-6b050d36], No broker partitions consumed by consumer thread playertracking_playertrackingmobile-1375184457711-6b050d36-2 for topic playertracking (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,231] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], playertracking_playertrackingmobile-1375184457711-6b050d36-0 successfully owned partition 0 for topic playertracking (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,232] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Updating the cache (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,235] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer playertracking_playertrackingmobile-1375184457711-6b050d36 selected partitions : playertracking:0: fetched offset = 11686: consumed offset = 11686 (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,236] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36-leader-finder-thread], Starting (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) [2013-07-30 13:40:58,237] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], end rebalancing consumer playertracking_playertrackingmobile-1375184457711-6b050d36 try #0 (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,240] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Creating topic event watcher for whitelist .* (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,241] INFO Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@4628a18a (org.apache.zookeeper.ZooKeeper) [2013-07-30 13:40:58,241] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2013-07-30 13:40:58,245] INFO Opening socket connection to server /127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) [2013-07-30 13:40:58,246] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2013-07-30 13:40:58,253] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0041, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2013-07-30 13:40:58,254] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2013-07-30 13:40:58,263] INFO [playertracking_playertrackingmobile-1375184457711-6b050d36], Topics to consume = List(playertracking) (kafka.consumer.ZookeeperConsumerConnector) [2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-0 (kafka.tools.MirrorMaker$MirrorMakerThread) [2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-1 (kafka.tools.MirrorMaker$MirrorMakerThread) [2013-07-30 13:40:58,270] INFO Starting mirror maker thread mirrormaker-2 (kafka.tools.MirrorMaker$MirrorMakerThread) [2013-07-30 13:40:58,273] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,273] INFO Property client.id is overridden to playertracking (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,274] INFO Property metadata.broker.list is overridden to playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,274] INFO Property request.timeout.ms is overridden to 30000 (kafka.utils.VerifiableProperties) [2013-07-30 13:40:58,274] INFO Fetching metadata from broker id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1 topic(s) Set(playertracking) (kafka.client.ClientUtils$) [2013-07-30 13:40:58,275] INFO Connected to playertrackingmobile:9094 for producing (kafka.producer.SyncProducer) [2013-07-30 13:40:58,278] INFO Disconnecting from playertrackingmobile:9094 (kafka.producer.SyncProducer) [2013-07-30 13:40:58,291] INFO [ConsumerFetcherThread-playertracking_playertrackingmobile-1375184457711-6b050d36-0-20], Starting (kafka.consumer.ConsumerFetcherThread) [2013-07-30 13:40:58,292] INFO [ConsumerFetcherManager-1375184457779] Adding fetcher for partition [playertracking,0], initOffset 11686 to broker 20 with fetcherId 0 (kafka.consumer.ConsumerFetcherManager)