Rajiv,

Those are good points. As for the implementation, we have developed a class in
the producer that can probably be re-used for the consumer as well:

org.apache.kafka.clients.producer.internals.BufferPool

Please feel free to add more comments on KAFKA-2045.

Guozhang


On Tue, Mar 24, 2015 at 12:21 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Guozhang,
>
> Yeah the main motivation is to not require de-serialization but still allow
> the consumer to de-serialize into objects if they really want to. Another
> motivation for iterating over the ByteBuffer on the fly is that we can
> prevent copies altogether. This has an added implication though. Since we
> re-use the same response buffer continuously and iterate over it, this
> means that the iteration has to be done in a single threaded manner
> specifically on the thread that did the poll. The work flow will look a bit
> like this:
>
> i) Re-use the same request buffer to create a request and write to the
> socket.
> ii) On poll re-use the same response buffer to read in the response till it
> is complete.
> iii) When the response is complete respond with an iterator to the response
> ByteBuffer. The consumer must now consume the entire ByteBuffer on this
> thread since we use a single mutable iterator to go through the
> ByteBuffer.
>
> It is trickier when we consider that during iteration the consumer might
> send more kafka requests and call poll further. We can maintain a pointer
> to the end of the response ByteBuffer to allow more responses to stream in
> and essentially use the response buffer as a circular buffer. So basically
> there would be two pointers - iteration_pointer and
> network_append_pointer, with iteration_pointer <=
> network_append_pointer. If the network_append_pointer reaches the end of
> the buffer, then we cannot accept any more data without the
> iteration_pointer having progressed further. Since responses have to be
> contiguous, we cannot have a response straddle the circular buffer's end.
> We'd have to detect this case and copy the response to the
> beginning of the buffer so that it is contiguous. Though this is a bit more
> complicated, it ensures that we don't have to move in lockstep and can
> pipeline requests/responses. I have written something like this for another
> application and since this is all happening in a single thread it is a bit
> easier and I think possible.
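A minimal sketch of the two-pointer scheme described above, in Java. It is
simplified: instead of wrapping around, it slides the unread bytes to the
front of the array when the tail runs out of room, which keeps every response
contiguous. The class and method names (ResponseBuffer, append, next) are made
up for illustration and are not part of any Kafka API.

```java
import java.nio.ByteBuffer;

/** Hypothetical single-threaded response buffer with two pointers. */
final class ResponseBuffer {
    private final byte[] data;
    private int iterationPointer;      // next byte the consumer has not read yet
    private int networkAppendPointer;  // next free byte for data from the network

    ResponseBuffer(int capacity) {
        data = new byte[capacity];
    }

    /** Appends network bytes; returns false if the consumer must iterate further first. */
    boolean append(byte[] src, int offset, int length) {
        if (length > data.length - networkAppendPointer) {
            reclaim(); // tail is full: reuse the space the iterator has already passed
        }
        if (length > data.length - networkAppendPointer) {
            return false;
        }
        System.arraycopy(src, offset, data, networkAppendPointer, length);
        networkAppendPointer += length;
        return true;
    }

    /** Slides unread bytes to the front so they stay contiguous. */
    private void reclaim() {
        int unread = networkAppendPointer - iterationPointer;
        System.arraycopy(data, iterationPointer, data, 0, unread);
        iterationPointer = 0;
        networkAppendPointer = unread;
    }

    int unread() {
        return networkAppendPointer - iterationPointer;
    }

    /**
     * Returns a read-only view of the next `length` unread bytes and advances
     * the iteration pointer. The view is only valid until the next append().
     */
    ByteBuffer next(int length) {
        if (length > unread()) {
            throw new IllegalStateException("not enough unread bytes");
        }
        ByteBuffer view = ByteBuffer.wrap(data, iterationPointer, length).slice().asReadOnlyBuffer();
        iterationPointer += length;
        return view;
    }
}
```

The invariant iterationPointer <= networkAppendPointer holds after every call.
The views handed out by next() become stale once append() reclaims space, which
is the same "consume it on this thread before polling again" restriction
discussed above.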
>
> Like Jay suggested if application level deserialization is a bottleneck
> that needs to be solved by passing slices of these ByteBuffers out to a
> pool of threads, then this approach WILL NOT work since we expect the
> ByteBuffer to be linearly iterated in one go. If we want slices to be
> passed to a pool of threads then probably copying to new ByteBuffers is the
> only good option. For my application that is definitely not the case since
> deserialization is free and the cache friendliness of iterating over a hot
> buffer trumps every other factor. But for applications with more involved
> serialization we can re-use the above framework. The more low level
> iterator (discussed in paragraph 2 and 3) can be wrapped in a higher level
> iterator that just copies the bytes for each message to a new ByteBuffer
> and hands them over. The low level iterator is still responsible for buffer
> management and the higher level iterator is just a plain old consumer that
> consumes the bytes by copying them over to a new ByteBuffer and handing
> them to the application. The application is now free to transfer these to
> other threads for processing/deserialization.
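The layering described above could look roughly like the following Java
sketch. It assumes a made-up framing of [4-byte length][payload] per message,
which is not the real Kafka wire format, and the names MessageCursor and
CopyingIterator are hypothetical. The low-level cursor walks the shared buffer
in place; the higher-level iterator copies each message out so the application
may hand it to other threads.

```java
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Low-level cursor over [int length][payload] entries; allocates nothing per message. */
final class MessageCursor {
    private final ByteBuffer buffer;
    private int next;    // offset of the next length prefix
    private int offset;  // payload offset of the current message
    private int length;  // payload length of the current message

    MessageCursor(ByteBuffer buffer) {
        this.buffer = buffer;
        this.next = buffer.position();
    }

    /** Moves to the next message; returns false when the buffer is exhausted. */
    boolean advance() {
        if (next + 4 > buffer.limit()) {
            return false;
        }
        int len = buffer.getInt(next);
        if (len < 0 || next + 4 + len > buffer.limit()) {
            throw new IllegalStateException("truncated message at offset " + next);
        }
        offset = next + 4;
        length = len;
        next = offset + len;
        return true;
    }

    ByteBuffer buffer() { return buffer; }
    int offset() { return offset; }
    int length() { return length; }
}

/** Higher-level iterator: copies each message into a fresh ByteBuffer. */
final class CopyingIterator implements Iterator<ByteBuffer> {
    private final MessageCursor cursor;
    private boolean ready; // true when the cursor already points at an unreturned message

    CopyingIterator(MessageCursor cursor) {
        this.cursor = cursor;
    }

    @Override
    public boolean hasNext() {
        if (!ready) {
            ready = cursor.advance();
        }
        return ready;
    }

    @Override
    public ByteBuffer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        ready = false;
        // Narrow a duplicate to the current payload, then copy it out.
        ByteBuffer view = cursor.buffer().duplicate();
        view.limit(cursor.offset() + cursor.length());
        view.position(cursor.offset());
        ByteBuffer copy = ByteBuffer.allocate(cursor.length());
        copy.put(view);
        copy.flip();
        return copy;
    }
}
```

Applications that can consume in place would use MessageCursor directly and
skip the copy; the others pay one allocation and one copy per message.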
>
> Thanks for creating KAFKA-1326. Let me know what your thoughts are on this
> proposed design.
>
> Thanks,
> Rajiv
>
>
> On Tue, Mar 24, 2015 at 9:22 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Rajiv,
> >
> > Just want to clarify that the main motivation for iterating over the byte
> > buffer directly instead of iterating over the records is to avoid enforcing
> > de-serialization, right? I think that can be done by passing the
> > deserializer class info into the consumer record instead of putting the
> > de-serialized objects into the consumer, and only doing de-serialization
> > on-the-fly when key/value fields are requested, while exposing another API
> > to return the underlying raw bytes from the record. What do you think?
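A rough Java sketch of that idea: the record keeps the raw bytes plus a
deserializer and only decodes when the value is first requested. LazyRecord is
a hypothetical name, and java.util.function.Function stands in for whatever
deserializer interface the consumer would actually use.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

/** Hypothetical record that defers de-serialization until value() is called. */
final class LazyRecord<V> {
    private final ByteBuffer rawValue;
    private final Function<ByteBuffer, V> deserializer;
    private V value;
    private boolean decoded;

    LazyRecord(ByteBuffer rawValue, Function<ByteBuffer, V> deserializer) {
        this.rawValue = rawValue;
        this.deserializer = deserializer;
    }

    /** Raw bytes, for callers that never want an object. */
    ByteBuffer rawValue() {
        return rawValue.asReadOnlyBuffer();
    }

    /** De-serializes on first access and caches the result. */
    V value() {
        if (!decoded) {
            value = deserializer.apply(rawValue.asReadOnlyBuffer());
            decoded = true;
        }
        return value;
    }
}
```

A caller that only needs the bytes calls rawValue() and the deserializer never
runs. Note that asReadOnlyBuffer() still allocates a small view object, so this
alone does not give the zero-allocation behaviour discussed elsewhere in the
thread.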
> >
> > As for de-compression, today we allocate a new buffer for each de-compressed
> > message, and it may be required to do de-compression with a re-usable buffer
> > with memory control. I will create a ticket under KAFKA-1326 for that.
> >
> > Guozhang
> >
> >
> >
> > On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for the note. So if we do not deserialize till the last moment,
> > > like Jay suggested, we would not need extra buffers for deserialization.
> > > Unless we need random access to messages, it seems like we can deserialize
> > > right at the time of iteration and allocate objects only if the Consumer
> > > actually requires deserialized objects. Some applications like mine can
> > > just utilize the ByteBuffer contents, and in those cases we can take in a
> > > ByteBuffer or a ConsumerRecord which points to a ByteBuffer and make it
> > > point to the right slice of our response buffer. This ByteBuffer can be
> > > re-used over and over again.
> > >
> > > As for decompression - that will require an extra buffer allocated at
> > > startup, but not new allocations per response. I imagine the steps would
> > > be something like this:
> > >
> > > i) Receive the entire response, which possibly includes compressed
> > > messages, on ResponseBuffer. ResponseBuffer is only allocated once.
> > > ii) Once we have received an entire response, return an iterator to the
> > > underlying ByteBuffer.
> > > iii) On each message set check if it is compressed or not.
> > > iv) If compressed, decompress only that message set in a streaming manner
> > > and store the result in another preallocated buffer,
> > > DecompressedMessagesBuffer. When the consumer asks for the next message,
> > > alter the flyweight that it supplies to point to the correct slice in
> > > the DecompressedMessagesBuffer. Do this till all decompressed messages
> > > from that message set are consumed.
> > > v) If the message set was not compressed, then alter the flyweight to
> > > point to the correct slice in the ResponseBuffer. Do this till all
> > > messages from this message set are consumed.
> > >
> > > So basically we will switch between the two buffers when messages are
> > > compressed or otherwise stay only on the ResponseBuffer. With the lazy
> > > approach another nice thing is we can do CRC validation right before the
> > > message gets consumed too. This means that the CRC algorithm will scan
> > > some bytes which will be hot in cache (since we are iterating a linear
> > > array of bytes) and as soon as we have checked the bytes they will be
> > > consumed by the application, which means they will stay hot in L1 cache.
> > >
> > > All of this "theory crafting" is based on my currently incomplete
> > > understanding of the kafka protocol, but it seems like compression is per
> > > message set so we can stream through. Also, since in general we can
> > > iterate through the response buffer, we can do CRC validation right before
> > > the message is consumed.
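A small Java sketch of validating a CRC in place, right before a message is
handed to the application. The layout [4-byte CRC32][payload] is a simplifying
assumption, not the exact Kafka message layout, and CrcValidator is a
hypothetical name. java.util.zip.CRC32 and its update(ByteBuffer) method are
standard JDK (Java 8 and later).

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical in-place CRC check over a shared response buffer. */
final class CrcValidator {
    private final CRC32 crc = new CRC32(); // reused for every message
    private final ByteBuffer view;         // one duplicate, created once, re-aimed per message

    CrcValidator(ByteBuffer responseBuffer) {
        this.view = responseBuffer.duplicate();
    }

    /**
     * Checks the message starting at `offset`, laid out as
     * [int crc][payload of payloadLength bytes], without copying the payload.
     */
    boolean isValid(int offset, int payloadLength) {
        long expected = view.getInt(offset) & 0xFFFFFFFFL; // stored CRC as unsigned 32-bit
        view.clear();                                      // reset limit to capacity
        view.limit(offset + 4 + payloadLength);
        view.position(offset + 4);
        crc.reset();
        crc.update(view);                                  // reads position..limit in place
        return crc.getValue() == expected;
    }
}
```

Because the duplicate shares the underlying bytes with the response buffer,
the check touches the same memory the application reads immediately afterwards.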
> > >
> > > Thanks,
> > > Rajiv
> > >
> > > On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Rajiv,
> > > >
> > > > A side note on re-using ByteBuffers: in the new consumer we do plan to
> > > > add some memory management module such that it will try to reuse
> > > > allocated buffers for fetch responses. But as Jay mentioned, for now
> > > > de-serialization and de-compression are done inside the poll() call,
> > > > which requires allocating another buffer to write the de-serialized and
> > > > de-compressed bytes. Hence, even with fetch response buffer management,
> > > > today we still need to allocate new buffers if compressed messages are
> > > > delivered.
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <jay.kr...@gmail.com>
> > wrote:
> > > >
> > > > > Zijing, the new consumer will be in the next release. We don't have a
> > > > > hard date for this yet.
> > > > >
> > > > > Rajiv, I'm game if we can show a >= 20% performance improvement. It
> > > > > certainly could be an improvement, but it might also be that the CRC
> > > > > validation and compression dominate.
> > > > >
> > > > > The first step would be
> > > > >   https://issues.apache.org/jira/browse/KAFKA-1895
> > > > >
> > > > > This would delay the deserialization of ConsumerRecords and make it
> > > > > basically a wrapper around the actual MemoryRecords chunks, which are
> > > > > basically ByteBuffer instances. We could then add an optional
> > > > > ConsumerRecords param in poll, to allow you to hand back your
> > > > > ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
> > > > > would also be possible.
> > > > >
> > > > > That JIRA is actually useful irrespective of these optimizations,
> > > > > though, because deferring the deserialization and decompression would
> > > > > allow you to punt that work into a pool of processor threads. Since it
> > > > > is more common to see the real bottleneck be application serialization,
> > > > > this could be valuable.
> > > > >
> > > > > WRT the object reuse, I wouldn't be shocked to learn that you actually
> > > > > get equivalent stuff out of the jvm allocator's own pooling and/or
> > > > > escape analysis once we are doing the allocation on demand. So it would
> > > > > be good to show a real performance improvement on the newer JVMs before
> > > > > deciding to go this route.
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <
> ra...@signalfuse.com>
> > > > > wrote:
> > > > >
> > > > > > Just a follow up - I have implemented a pretty hacky prototype. It's
> > > > > > too unclean to share right now but I can clean it up if you are
> > > > > > interested. I don't think it offers anything that people don't
> > > > > > already know about though.
> > > > > >
> > > > > > My prototype doesn't do any metadata requests yet but I have a
> > > > > > flyweight builder/parser of the FetchRequest and the FetchResponse
> > > > > > protocol that I based on the protocol page at
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > > > .
> > > > > > On my consumer thread I allocate two buffers per broker: a small
> > > > > > request buffer and a bigger response buffer. My assumption is that the
> > > > > > response buffer will be big enough that it is never too small for the
> > > > > > response. If this assumption doesn't hold then we would have to
> > > > > > reallocate a bigger buffer. These are the steps I am following right
> > > > > > now:
> > > > > >
> > > > > > i) Whenever a client makes a request I fill up the relevant request
> > > > > > buffer(s) using my flyweight object instead of allocating an object
> > > > > > and then serializing it. I then write this buffer to the
> > > > > > SocketChannel connected to the right Kafka broker. I don't yet handle
> > > > > > the case where all of the request bytes could not be written
> > > > > > synchronously. If I were to handle that properly I would need to
> > > > > > register WRITE interest on the SocketChannel on incomplete writes.
> > > > > >
> > > > > > ii) Whenever the client calls poll() I register read interest on the
> > > > > > socket channel using the selector and make a select call on the
> > > > > > underlying selector. If the selector says that the socket channel is
> > > > > > readable, I read all the bytes possible into my response ByteBuffer.
> > > > > > If I can read the first 4 bytes I know how big the response is. So I
> > > > > > wait till all response bytes have been read before attempting to
> > > > > > parse it. This might take multiple poll calls. Once I have received
> > > > > > the expected number of bytes I iterate through the response
> > > > > > ByteBuffer. There is no random access provided to the ByteBuffer
> > > > > > since there are way too many variable length fields. This is a
> > > > > > DISADVANTAGE of doing flyweight style parsing instead of
> > > > > > deserialization into POJOs with indexed data structures like maps. I
> > > > > > could build indexing with a non-allocating primitive hashmap like the
> > > > > > koloboke one, mapping Topic/Partition/Messages to offsets in the
> > > > > > ByteBuffer, if really required. For me the lack of random access has
> > > > > > not been a problem at all. Iteration is done just by providing a
> > > > > > ByteBuffer and offset (within ByteBuffer) pair for each message. In
> > > > > > my own application I wrap this ByteBuffer and offset pair in my own
> > > > > > flyweight which knows how to decode the data.
> > > > > >
> > > > > > iii) Once an entire response has been iterated through I can re-use
> > > > > > both the request as well as response buffers.
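Step (ii) above, accumulating a size-prefixed response over several poll
calls, might be sketched in Java as follows. ResponseAccumulator and its
methods are hypothetical names; the only protocol detail assumed is the one
mentioned in the step itself, a 4-byte size followed by that many bytes. I/O
errors are rethrown unchecked to keep the sketch short.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical accumulator for one [int size][body] response in a reused buffer. */
final class ResponseAccumulator {
    private final ByteBuffer buffer;
    private boolean complete;

    ResponseAccumulator(int capacity) {
        buffer = ByteBuffer.allocateDirect(capacity); // allocated once, reused per response
    }

    /** Reads whatever is available; returns true once the whole response is buffered. */
    boolean readFrom(ReadableByteChannel channel) {
        try {
            if (buffer.position() < 4) {
                buffer.limit(4); // read only the size prefix first
                if (channel.read(buffer) < 0) throw new EOFException("closed while reading size");
                if (buffer.position() < 4) return false;
            }
            int size = buffer.getInt(0);
            if (size < 0 || size > buffer.capacity() - 4) {
                throw new IllegalStateException("response does not fit: " + size);
            }
            buffer.limit(4 + size); // never read past this response
            if (buffer.hasRemaining() && channel.read(buffer) < 0) {
                throw new EOFException("closed while reading body");
            }
            complete = !buffer.hasRemaining();
            return complete;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** View of the response body; valid until reset(). */
    ByteBuffer body() {
        if (!complete) throw new IllegalStateException("response incomplete");
        ByteBuffer view = buffer.duplicate();
        view.flip();      // limit = end of response, position = 0
        view.position(4); // skip the size prefix
        return view;
    }

    /** Makes the buffer ready for the next response. */
    void reset() {
        buffer.clear();
        complete = false;
    }
}
```

In a selector-driven client, readFrom would be called each time the channel is
reported readable, and iteration would start only when it returns true.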
> > > > > >
> > > > > > I am sure this can be improved upon a lot. I allocate the following
> > > > > > before starting my client:
> > > > > >     i) A Direct ByteBuffer for requests.
> > > > > >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers
> > > > > > are chosen so that they can fit the biggest request/responses we
> > > > > > expect.
> > > > > >   iii) A flyweight that wraps the Request ByteBuffer and can be used
> > > > > > to write a particular kind of request. So far I have only written a
> > > > > > flyweight for a FetchRequest.
> > > > > >    iv) A flyweight that wraps the Response ByteBuffer and can be used
> > > > > > to iterate through the entire response including finding errors.
> > > > > > There is no random access allowed right now. So far I have only
> > > > > > written a flyweight parser for a FetchResponse.
> > > > > >     v) A flyweight for every type of application level message that I
> > > > > > expect. Iterating through the response ByteBuffer using the flyweight
> > > > > > in (iv) I get offsets into the ByteBuffer for each individual
> > > > > > message. My own messages work by using absolute position
> > > > > > getLong(position), getShort(position) etc. calls on a ByteBuffer, so
> > > > > > this works out great for me. We could alternatively provide an API
> > > > > > that allocates a new ByteBuffer and copies the data for people who
> > > > > > don't want the zero allocation access.
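An application-level flyweight of the kind described in the last list item
could look like this in Java. The message layout (8-byte timestamp, 2-byte
type, 4-byte value) and the name MetricFlyweight are invented for the example;
only the absolute ByteBuffer getters are real API.

```java
import java.nio.ByteBuffer;

/** Hypothetical fixed-layout message: [long timestamp][short type][int value]. */
final class MetricFlyweight {
    static final int SIZE = 8 + 2 + 4;

    private ByteBuffer buffer;
    private int offset;

    /** Re-aims this flyweight at a message; no allocation, no copy. */
    MetricFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }

    // Absolute reads: the buffer's own position and limit are never touched.
    long timestamp() { return buffer.getLong(offset); }
    short type()     { return buffer.getShort(offset + 8); }
    int value()      { return buffer.getInt(offset + 10); }
}
```

One instance is allocated up front and wrapped around each (ByteBuffer, offset)
pair produced by the response iterator.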
> > > > > >
> > > > > > Sadly in my profiling I noticed that the selector implementation in
> > > > > > the JDK allocates, but it seems like projects like Netty and Aeron
> > > > > > have worked around this by using reflection to replace the underlying
> > > > > > implementation to make it non-allocating. Other than that I have
> > > > > > absolutely zero allocations past the initial 4-5 allocations. I also
> > > > > > have absolutely zero copies in user space once the data lands from
> > > > > > the socket onto the ByteBuffer.
> > > > > >
> > > > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <
> > ra...@signalfuse.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I had a few more thoughts on the new API. Currently we use kafka to
> > > > > > > transfer really compact messages - around 25-35 bytes each. Our use
> > > > > > > case is a lot of messages but each very small. Will it be possible
> > > > > > > to do the following to reuse a ConsumerRecord and the
> > > > > > > ConsumerRecords objects? We employ our own binary encoding to
> > > > > > > encode our messages and it is encoded as a flyweight in
> > > > > > > SBE/Cap'n'Proto style and can be consumed without any
> > > > > > > decoding/deserialization step. Currently we use the SimpleConsumer
> > > > > > > and have found the fact that it hands out ByteBuffers very useful
> > > > > > > instead of a mandatory deserialization into a POJO step. Even then
> > > > > > > through memory profiling we have found out that the ByteBuffers and
> > > > > > > the records take more space than the actual messages themselves.
> > > > > > > Ideally we can allocate a big ByteBuffer (maybe kafka already does
> > > > > > > it) to receive our data and then just get some kind of a flyweight
> > > > > > > iterator on the ByteBuffer something like this:
> > > > > > >
> > > > > > > // Allocate a single ByteBuffer or have kafka allocate this
> > > > > > > // internally. Either way it would be very useful to just keep
> > > > > > > // re-using this buffer.
> > > > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > > > // Create a couple of flyweights.
> > > > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > > > // Subscribe to the topics we care about.
> > > > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > > > while (running) {
> > > > > > >   // Use the buffer to get data from the server. Additionally
> > > > > > >   // re-use the same iterator. Resets the iterator to point to the
> > > > > > >   // start of the ByteBuffer.
> > > > > > >   consumer.poll(buffer, iterator, timeout);
> > > > > > >   // Now the iterator has pointers to the ByteBuffer and is capable
> > > > > > >   // of advancing the cursor to read every message.
> > > > > > >   while (iterator.hasNext()) {
> > > > > > >     // The consumer record is just a flyweight over a ByteBuffer and
> > > > > > >     // is adjusted to point to the start of the next record.
> > > > > > >     iterator.getNextInto(record);
> > > > > > >     // This is not a new ByteBuffer, it's just the big buffer with
> > > > > > >     // its position and limit adjusted so that we only read the
> > > > > > >     // current record. Alternatively we could give it our own
> > > > > > >     // ByteBuffer, but the point would be to not do a copy and
> > > > > > >     // instead adjust the supplied ByteBuffer's position and limit
> > > > > > >     // and pointer (through reflection) to point to the right slice
> > > > > > >     // of the actual big ByteBuffer. This means that we cannot stash
> > > > > > >     // this buffer in a hash map or any heap allocated structure
> > > > > > >     // since its contents keep changing as we iterate.
> > > > > > >     ByteBuffer current = record.getUnderlying();
> > > > > > >     // process() cannot keep a reference to the buffer - this is
> > > > > > >     // really the programmer's responsibility.
> > > > > > >     process(current);
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > Given how the new consumer is meant to be used from a single thread
> > > > > > > - I think these optional non-allocating methods will be a great
> > > > > > > boon for anyone trying to save memory or prevent heap churn. It has
> > > > > > > exactly 0 copies in user space too, which is great for performance.
> > > > > > > In our case, since the messages are very tiny, we end up spending
> > > > > > > more memory in all the wrapper objects than in the actual messages,
> > > > > > > so this would be a game changer for us. I'd love to be able to
> > > > > > > contribute if this seems sensible. Hopefully these non-allocating
> > > > > > > methods can co-exist with the allocating ones and only users who
> > > > > > > absolutely need to use them can make the trade-off of better
> > > > > > > efficiency/performance for a slightly more error-prone and ugly
> > > > > > > API.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Rajiv
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> > > > <alter...@yahoo.com.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > >> Hi all, the document is very beautiful. Which Kafka release
> > > > > > > >> version will this be in, and what is the timeline?
> > > > > > > >> Thanks, Edwin
> > > > > > >>
> > > > > > >>
> > > > > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > > > > >> ra...@signalfuse.com> wrote:
> > > > > > >>
> > > > > > >>
> > > > > > >>  Awesome - can't wait for this version to be out!
> > > > > > >>
> > > > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <
> > jay.kr...@gmail.com>
> > > > > > wrote:
> > > > > > >>
> > > > > > > >> > The timeout in the poll call is more or less the timeout used
> > > > > > > >> > by the selector. So each call to poll will do socket activity
> > > > > > > >> > on any ready sockets, waiting for up to that time for a
> > > > > > > >> > socket to be ready. There are no longer any background
> > > > > > > >> > threads involved in the consumer; all activity is driven by
> > > > > > > >> > the application thread(s).
> > > > > > > >> >
> > > > > > > >> > The max fetch request wait time is controlled with a config
> > > > > > > >> > and is independent of the time given to poll.
> > > > > > >> >
> > > > > > >> > -Jay
> > > > > > >> >
> > > > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > > > > ra...@signalfuse.com>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > > >> > > I am trying to understand the semantics of the timeout
> > > > > > > >> > > specified in the poll method in
> > > > > > > >> > > http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > > >> > > .
> > > > > > > >> > > Is this timeout a measure of how long the fetch request will
> > > > > > > >> > > be parked on the broker waiting for a reply, or is this more
> > > > > > > >> > > like the timeout in selector.select(long timeout), i.e. the
> > > > > > > >> > > method will return with whatever data is there after waiting
> > > > > > > >> > > a maximum of timeout? Exposing the selector timeout will be
> > > > > > > >> > > very helpful for us because we want to put a tight bound on
> > > > > > > >> > > how long we are ready to wait on the poll call. When this
> > > > > > > >> > > API is available we plan to use a single thread to get data
> > > > > > > >> > > from kafka, process it, and run periodic jobs. For the
> > > > > > > >> > > periodic jobs to run we need a guarantee on how much time
> > > > > > > >> > > the poll call can take at most.
> > > > > > >> > >
> > > > > > >> > > Thanks!
> > > > > > >> > >
> > > > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > > > > ra...@signalfuse.com
> > > > > > >
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Thanks!
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <
> > jay.kr...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >> > > >
> > > > > > > >> > > >> Err, here:
> > > > > > > >> > > >>
> > > > > > > >> > > >> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >> > > >>
> > > > > > >> > > >> -Jay
> > > > > > >> > > >>
> > > > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > > > > jay.kr...@gmail.com>
> > > > > > >> > wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > The current work in progress is documented here:
> > > > > > >> > > >> >
> > > > > > >> > > >> >
> > > > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > > > > >> ra...@signalfuse.com
> > > > > > >> > >
> > > > > > >> > > >> > wrote:
> > > > > > >> > > >> >
> > > > > > >> > > >> >> Is there a link to the proposed new consumer
> > > non-blocking
> > > > > API?
> > > > > > >> > > >> >>
> > > > > > >> > > >> >> Thanks,
> > > > > > >> > > >> >> Rajiv
> > > > > > >> > > >> >>
> > > > > > >> > > >> >
> > > > > > >> > > >> >
> > > > > > >> > > >>
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
