Does the
leader just wait for the followers in the ISR to consume?

That's right. Until the followers in the ISR have consumed the message, the
producer does not get an ack back. The producer can retry if the previous
request times out or fails.

A separate question, can the request.required.acks be set to a higher
positive integer, say "2", to indicate that 2 of say 3 replicas have acked?

Yes, that's possible.
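
For illustration, here is a minimal, hypothetical sketch of how a 0.8
producer might be configured along these lines (the class name, broker
list, topic, and retry values are placeholders, not recommendations):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AckConfigExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker list -- substitute your own hosts/ports.
        props.put("metadata.broker.list", "localhost:1025,localhost:1026");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // -1 waits for the ISR, 1 waits for the leader only, and a
        // positive integer requests that many acks.
        props.put("request.required.acks", "-1");
        // Retry if the previous request times out or fails
        // (e.g. while a broker is restarting).
        props.put("message.send.max.retries", "3");
        props.put("retry.backoff.ms", "100");

        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test-topic", "hello"));
        producer.close();
      }
    }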

Thanks,
Neha
On Oct 5, 2013 10:18 AM, "Jason Rosenberg" <j...@squareup.com> wrote:

> Thanks for the explanation Neha.....still holding out hope.....
>
> So, if request.required.acks=-1, how does the leader confirm that the other
> brokers have consumed the message, before acking to the producer?  Does the
> leader just wait for the followers in the ISR to consume?  Or does the
> leader have a way to push, or ping the followers to consume?
>
> Couldn't that mechanism be used, during a clean shutdown, even if the
> messages were initially produced with acks=1? That is, when shutting down,
> get acks from all ISR members for each partition, before shutting down.
>
> I'm just a bit leery about using -1 across the board, because of the
> performance hit (but for now it seems the best option to use for reliable
> sending).
>
> A separate question, can the request.required.acks be set to a higher
> positive integer, say "2", to indicate that 2 of say 3 replicas have acked?
>  ("request.required.acks" in the name would seem to indicate this).  I'm
> not saying I'd want to use this (we are hoping to use only a replication
> factor of 2).
>
> Jason
>
>
> On Sat, Oct 5, 2013 at 1:00 PM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
> > Shouldn't this be part of the contract?  It should be able to make sure
> > this happens before shutting down, no?
> >
> > The leader writes messages to its local log and then the replicas consume
> > messages from the leader and write those to their local logs. If you set
> > request.required.acks=1, the ack is sent to the producer only after the
> > leader has written messages to its local log. What you are asking for is
> > part of the contract if request.required.acks=-1.
> >
> > In this case, if we need to use
> > required.request.acks=-1, that will pretty much prevent any successful
> > message producing while any of the brokers for a partition is
> > unavailable.  So, I don't think that's an option.  (Not to mention the
> > performance degradation).
> >
> > You can implement reliable delivery semantics while allowing rolling
> > restart of brokers by setting request.required.acks=-1. When one of the
> > replicas is shut down, the ISR shrinks to exclude the replica being shut
> > down, and the messages will be committed using the new ISR.
> >
> > Thanks,
> > Neha
> >
> >
> > On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg <j...@squareup.com>
> > > wrote:
> >
> > > Neha,
> > >
> > > I'm not sure I understand.  I would have thought that if the leader
> > > acknowledges receipt of a message, and is then shut down cleanly (with
> > > controlled shutdown enabled), that it would be able to reliably persist
> > > any in-memory buffered messages (and replicate them), before shutting
> > > down.  Shouldn't this be part of the contract?  It should be able to
> > > make sure this happens before shutting down, no?
> > >
> > > I would understand a message dropped if it were a hard shutdown.
> > >
> > > I'm not sure then how to implement reliable delivery semantics, while
> > > allowing a rolling restart of the broker cluster (or even to tolerate a
> > > single node failure, where one node might be down for a while and need
> > > to be replaced or have a disk repaired).  In this case, if we need to
> > > use required.request.acks=-1, that will pretty much prevent any
> > > successful message producing while any of the brokers for a partition
> > > is unavailable.  So, I don't think that's an option.  (Not to mention
> > > the performance degradation).
> > >
> > > Is there not a way to make this work more reliably with leader only
> > > acknowledgment, and clean/controlled shutdown?
> > >
> > > My test does succeed, as expected, with acks = -1, at least for the
> > > 100 or so iterations I've let it run so far.  It does on occasion send
> > > duplicates (but that's ok with me).
> > >
> > > Jason
> > >
> > >
> > > On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > > wrote:
> > >
> > > > The occasional single message loss could happen since
> > > > required.request.acks=1 and the leader is shut down before the
> > > > follower gets a chance to copy the message. Can you try your test
> > > > with num acks set to -1?
> > > >
> > > > Thanks,
> > > > Neha
> > > > On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <j...@squareup.com> wrote:
> > > >
> > > > > All,
> > > > >
> > > > > I'm having an issue with an integration test I've set up.  This is
> > > > > using 0.8-beta1.
> > > > >
> > > > > The test is to verify that no messages are dropped (or the sender
> > > > > gets an exception thrown back on failure) while doing a rolling
> > > > > restart of a cluster of 2 brokers.
> > > > >
> > > > > The producer is configured to use 'request.required.acks' = '1'.
> > > > >
> > > > > The servers are set up to run locally on localhost, on different
> > > > > ports, and different data dirs.  The producer connects with a
> > > > > metadata broker list like:  "localhost:2024,localhost:1025" (so no
> > > > > vip).  The servers are set up with a default replication factor of
> > > > > 2.  The servers have controlled shutdown enabled, as well.
> > > > >
> > > > > The producer code looks like this:
> > > > >     ...
> > > > >     Producer<Integer, T> producer = getProducer();
> > > > >     try {
> > > > >       KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
> > > > >       producer.send(msg);
> > > > >       return true;
> > > > >     } catch (RuntimeException e) {
> > > > >       logger.warn("Error sending data to kafka", e);
> > > > >       return false;
> > > > >     }
> > > > >     ...
> > > > >
> > > > > The test sends groups of messages at each stage of the test (e.g.
> > > > > servers up, first server going down, first server coming up, second
> > > > > server going down, etc.).  Then a consumer connects and consumes all
> > > > > the messages, to make sure they all arrived ok.
> > > > >
> > > > > It seems that intermittently a single message gets dropped, right
> > > > > after one of the servers starts going down.  It doesn't happen every
> > > > > time; it seems to happen in about 1 out of every 20 test runs.
> > > > > Here's some sample output.  I see the exception inside the producer
> > > > > code, but I don't see the producer.send method ever having an
> > > > > exception thrown back out to the caller (the log line "Error sending
> > > > > data to kafka" is never triggered).
> > > > >
> > > > > What's interesting is that it looks like the exceptions are happening
> > > > > on message 3, but when the consumer subsequently consumes back all
> > > > > the messages in the broker cluster, it seems message 2 (and not
> > > > > message 3) is missing:
> > > > >
> > > > > ...
> > > > > ...
> > > > > 7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 98
> > > > > 7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 99
> > > > > 7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Shutting down server2
> > > > > 7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 0
> > > > > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  - Shutting down KafkaServer
> > > > > 7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 1
> > > > > 7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 2
> > > > > 7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 3
> > > > > 7394 [kafka-request-handler-5] WARN state.change.logger  - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > > > > 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> > > > > kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > > > > at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
> > > > > at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
> > > > > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > > > > at java.lang.Thread.run(Thread.java:724)
> > > > > 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread  - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
> > > > > java.nio.channels.AsynchronousCloseException
> > > > > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
> > > > > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
> > > > > at kafka.utils.Utils$.read(Utils.scala:394)
> > > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > > at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
> > > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > > 8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer  - Shut down complete for KafkaServer
> > > > > 17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler  - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
> > > > > java.net.SocketTimeoutException
> > > > > at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
> > > > > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> > > > > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> > > > > at kafka.utils.Utils$.read(Utils.scala:394)
> > > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> > > > > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> > > > > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> > > > > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > > > > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> > > > > at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > > > > at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> > > > > at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
> > > > > at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> > > > > at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> > > > > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > > > > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > > at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> > > > > at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> > > > > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > > > > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> > > > > at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
> > > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > > > at kafka.producer.Producer.send(Producer.scala:74)
> > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > > > at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > > > > at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > > > > at java.lang.Thread.run(Thread.java:724)
> > > > > 17319 [Thread-1] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1026 unsuccessful
> > > > > java.net.ConnectException: Connection refused
> > > > > at sun.nio.ch.Net.connect0(Native Method)
> > > > > at sun.nio.ch.Net.connect(Net.java:465)
> > > > > at sun.nio.ch.Net.connect(Net.java:457)
> > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > > > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > > > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > > > at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > > > at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > > > at kafka.producer.Producer.send(Producer.scala:74)
> > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > > > at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > > > > at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > > > > at java.lang.Thread.run(Thread.java:724)
> > > > > 17322 [Thread-1] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
> > > > > java.net.ConnectException: Connection refused
> > > > > at sun.nio.ch.Net.connect0(Native Method)
> > > > > at sun.nio.ch.Net.connect(Net.java:465)
> > > > > at sun.nio.ch.Net.connect(Net.java:457)
> > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > > > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > > > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > > > at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > > > at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > > > at kafka.producer.Producer.send(Producer.scala:74)
> > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > > > at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > > > > at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > > > > at java.lang.Thread.run(Thread.java:724)
> > > > > 17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 4
> > > > > 17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 5
> > > > > 17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 6
> > > > >
> > > > > ...
> > > > > ...
> > > > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 3, message: 98'
> > > > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 3, message: 99'
> > > > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 0'
> > > > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 1'
> > > > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 3'
> > > > > 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 4'
> > > > > 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 5'
> > > > >
> > > >
> > >
> >
>
