This thread is a bit long, but let me see if I can restate it
correctly (not sure I fully follow).

There are two suggestions:
1. Allow partial rebalances that move just some partitions. I.e. if a
consumer fails and has only one partition, only one other consumer
should be affected (the one that picks up that partition). If there
are many partitions to be reassigned there will obviously be a
tradeoff between impacting all consumers and balancing load evenly.
I.e. if you moved all load to one other consumer that would cause
little rebalancing interruption but poor load balancing.
2. Have the coordinator communicate the assignments to the brokers
rather than to the clients directly. This could potentially simplify
the consumer. Perhaps it would be possible to have the leader track
liveness using the fetch requests rather than needing an artificial
heartbeat.
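
As a rough illustration of the tradeoff in (1), here is a minimal Python
sketch. It is not Kafka code: the function name `partial_rebalance`, the
`max_touched` knob, and the dict-of-lists assignment shape are all made up
for this example. It hands a failed consumer's partitions to at most
`max_touched` survivors, so a small value keeps the interruption local and
a larger one spreads the load more evenly.

```python
def partial_rebalance(assignment, failed, max_touched):
    """Reassign only the failed consumer's partitions (hypothetical sketch).

    assignment:  dict mapping consumer id -> list of partition ids
    failed:      id of the consumer that left the group
    max_touched: upper bound on how many surviving consumers may be interrupted
    Returns (new_assignment, touched), where touched is the set of survivors
    whose assignment changed.
    """
    orphans = assignment[failed]
    # Copy the survivors' lists so the caller's assignment is not mutated.
    survivors = {c: list(p) for c, p in assignment.items() if c != failed}
    if not survivors:
        raise ValueError("no surviving consumers to take over partitions")

    # Only the least-loaded survivors are eligible to receive partitions.
    # Ties are broken by consumer id so the result is deterministic.
    limit = max(1, max_touched)
    recipients = sorted(survivors, key=lambda c: (len(survivors[c]), c))[:limit]

    touched = set()
    for partition in orphans:
        # Give each orphaned partition to the currently lightest recipient.
        target = min(recipients, key=lambda c: (len(survivors[c]), c))
        survivors[target].append(partition)
        touched.add(target)
    return survivors, touched


if __name__ == "__main__":
    group = {"c1": [0, 1, 2, 3], "c2": [4, 5, 6, 7], "c3": [8, 9, 10, 11]}
    for cap in (1, 2):
        new_assignment, touched = partial_rebalance(group, "c3", cap)
        sizes = {c: len(p) for c, p in new_assignment.items()}
        print(cap, sorted(touched), sizes)
```

With three consumers holding four partitions each, `max_touched=1` leaves
one survivor with eight partitions and the other untouched at four, while
`max_touched=2` interrupts both survivors but ends with six partitions each.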

These are interesting ideas.

-Jay



On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Rob,
>
> If I understand correctly, the idea is that if the rebalance only changes
> the ownership of a few consumers in the group, the coordinator can just
> sync with them and not interrupt the other consumers.
>
> I think this approach may work. However, it will likely complicate the
> logic of the coordinator once we sketch out all the details, since the
> rebalance result basically depends on two variables: 1) partitions for
> the subscribed topics, 2) consumers inside the group. Hence, following this
> approach, by the time the coordinator decides to trigger a rebalance, it
> must correctly keep track of which variable change triggered the current
> rebalance process. On the other hand, the normal rebalance process, even
> with a global barrier, should usually be very fast, within a few hundred
> milliseconds. So I am not sure if this is a worthy optimization that we
> would want for now. What do you think?
>
> Guozhang
>
>
> On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers <robert.w.with...@gmail.com
>> wrote:
>
>> Lock is a bad way to say it; a barrier is better.  I don't think what I am
>> saying is even a barrier, since the rebalance would just need to recompute
>> a rebalance schedule and submit it.  The only processing delay is to allow
>> a soft remove to let the client clean up, before you turn on the new guy, so
>> it lags a bit.  Do you think this could work?
>>
>> Thanks,
>> Rob
>>
>> > On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com>
>> wrote:
>> >
>> > Hi Guozhang,
>> >
>> > Thank you for considering my suggestions.  The security layer sounds
>> > like the right facet to design for these sorts of capabilities.  Have you
>> > considered a chained ocap security model for the broker using hash tokens?
>> > This would provide for per-partition read/write capabilities with QoS
>> > context including leases, revocation, debug level and monitoring.  Overkill
>> > disappears as no domain specific info needs to be stored at the brokers,
>> > like consumer/partition assignments.  The read ocap for consumer 7/topic
>> > bingo/partition 131 could be revoked at the brokers for a partition and
>> > subsequent reads would fail the fetch for requests with that ocap token.
>> > You could also dynamically change the log level for a specific
>> > consumer/partition.
>> >
>> > There are advantages we could discuss to having finer grained control.
>> > Consider that scheduled partition rebalancing could be implemented with no
>> > pauses from the perspective of the consumer threads; it looks like single
>> > partition lag, as the offset commit occurs before rotation, with no lag to
>> > non-rebalanced partitions: rebalance 1 partition per second so as to creep
>> > load to a newbie consumer.  It would eliminate a global read lock and even
>> > the internal Kafka consumer would never block on IO protocol other than the
>> > normal fetch request (and the initial join group request).
>> >
>> > A global lock acquired through a pull protocol (HeartbeatRequest
>> > followed by a JoinGroupRequest) for all live consumers is a much bigger
>> > lock than a security-based push protocol, as I assume the coordinator will
>> > have open sockets to all brokers in order to reach out as needed.  As well,
>> > each lock would be independent between partitions and be with only those
>> > brokers in ISR for a given partition.  It is a much smaller lock.
>> >
>> > I had some time to consider my suggestion that it be viewed as a
>> > relativistic frame of reference.  Consider the model where each dimension
>> > of the frame of reference for each consumer is each partition, actually a
>> > sub-space with the dimensionality of the replication factor, but with a
>> > single leader election, so consider it 1 dimension.  The total
>> > dimensionality of the consumer's frame of reference is the number of
>> > partitions, but only assigned partitions are open to a given consumer's
>> > viewpoint.  The offset is the partition dimension's coordinate and only
>> > consumers with an open dimension can translate the offset.  A rebalance
>> > opens or closes a dimension for a given consumer and can be viewed as a
>> > rotation.  Could Kafka consumption and rebalance (and ISR leader election)
>> > be reduced to matrix operations?
>> >
>> > Rob
>> >
>> >> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >>
>> >> Hi Rob,
>> >>
>> >> Sorry for the late reply.
>> >>
>> >> If I understand your approach correctly, it requires all brokers to
>> >> remember the partition assignment of each consumer in order to decide
>> >> whether or not to authorize the fetch request, correct? If we are indeed
>> >> going to do such authorization for the security project then maybe it is a
>> >> good way to go, but otherwise it might be overkill just to support finer
>> >> grained partition assignment. In addition, instead of requiring a round
>> >> trip between the coordinator and the consumers for the synchronization
>> >> barrier, now the coordinator needs to wait for a round trip between itself
>> >> and other brokers before it can respond to the join-group request, right?
>> >>
>> >> Guozhang
>> >>
>> >>
>> >> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <
>> robert.w.with...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hi Guozhang,
>> >>>
>> >>>
>> >>>
>> >>> Currently, the brokers do not know which high-level consumers are reading
>> >>> which partitions and it is the rebalance between the consumers and the
>> >>> coordinator which would authorize a consumer to fetch a particular
>> >>> partition, I think.  Does this mean that when a rebalance occurs, all
>> >>> consumers must send a JoinGroupRequest and that the coordinator will not
>> >>> respond to any consumers until all consumers have sent the
>> >>> JoinGroupRequest, to enable the synchronization barrier?  That has the
>> >>> potential to be a sizable global delay.
>> >>>
>> >>>
>> >>>
>> >>> On the assumption that there is only one coordinator for a group, why
>> >>> couldn't the synchronization barrier be per partition and internal to Kafka
>> >>> and mostly not involve the consumers, other than a chance for offsetCommit
>> >>> by the consumer losing a partition?   If the brokers have session state and
>> >>> know the new assignment before the consumer is notified with a
>> >>> HeartbeatResponse, they could fail the fetch request from that consumer for
>> >>> an invalidly assigned partition.  The consumer could take an invalid
>> >>> partition failure of the fetch request as if it were a HeartbeatResponse
>> >>> partition removal.
>> >>>
>> >>>
>> >>>
>> >>> The only gap seems to be: when does a consumer that is losing a partition
>> >>> get a chance to commit its offset?  If there were a
>> >>> PartitionCommittedNotification message that a consumer could send to the
>> >>> coordinator after committing its offsets, then the coordinator could send
>> >>> the add partition HeartbeatResponse, after receiving the
>> >>> PartitionCommittedNotification, to the consumer gaining the partition and
>> >>> the offset management is stable.  The advantage is that none of the other
>> >>> consumers would be paused on any other partitions.  Only partitions being
>> >>> rebalanced would see any consumption pause.
>> >>>
>> >>>
>> >>>
>> >>> So, something like:
>> >>>
>> >>> 1.  3 consumers running with 12 partitions balanced, 4 each
>> >>>
>> >>> 2.  New consumer starts and sends JoinGroupRequest to coordinator
>> >>>
>> >>> 3.  Coordinator computes rebalance with 4 consumers: each existing
>> >>> consumer will lose a partition assigned to the new consumer
>> >>>
>> >>> 4.  Coordinator informs all live brokers of partition reassignments
>> >>>
>> >>> 5.  Brokers receive reassignments, start failing unauthorized fetch
>> >>> requests and ack back to the coordinator
>> >>>
>> >>> 6.  Coordinator receives all broker acks and sends HeartbeatResponses with
>> >>> partition removals to existing consumers and awaits
>> >>> PartitionCommittedNotifications from consumers losing partitions.
>> >>>
>> >>> 7.  Existing consumers can continue to fetch messages from correctly
>> >>> assigned partitions
>> >>>
>> >>> 8.  When an existing consumer fails a fetch for a partition or gets a
>> >>> HeartbeatResponse with a partition removal, it would commitOffsets for that
>> >>> partition and then send a PartitionCommittedNotification to the coordinator.
>> >>>
>> >>> 9.  As the Coordinator receives the PartitionCommittedNotification for a
>> >>> particular partition from an existing consumer, it sends the addPartition
>> >>> to consumer 4 in a HeartbeatResponse, and the new consumer can start
>> >>> fetching that partition.
>> >>>
>> >>>
>> >>>
>> >>> If a consumer drops HeartbeatRequests within a session timeout, the
>> >>> coordinator would inform the brokers and they would fail fetchRequests for
>> >>> those partitions from that consumer.   There is no chance to send
>> >>> removePartitions since no Heartbeat is occurring, but the addPartitions
>> >>> could be sent and the offset is what it is.  This seems no different than
>> >>> this sort of failure, today.
>> >>>
>> >>>
>> >>>
>> >>> Instead of a global synchronization barrier isn’t it possible to have an
>> >>> incremental per-partition synchronization barrier?  The brokers would have
>> >>> to be aware of this.  I think of it as relativistic from each acceleration
>> >>> frame of reference, which is each consumer: event horizons.
>> >>>
>> >>>
>> >>>
>> >>> Regards,
>> >>>
>> >>> Rob
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> -----Original Message-----
>> >>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>> >>> Sent: Wednesday, July 16, 2014 9:20 AM
>> >>> To: users@kafka.apache.org
>> >>> Subject: Re: New Consumer Design
>> >>>
>> >>>
>> >>>
>> >>> Hi Rob,
>> >>>
>> >>>
>> >>>
>> >>> Piggybacking the rebalance partition info in the HeartbeatResponse may cause
>> >>> inconsistency in the partition assignments to consumers when rebalances are
>> >>> triggered consecutively, since the coordinator no longer has a
>> >>> synchronization barrier for recomputing the distribution with a
>> >>> consistent view.
>> >>>
>> >>>
>> >>>
>> >>> Guozhang
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com>
>> >>>>> wrote:
>> >>>
>> >>>
>> >>>
>> >>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >>>
>> >>>
>> >>>>>> Hi All,
>> >>>
>> >>>
>> >>>>>> We wrote a wiki a few weeks back proposing a single-threaded, ZK-free
>> >>>>>> consumer client design for 0.9:
>> >>>
>> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>> >>>
>> >>>>>> We want to share some of the ideas that came up for this design to get
>> >>>>>> some early feedback. The discussion below assumes you have read the
>> >>>>>> above wiki.
>> >>>
>> >>>
>> >>>>>> *Offset Management*
>> >>>
>> >>>
>> >>>>>> To make consumer clients ZK-free we need to move the offset
>> >>>>>> management utility from ZooKeeper into the Kafka servers, which
>> >>>>>> then store offsets as a special log. Key-based log compaction will be
>> >>>>>> used to keep this ever-growing log's size bounded. The broker responsible
>> >>>>>> for the offset management of a given consumer group will be the leader of
>> >>>>>> this special offset log partition, where the partition key will be the
>> >>>>>> consumer group name. On the consumer side, instead of talking to ZK to
>> >>>>>> commit and read offsets, it talks to the servers with new offset commit
>> >>>>>> and offset fetch request types for offset management. This work has been
>> >>>>>> done and will be included in the 0.8.2 release. Details of the
>> >>>>>> implementation can be found in KAFKA-1000 and this wiki:
>> >>>
>> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>> >>>
>> >>>
>> >>>>>> *Group Membership Management*
>> >>>
>> >>>
>> >>>>>> The next step is to move the membership management and rebalancing
>> >>>>>> utility out of ZooKeeper as well. To do this we introduced a consumer
>> >>>>>> coordinator hosted on the Kafka servers which is responsible for keeping
>> >>>>>> track of group membership and consumption partition changes, and the
>> >>>>>> corresponding rebalancing process.
>> >>>
>> >>>
>> >>>>>> *1. Failure Detection*
>> >>>
>> >>>
>> >>>>>> More specifically, we will use a heartbeating protocol for consumer
>> >>>>>> and coordinator failure detection. The heartbeating protocol would be
>> >>>>>> similar to the one ZK uses, i.e. the consumer will send a session
>> >>>>>> timeout value to the coordinator on startup, and afterwards keep sending
>> >>>>>> heartbeats to the coordinator every session-timeout / heartbeat-frequency.
>> >>>>>> The coordinator will treat a consumer as failed when it has not heard
>> >>>>>> from the consumer after session-timeout, and the consumer will
>> >>>>>> treat its coordinator as failed when it has not received a
>> >>>>>> heartbeat response after session-timeout.
>> >>>
>> >>>>>> One difference from the ZK heartbeat protocol is that instead of fixing
>> >>>>>> the heartbeat-frequency at three, we make this value configurable in
>> >>>>>> consumer clients. This is because the latency of the rebalancing process
>> >>>>>> (discussed later) is lower bounded by the heartbeat frequency, so we
>> >>>>>> want this value to be large while not DDoSing the servers.
>> >>>
>> >>>
>> >>>>>> *2. Consumer Rebalance*
>> >>>
>> >>>
>> >>>>>> A number of events can trigger a rebalance: 1) consumer failure,
>> >>>>>> 2) a new consumer joining the group, 3) an existing consumer changing
>> >>>>>> its consumed topics/partitions, 4) a partition change for the consumed
>> >>>>>> topics. Once the coordinator decides to trigger a rebalance it will
>> >>>>>> notify the consumers within the group to resend their topic/partition
>> >>>>>> subscription information, and then assign existing partitions to these
>> >>>>>> consumers. On the consumers' end, they no longer run any rebalance
>> >>>>>> logic in a distributed manner but follow the partition assignment they
>> >>>>>> receive from the coordinator. Since the consumers can only be notified
>> >>>>>> of a rebalance triggering via heartbeat responses, the rebalance latency
>> >>>>>> is lower bounded by the heartbeat frequency; that is why we allow
>> >>>>>> consumers to negotiate this value with the coordinator upon
>> >>>>>> registration, and we would like to hear people's thoughts about this
>> >>>>>> protocol.
>> >>>
>> >>>
>> >>>>> One question on this topic: my planned usage pattern is to have a
>> >>>>> high number of consumers that are ephemeral.  Imagine a consumer coming
>> >>>>> online, consuming messages for a short duration, then disappearing
>> >>>>> forever.  Is this use case supported?
>> >>>
>> >>>
>> >>>
>> >>>> If consumers are coming and going, there will be a high rate of
>> >>>> rebalances and that will really start throttling consumption at some
>> >>>> point.  You may have a use that hits that detrimentally.  A custom
>> >>>> rebalance algorithm may be able to rebalance a sub-set of the
>> >>>> consumers and not inform others of a need to rebalance them, which
>> >>>> would ameliorate the cost.  Still the consumers that need to rebalance
>> >>>> would pay.
>> >>>
>> >>>> If the initial JoinGroupRequest was preserved, would it be possible to
>> >>>> send rebalanced partition information in the HeartbeatResponse, up
>> >>>> front to all (or a sub-set) consumers, and avoid the rejoin round
>> >>>> trip?  I mean, push the rebalance and not pull it with a secondary
>> >>>> JoinGroupRequest.  This would reduce cost and lower the throttle
>> >>>> point, but poorly timed fetch requests may fail.  If the consumer were
>> >>>> resilient to fetch failures, might it work?
>> >>>
>> >>>
>> >>>
>> >>>>>> In addition, the rebalance logic in the new consumer will be
>> >>>>>> customizable instead of hard-coded in a per-topic round-robin manner.
>> >>>>>> Users will be able to write their own assignment class following a
>> >>>>>> common interface, and consumers upon registering themselves will specify
>> >>>>>> in their registration request the assigner class name they want to use.
>> >>>>>> If consumers within the same group specify different algorithms the
>> >>>>>> registration will be rejected. We are not sure yet if this is the best
>> >>>>>> way of customizing rebalance logic, so any feedback is more than welcome.
>> >>>
>> >>>
>> >>>>>> For wildcard consumption, the consumers will now capture new topics
>> >>>>>> that are available for fetching through the topic metadata request.
>> >>>>>> That is, periodically each consumer will update its topic metadata
>> >>>>>> for fetching, and if new topics matching its wildcard regex are returned
>> >>>>>> in the metadata response, it will notify the coordinator to let it
>> >>>>>> trigger a new rebalance to assign partitions for the new topics.
>> >>>
>> >>>
>> >>>>>> There are some failure handling cases during the rebalancing
>> >>>>>> process discussed in the consumer rewrite design wiki. We would
>> >>>>>> encourage people to read it and let us know if there are any other
>> >>>>>> corner cases not covered.
>> >>>
>> >>>
>> >>>>>> Moving forward, we are thinking about making the coordinator handle
>> >>>>>> both the group management and offset management for the given groups,
>> >>>>>> i.e. the leader of the offset log partition will naturally become the
>> >>>>>> coordinator of the corresponding consumer. This allows the coordinator
>> >>>>>> to reject offset commit requests if the sender is not part of the group.
>> >>>>>> To do so a consumer id and generation id are used to control the group
>> >>>>>> member generations. Details of this design are included in the rewrite
>> >>>>>> design wiki page, and again any feedback is appreciated.
>> >>>
>> >>>
>> >>>>>> *3. Non-blocking Network IO*
>> >>>
>> >>>
>> >>>>>> With a single-threaded consumer, we will instead use non-blocking
>> >>>>>> network IO (i.e. java.nio) for the fetching loops, just as the new
>> >>>>>> producer does for sending data asynchronously. In terms of
>> >>>>>> implementation, we will let the new consumer/producer clients be based
>> >>>>>> on a common non-blocking IO thread class. Details of this refactoring
>> >>>>>> can be found in KAFKA-1316.
>> >>>
>> >>>
>> >>>>>> *4. Consumer API*
>> >>>
>> >>>
>> >>>>>> As a result of the single-threaded non-blocking network IO
>> >>>>>> implementation, the new consumer API will no longer be based on a
>> >>>>>> blocking stream iterator interface but on a poll(timeout) interface. In
>> >>>>>> addition, the consumer APIs are designed to be flexible so that people
>> >>>>>> can choose either to use the default offset and group membership
>> >>>>>> management utilities mentioned above or to implement their own
>> >>>>>> offset/group management logic. For example, for scenarios that need
>> >>>>>> fixed partition assignment or that prefer to store offsets locally in a
>> >>>>>> data store, the new consumer APIs would make it much easier to
>> >>>>>> customize. Details of the API design can be found in KAFKA-1328.
>> >>>>>> A question to think about is whether there are any other potential use
>> >>>>>> cases that cannot be easily supported by this API.
>> >>>
>> >>>
>> >>>>>> Cheers,
>> >>>
>> >>>>>> Guozhang
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>>
>> >>> -- Guozhang
>> >>
>> >>
>> >> --
>> >> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
