Our mirror maker wiki is not up to date with 0.8. In 0.8, the producer only
needs metadata.broker.list.
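
As a rough sketch of what that means for the setup quoted below (host names,
ports, and the group id are copied from the quoted message and its log; the
consumer file is an assumption, since only the producer file was posted), the
two configs might look something like this:

```properties
# mirror-producer.properties: points at the TARGET cluster (DC B).
# No ZooKeeper setting appears on the producer side in 0.8.
metadata.broker.list=remote-kafka:9092,remote-kafka:9093,remote-kafka:9094

# Consumer properties (separate file, name assumed): points at the
# SOURCE cluster (DC A) through its ZooKeeper.
zookeeper.connect=127.0.0.1:2181
group.id=playertracking
```

The remaining producer settings from the quoted file (producer.type,
compression.codec, and so on) are tuning options and can stay as they are.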

Thanks,

Jun


On Tue, Jul 30, 2013 at 5:23 AM, Roel van der Made <r...@imgzine.com> wrote:

> Hi all,
>
> I'm working on a DC-to-DC setup with two Kafka 0.8 Beta1 clusters and want to
> mirror messages between the two DCs (DC A and DC B) using MirrorMaker.
> This should not be very difficult, but I am seeing results I cannot
> explain, as well as inconsistencies in the available documentation
> (some of it covers only 0.7.x).
>
> According to
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+%28MirrorMaker%29
> I should point MirrorMaker to the source cluster's ZK (DC A,
> localhost:2181) and the producer to the remote (DC B) ZooKeeper, unless I
> use the metadata.broker.list parameter, which I do. The metadata.broker.list
> parameter points to the remote IP address and the three brokers running
> there. I cannot find an option for pointing MirrorMaker to the remote
> ZooKeeper and, besides that, metadata.broker.list turns out to be
> mandatory in producer.properties, so it seems I am stuck with
> metadata.broker.list.
>
> When I start MirrorMaker, it seems to end up using only the broker list
> from the local ZooKeeper instance (the localhost:909[234]
> brokers) instead of the values I put in the mirror-producer.properties
> file (shown below). I also do not see any connections arriving at the
> remote end. (remote-kafka is listed in /etc/hosts with its public IP.)
>
> So my main questions are:
>
> - What is the best way to define the location of the remote brokers:
> through the remote ZK or through metadata.broker.list?
> - Are there any other 0.8-Beta1 things I need to be aware of, or is there
> any documentation that is accurate for 0.8? :)
>
>
> Thanks for any pointers; I may well be overlooking something.
>
> Roel.
>
>
> My mirror-producer.properties file:
>
> ############################# Producer Basics #############################
>
> # list of brokers used for bootstrapping
> # format: host1:port1,host2:port2 ...
> metadata.broker.list=remote-kafka:9092,remote-kafka:9093,remote-kafka:9094
>
> # name of the partitioner class for partitioning events; default partition spreads data randomly
> #partitioner.class=
>
> # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
> producer.type=async
>
> # specify the compression codec for all data generated: none, gzip, snappy.
> # the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
> compression.codec=none
>
> # message encoder
> serializer.class=kafka.serializer.DefaultEncoder
>
> # allow topic level compression
> #compressed.topics=
>
> ############################# Async Producer #############################
> # maximum time, in milliseconds, for buffering data on the producer queue
> queue.buffering.max.ms=11000000
>
> # the maximum size of the blocking queue for buffering on the producer
> queue.buffering.max.messages=2000000
>
> # Timeout for event enqueue:
> # 0: events will be enqueued immediately or dropped if the queue is full
> # -ve: enqueue will block indefinitely if the queue is full
> # +ve: enqueue will block up to this many milliseconds if the queue is full
> queue.enqueue.timeout.ms=-1
>
> # the number of messages batched at the producer
> #batch.num.messages=
>
>
> The log mentions:
> [playertrackingmobile is the DC A host, remote-kafka is the DC B host]
>
>
> [2013-07-30 13:40:57,437] INFO Starting mirror maker
> (kafka.tools.MirrorMaker$)
> [2013-07-30 13:40:57,596] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,649] INFO Property compression.codec is overridden to
> none (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,650] INFO Property metadata.broker.list is overridden
> to remote-kafka:9092,remote-kafka:9093,remote-kafka:9094
> (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,650] INFO Property producer.type is overridden to
> async (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,650] INFO Property queue.buffering.max.messages is
> overridden to 2000000 (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,650] INFO Property queue.buffering.max.ms is
> overridden to 11000000 (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,650] INFO Property queue.enqueue.timeout.ms is
> overridden to -1 (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,651] INFO Property serializer.class is overridden to
> kafka.serializer.DefaultEncoder (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,704] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,704] INFO Property group.id is overridden to
> playertracking (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,704] INFO Property zookeeper.connect is overridden to
> 127.0.0.1:2181 (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,705] INFO Property zookeeper.connection.timeout.ms is
> overridden to 1000000 (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:57,713] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Connecting to
> zookeeper instance at
> 127.0.0.1:2181 (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:57,722] INFO Starting ZkClient event thread.
> (org.I0Itec.zkclient.ZkEventThread)
> <snip app. versions etc.>
> [2013-07-30 13:40:57,730] INFO Initiating client connection, connectString=
> 127.0.0.1:2181 sessionTimeout=6000
> watcher=org.I0Itec.zkclient.ZkClient@2ca60593 (org.apache.zookeeper.ZooKeeper)
> [2013-07-30 13:40:57,744] INFO Opening socket connection to server /
> 127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
> [2013-07-30 13:40:57,757] INFO Socket connection established to localhost/
> 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2013-07-30 13:40:57,764] INFO Session establishment complete on server
> localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0040, negotiated
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2013-07-30 13:40:57,766] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2013-07-30 13:40:57,781] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], starting auto
> committer every 60000 ms (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:57,808] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], begin
> registering consumer
> playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:57,829] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], end
> registering consumer
> playertracking_playertrackingmobile-1375184457711-6b050d36 in ZK
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:57,832] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], starting
> watcher executor thread for consumer
> playertracking_playertrackingmobile-1375184457711-6b050d36
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:57,877] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], begin
> rebalancing consumer
> playertracking_playertrackingmobile-1375184457711-6b050d36 try #0
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,154] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,155] INFO Property client.id is overridden to
> playertracking (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,155] INFO Property metadata.broker.list is overridden
> to
> playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094
> (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,155] INFO Property request.timeout.ms is overridden
> to 30000 (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,182] INFO Fetching metadata from broker
> id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1
> topic(s) Set(playertracking) (kafka.client.ClientUtils$)
> [2013-07-30 13:40:58,184] INFO Connected to playertrackingmobile:9094 for
> producing (kafka.producer.SyncProducer)
> [2013-07-30 13:40:58,208] INFO Disconnecting from
> playertrackingmobile:9094 (kafka.producer.SyncProducer)
> [2013-07-30 13:40:58,215] INFO [ConsumerFetcherManager-1375184457779]
> Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager)
> [2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779]
> Stopping all fetchers (kafka.consumer.ConsumerFetcherManager)
> [2013-07-30 13:40:58,216] INFO [ConsumerFetcherManager-1375184457779] All
> connections stopped (kafka.consumer.ConsumerFetcherManager)
> [2013-07-30 13:40:58,217] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared all
> relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,219] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Cleared the
> data chunks in all the consumer message iterators
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,219] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Committing
> all offsets after clearing the fetcher queues
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,220] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Releasing
> partition ownership (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,222] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer
> playertracking_playertrackingmobile-1375184457711-6b050d36 rebalancing the
> following partitions: ArrayBuffer(0) for topic playertracking with
> consumers:
> List(playertracking_playertrackingmobile-1375184457711-6b050d36-0,
> playertracking_playertrackingmobile-1375184457711-6b050d36-1,
> playertracking_playertrackingmobile-1375184457711-6b050d36-2)
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,223] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36],
> playertracking_playertrackingmobile-1375184457711-6b050d36-0 attempting to
> claim partition 0 (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,227] WARN
> [playertracking_playertrackingmobile-1375184457711-6b050d36], No broker
> partitions consumed by consumer thread
> playertracking_playertrackingmobile-1375184457711-6b050d36-1 for topic
> playertracking (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,227] WARN
> [playertracking_playertrackingmobile-1375184457711-6b050d36], No broker
> partitions consumed by consumer thread
> playertracking_playertrackingmobile-1375184457711-6b050d36-2 for topic
> playertracking (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,231] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36],
> playertracking_playertrackingmobile-1375184457711-6b050d36-0 successfully
> owned partition 0 for topic playertracking
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,232] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Updating the
> cache (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,235] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Consumer
> playertracking_playertrackingmobile-1375184457711-6b050d36 selected
> partitions : playertracking:0: fetched offset = 11686: consumed offset =
> 11686 (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,236] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36-leader-finder-thread],
> Starting  (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> [2013-07-30 13:40:58,237] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], end
> rebalancing consumer
> playertracking_playertrackingmobile-1375184457711-6b050d36 try #0
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,240] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Creating
> topic event watcher for whitelist .*
> (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,241] INFO Initiating client connection, connectString=
> 127.0.0.1:2181 sessionTimeout=6000
> watcher=org.I0Itec.zkclient.ZkClient@4628a18a (org.apache.zookeeper.ZooKeeper)
> [2013-07-30 13:40:58,241] INFO Starting ZkClient event thread.
> (org.I0Itec.zkclient.ZkEventThread)
> [2013-07-30 13:40:58,245] INFO Opening socket connection to server /
> 127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
> [2013-07-30 13:40:58,246] INFO Socket connection established to localhost/
> 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2013-07-30 13:40:58,253] INFO Session establishment complete on server
> localhost/127.0.0.1:2181, sessionid = 0x1402e8c12ae0041, negotiated
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2013-07-30 13:40:58,254] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2013-07-30 13:40:58,263] INFO
> [playertracking_playertrackingmobile-1375184457711-6b050d36], Topics to
> consume = List(playertracking) (kafka.consumer.ZookeeperConsumerConnector)
> [2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-0
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2013-07-30 13:40:58,269] INFO Starting mirror maker thread mirrormaker-1
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2013-07-30 13:40:58,270] INFO Starting mirror maker thread mirrormaker-2
> (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2013-07-30 13:40:58,273] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,273] INFO Property client.id is overridden to
> playertracking (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,274] INFO Property metadata.broker.list is overridden
> to
> playertrackingmobile:9092,playertrackingmobile:9093,playertrackingmobile:9094
> (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,274] INFO Property request.timeout.ms is overridden
> to 30000 (kafka.utils.VerifiableProperties)
> [2013-07-30 13:40:58,274] INFO Fetching metadata from broker
> id:30,host:playertrackingmobile,port:9094 with correlation id 0 for 1
> topic(s) Set(playertracking) (kafka.client.ClientUtils$)
> [2013-07-30 13:40:58,275] INFO Connected to playertrackingmobile:9094 for
> producing (kafka.producer.SyncProducer)
> [2013-07-30 13:40:58,278] INFO Disconnecting from
> playertrackingmobile:9094 (kafka.producer.SyncProducer)
> [2013-07-30 13:40:58,291] INFO
> [ConsumerFetcherThread-playertracking_playertrackingmobile-1375184457711-6b050d36-0-20],
> Starting  (kafka.consumer.ConsumerFetcherThread)
> [2013-07-30 13:40:58,292] INFO [ConsumerFetcherManager-1375184457779]
> Adding fetcher for partition [playertracking,0], initOffset 11686 to broker
> 20 with fetcherId 0 (kafka.consumer.ConsumerFetcherManager)
>
>
>
>
