Our mirror maker wiki is not up to date with 0.8. In 0.8, the producer only needs meta.broker.list.
Thanks, Jun On Tue, Jul 30, 2013 at 5:23 AM, Roel van der Made <r...@imgzine.com> wrote: > Hi al, > > I'm working on a DC-DC setup with two Kafka 0.8Beta1 setups and want to > mirror the messages between two DC's (DC A and DC B) using MirrorMaker. > The idea should not be very difficult though I encounter some, for me, > unexplainable results and inconsistencies in the available documentation > (some is only 0.7.x based). > > According to > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+%28MirrorMaker%29I > should point the mirrormaker to the source cluster's ZK (DC A > localhost:2181) and the producer to the remote (DC B) zookeeper, unless I > use the meta.broker.list parameter, which I do. The meta.broker.list > parameter points to the remote IP address and the three brokers running > there. I cannot find an option on how I can point MirrorMaker to the remote > zookeeper and besides that, the meta.broker.list parameter has shown to be > mandatory for the producer.properties, so seems I am stuck with > meta.broker.list. > > The moment I start MirrorMaker it seems in the end to only use the > broker-list values of the local Zookeeper instance (the localhost:929[234] > brokers) in stead of the values I put in the mirror-producer.properties > file (shown below). I also do not see any connections ending up on the > remote end. (remote-kafka is listed in /etc/hosts with it's public ip). > > So my main questions basically are; > > - What is the best way to define the location of the remote brokers, > through remote ZK or through the meta.broker.list > - Are there any other 0.8-Beta1 things i need to be aware of, or is there > any 0.8 accurate documentation ? :) > > > Thanks for any pointers, while I might well be overlooking things. > > Roel. > > > My mirror-producer.properties file : > > ############################# Producer Basics ############################# > > # list of brokers used for bootstrapping > # format: host1:port1,host2:port2 ... > metadata.broker.list=remote-kafka:9092,remote-kafka:9093,remote-kafka:9094 > > # name of the partitioner class for partitioning events; default partition > spreads data randomly > #partitioner.class= > > # specifies whether the messages are sent asynchronously (async) or > synchronously (sync) > producer.type=async > > # specify the compression codec for all data generated: none , gzip, > snappy. > # the old config values work as well: 0, 1, 2 for none, gzip, snappy, > respectivally > compression.codec=none > > # message encoder > serializer.class=kafka.serializer.DefaultEncoder > > # allow topic level compression > #compressed.topics= > > ############################# Async Producer ############################# > # maximum time, in milliseconds, for buffering data on the producer queue > queue.buffering.max.ms=11000000 > > # the maximum size of the blocking queue for buffering on the producer > queue.buffering.max.messages=2000000 > > # Timeout for event enqueue: > # 0: events will be enqueued immediately or dropped if the queue is full > # -ve: enqueue will block indefinitely if the queue is full > # +ve: enqueue will block up to this many milliseconds if the queue is full > queue.enqueue.timeout.ms=-1 > > # the number of messages batched at the producer > #batch.num.messages= > > > The log mentions: > [playertrackingmobile is the DC A host, remote-kafka is the DC B host] > > > [2013-07-30 13:40:57,437] INFO Starting mirror maker > (kafka.tools.MirrorMaker$) > [2013-07-30 13:40:57,596] INFO Verifying properties > (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,649] INFO Property compression.codec is overridden to > none (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,650] INFO Property metadata.broker.list is overridden > to remote-kafka:9092,remote-kafka:9093,remote-kafka:9094 > (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,650] INFO Property producer.type is overridden to > async (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,650] INFO Property queue.buffering.max.messages is > overridden to 2000000 (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,650] INFO Property queue.buffering.max.ms is > overridden to 11000000 (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,650] INFO Property queue.enqueue.timeout.ms is > overridden to -1 (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,651] INFO Property serializer.class is overridden to > kafka.serializer.DefaultEncoder (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,704] INFO Verifying properties > (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,704] INFO Property group.id is overridden to > playertracking (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,704] INFO Property zookeeper.connect is overridden to > 127.0.0.1:2181 (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,705] INFO Property zookeeper.connection.timeout.msis > overridden to 1000000 (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:57,713] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Connecting to > zookeeper instance at > 127.0.0.1:2181(kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:57,722] INFO Starting ZkClient event thread. > (org.I0Itec.zkclient.ZkEventThread) > <snip app. versions etc.> > [2013-07-30 13:40:57,730] INFO Initiating client connection, connectString= > 127.0.0.1:2181 sessionTimeout=6000 > watcher=org.I0Itec.zkclient.ZkClient@2ca60593(org.apache.zookeeper.ZooKeeper) > [2013-07-30 13:40:57,744] INFO Opening socket connection to server / > 127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) > [2013-07-30 13:40:57,757] INFO Socket connection established to localhost/ > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) > [2013-07-30 13:40:57,764] INFO Session establishment complete on server > localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0040, negotiated > timeout = 6000 (org.apache.zookeeper.ClientCnxn) > [2013-07-30 13:40:57,766] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2013-07-30 13:40:57,781] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], starting auto > committer every 60000 ms (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:57,808] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], begin > registering consumer > playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:57,829] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], end > registering consumer > playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:57,832] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], starting > watcher executor thread for consumer > playertracking_playertrackingmobile-1375184457711-6b050d36 > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:57,877] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], begin > rebalancing consumer > playertracking_playertrackingmobile-1375184457711-6b050d36 try #0 > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,154] INFO Verifying properties > (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,155] INFO Property client.id is overridden to > playertracking (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,155] INFO Property metadata.broker.list is overridden > to > playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094 > (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,155] INFO Property request.timeout.ms is overridden > to 30000 (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,182] INFO Fetching metadata from broker > id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1 > topic(s) Set(playertracking) (kafka.client.ClientUtils$) > [2013-07-30 13:40:58,184] INFO Connected to playertrackingmobile:9094 for > producing (kafka.producer.SyncProducer) > [2013-07-30 13:40:58,208] INFO Disconnecting from > playertrackingmobile:9094 (kafka.producer.SyncProducer) > [2013-07-30 13:40:58,215] INFO [ConsumerFetcherManager-1375184457779] > Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager) > [2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779] > Stopping all fetchers (kafka.consumer.ConsumerFetcherManager) > [2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779] All > connections stopped (kafka.consumer.ConsumerFetcherManager) > [2013-07-30 13:40:58,217] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared all > relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,219] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared the > data chunks in all the consumer message iterators > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,219] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Committing > all offsets after clearing the fetcher queues > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,220] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Releasing > partition ownership (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,222] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer > playertracking_playertrackingmobile-1375184457711-6b050d36 rebalancing the > following partitions: ArrayBuffer(0) for topic playertracking with > consumers: > List(playertracking_playertrackingmobile-1375184457711-6b050d36-0, > playertracking_playertrackingmobile-1375184457711-6b050d36-1, > playertracking_playertrackingmobile-1375184457711-6b050d36-2) > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,223] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], > playertracking_playertrackingmobile-1375184457711-6b050d36-0 attempting to > claim partition 0 (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,227] WARN > [playertracking_playertrackingmobile-1375184457711-6b050d36], No broker > partitions consumed by consumer thread > playertracking_playertrackingmobile-1375184457711-6b050d36-1 for topic > playertracking (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,227] WARN > [playertracking_playertrackingmobile-1375184457711-6b050d36], No broker > partitions consumed by consumer thread > playertracking_playertrackingmobile-1375184457711-6b050d36-2 for topic > playertracking (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,231] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], > playertracking_playertrackingmobile-1375184457711-6b050d36-0 successfully > owned partition 0 for topic playertracking > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,232] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Updating the > cache (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,235] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer > playertracking_playertrackingmobile-1375184457711-6b050d36 selected > partitions : playertracking:0: fetched offset = 11686: consumed offset = > 11686 (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,236] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36-leader-finder-thread], > Starting (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) > [2013-07-30 13:40:58,237] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], end > rebalancing consumer > playertracking_playertrackingmobile-1375184457711-6b050d36 try #0 > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,240] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Creating > topic event watcher for whitelist .* > (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,241] INFO Initiating client connection, connectString= > 127.0.0.1:2181 sessionTimeout=6000 > watcher=org.I0Itec.zkclient.ZkClient@4628a18a(org.apache.zookeeper.ZooKeeper) > [2013-07-30 13:40:58,241] INFO Starting ZkClient event thread. > (org.I0Itec.zkclient.ZkEventThread) > [2013-07-30 13:40:58,245] INFO Opening socket connection to server / > 127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) > [2013-07-30 13:40:58,246] INFO Socket connection established to localhost/ > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) > [2013-07-30 13:40:58,253] INFO Session establishment complete on server > localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0041, negotiated > timeout = 6000 (org.apache.zookeeper.ClientCnxn) > [2013-07-30 13:40:58,254] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2013-07-30 13:40:58,263] INFO > [playertracking_playertrackingmobile-1375184457711-6b050d36], Topics to > consume = List(playertracking) (kafka.consumer.ZookeeperConsumerConnector) > [2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-0 > (kafka.tools.MirrorMaker$MirrorMakerThread) > [2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-1 > (kafka.tools.MirrorMaker$MirrorMakerThread) > [2013-07-30 13:40:58,270] INFO Starting mirror maker thread mirrormaker-2 > (kafka.tools.MirrorMaker$MirrorMakerThread) > [2013-07-30 13:40:58,273] INFO Verifying properties > (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,273] INFO Property client.id is overridden to > playertracking (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,274] INFO Property metadata.broker.list is overridden > to > playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094 > (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,274] INFO Property request.timeout.ms is overridden > to 30000 (kafka.utils.VerifiableProperties) > [2013-07-30 13:40:58,274] INFO Fetching metadata from broker > id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1 > topic(s) Set(playertracking) (kafka.client.ClientUtils$) > [2013-07-30 13:40:58,275] INFO Connected to playertrackingmobile:9094 for > producing (kafka.producer.SyncProducer) > [2013-07-30 13:40:58,278] INFO Disconnecting from > playertrackingmobile:9094 (kafka.producer.SyncProducer) > [2013-07-30 13:40:58,291] INFO > [ConsumerFetcherThread-playertracking_playertrackingmobile-1375184457711-6b050d36-0-20], > Starting (kafka.consumer.ConsumerFetcherThread) > [2013-07-30 13:40:58,292] INFO [ConsumerFetcherManager-1375184457779] > Adding fetcher for partition [playertracking,0], initOffset 11686 to broker > 20 with fetcherId 0 (kafka.consumer.ConsumerFetcherManager) > > > >