Interesting, but I think you might run into problems when partitions get rebalanced between processes (not just between threads in the same process).
So, you'd need to coordinate your issue of not committing backward, between threads running on different processes. I'm trying to think now, what might be the worst case if you commit an offset lower than one that was already committed by another thread/process. I guess the danger is only in truly unnecessary duplicate processing? Or is there an actual danger that the system might flail and never move forward!? Are you attempting to add commitOffsetForPartition(partition, offset) yourself, as a mod to the ZookeeperConsumerConnector? One thing that's missing too, is some sort of call back by the ConsumerConnector to notify when a rebalance is happening. This would be useful for trying to make consumer apps a bit more proactive in that case, if not using the default autocommit. Interesting stuff.... Jason On Sat, Nov 23, 2013 at 12:55 AM, Imran Rashid <im...@therashids.com> wrote: > MessageAndMetadata includes the partition and offset along w/ each > message. So this is enough for the workers to track which partitions > they need to update, and by what amount. There is the possibility > that you'd commit an update for a partition that you *used* to own, > but has since be rebalanced elsewhere. But, this is actually OK, as > long as we protect against commits moving the offset backwards. Eg., > we need to prevent Thread A from committing the offset of 60, after B > has already committed an offset of 100. But its fine if Thread A > commits an offset of 60 when Thread B has actually started reading > from 50, and is now up to 70, if B hasn't committed yet. Or even if > Thread B has read up to 59, and it has committed. > > of course its a headache for the user to have to track those offsets > themselves in every worker, but we can provide some tools that do that > automatically, so the user api doesn't need to worry about it. > > I swear I'll have my code ready to look at soon -- I think its coded > but I need to clean it up some ... > > > On Fri, Nov 22, 2013 at 5:38 PM, Jason Rosenberg <j...@squareup.com> wrote: > > I think that the problem is, you don't know which partitions a thread is > > currently 'owning', and therefore, you don't know which partitions you > can > > commit to. > > > > > > On Fri, Nov 22, 2013 at 5:28 PM, Imran Rashid <im...@therashids.com> > wrote: > > > >> I don't think I need control over which partitions are processed by a > >> thread -- however, I do need the partition --> thread assignment to be > >> consistent. I didn't realize that assignment could change, I guess > >> that could mean my solution might not work. It depends a bit on what > >> happens during reassignment. > >> > >> Here's an example: initially Thread A is processing a partition. It > >> commits its read up to offset 50, but keeps reading up till offset 60. > >> At that point, the partition is re-assigned to Thread B (which I guess > >> can be in a completely process). Where will thread B start reading? > >> It seems like it definitely can't start reading at 60 b/c then you > >> can't guarantee that messages 50-60 ever get processed. (Not to > >> mention that it probably doesn't even know A got to 60.) > >> > >> But if B starts at 50, then I don't see how each thread committing > >> their own offsets is a problem. We just need some rule that you can't > >> move an offset backwards on a commit (at least, not without some force > >> option). Without that check, I suppose you could have B read up to > >> 100, commit, and then A commits its read up to 60. (which wouldn't > >> even violate "at least once", but it would certainly be undesirable). > >> > >> The fact that A & B could also be reading from other partitions at the > >> same time doesn't seem like a problem to me, each thread would commit > >> their positions in all of their partitions. > >> > >> what am I overlooking? > >> > >> thanks again for taking the time to work through this with me, I hope > >> this helpful for a broader audience :) > >> > >> imran > >> > >> On Fri, Nov 22, 2013 at 2:37 PM, Jason Rosenberg <j...@squareup.com> > wrote: > >> > With the high-level consumer, the design is that you really don't have > >> > programmatic control over which threads will process which partitions > >> (and > >> > usually multiple per thread). Furthermore, the high-level consumer > can > >> > re-balance and redistribute which threads (and which other processes) > are > >> > consuming which partitions, at any time, etc. > >> > > >> > So, the notion of tracking specific offsets for commit, which only > have > >> > meaning per partition, doesn't really scale with this design. > >> > > >> > Jason > >> > > >> > > >> > On Fri, Nov 22, 2013 at 9:06 AM, Imran Rashid <im...@therashids.com> > >> wrote: > >> > > >> >> hmm, ok, after some more thought I see what you mean.I was thinking > >> >> async because of a more complicated use case -- my app actually > >> >> doesn't process messages one at a time, it builds a batch of > messages, > >> >> merges the batches from all threads, and process those batches, and > >> >> thats when it shoudl commit. > >> >> > >> >> But, let's not worry about batches for the moment -- first lets just > >> >> make sure we can get behavior correct for an app which processes > >> >> messages one at a time. > >> >> > >> >> 1) I agree that synchronous makes more sense in that case. But > >> >> wouldn't that also benefit from commitOffsets(offsets)? that way > each > >> >> thread could commit just its own offsets, completely avoiding the > need > >> >> for locks & stopping other workers. So the code would look like > this: > >> >> > >> >> while(itr.hasNext()) { > >> >> val msg = its.next() > >> >> updateOffsets(msg) //updates some worker state w/ current position > >> >> of this worker in all its partitions > >> >> doSomething(msg) > >> >> if ( shouldCommit) { > >> >> consumer.commitOffsets(myOffsets) //myOffsets would be the set > of > >> >> offsets for *only* this worker > >> >> } > >> >> } > >> >> > >> >> > >> >> (the actual code would be a bit more complicated b/c of handling > >> >> TimeoutExceptions in hasNext(), but still lock-free) > >> >> > >> >> 2) Though async doesn't have any advantages in this case, its not > >> >> *incorrect*, is it? I just want to make sure, for building up to the > >> >> batch case. So instead of directly committing its offsets, the > worker > >> >> would just put its offsets in a queue and keep going. (You can > >> >> probably see where this is going for batches ...) > >> >> > >> >> > >> >> thanks a lot, this discussion has been fascinating! > >> >> > >> >> Imran > >> >> > >> >> > >> >> On Fri, Nov 22, 2013 at 1:06 AM, Jason Rosenberg <j...@squareup.com> > >> wrote: > >> >> > Again, > >> >> > > >> >> > Remember that if you have multiple threads processing, then it > means > >> they > >> >> > are consuming from different partitions, so the offset of one > >> partition > >> >> > will not be relevant to another. So, you really wouldn't have a > use > >> case > >> >> > to be committing specific offsets async. Furthermore, if your > >> consumer > >> >> is > >> >> > using a filter to select multiple topics, then even a single > thread is > >> >> > likely going to be consuming multiple topics, so again, a single > >> offset > >> >> > commit wouldn't make much sense. > >> >> > > >> >> > Jason > >> >> > > >> >> > > >> >> > On Fri, Nov 22, 2013 at 1:30 AM, Imran Rashid < > im...@therashids.com> > >> >> wrote: > >> >> > > >> >> >> Hi Jason, > >> >> >> > >> >> >> thank you so much! > >> >> >> > >> >> >> I was missing the consumer.timeout.ms property. In fact I had > >> >> >> actually written a full solution using cyclic barriers to make > sure > >> >> >> all my readers had stopped consuming ... but threw it out b/c I > >> >> >> thought I couldn't deal w/ iterators blocked in hasNext(). That > >> >> >> property is just what I was missing, I never would have found it > >> w/out > >> >> >> your help. > >> >> >> > >> >> >> Its also great to get some confirmation that this is a a real > issue > >> -- > >> >> >> I feel like this should be mentioned in the docs somewhere and > >> ideally > >> >> >> hidden behind a clean api. > >> >> >> > >> >> >> I completely agree I can do this w/ the synchronous "shouldCommit" > >> >> >> given the current api. But I'm not sure that is the "right" way. > If > >> I > >> >> >> just had a one-arg commitOffsets(someOffsets) method, then I think > >> >> >> there is a way to do this async that is both cleaner and should > have > >> >> >> higher throughput (no need to have a full barrier that stops all > the > >> >> >> workers simultaneously). I've been getting influenced by the akka > >> way > >> >> >> of thinking. I think I will be able to share some example code > >> >> >> tomorrow, maybe you and others can take a look and see if you > think > >> >> >> its an improvement or not. > >> >> >> > >> >> >> thanks again! > >> >> >> imran > >> >> >> > >> >> >> > >> >> >> On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg < > j...@squareup.com> > >> >> wrote: > >> >> >> > Hi Imran, > >> >> >> > > >> >> >> > The thing to do, is not have an asynchronous background thread > for > >> >> >> > committing. Instead have a time based "shouldCommit()" > function, > >> and > >> >> >> > commit periodically, synchronously. > >> >> >> > > >> >> >> > If you have a bursty producer, then you can set a timeout ( > >> >> >> > consumer.timeout.ms), so that your iter.hasNext() call will > >> >> periodically > >> >> >> > wake up and throw a ConsumerTimeoutException. You can catch > this > >> >> >> exception > >> >> >> > silently, and check your "shouldCommit()" predicate. > >> >> >> > > >> >> >> > If you have multiple threads, you can use cyclic barrier, or > some > >> >> such, > >> >> >> > make sure all threads have paused before doing the synchronous > >> commit. > >> >> >> In > >> >> >> > this case, you should set your timeout pretty low, say 100ms, so > >> that > >> >> >> > threads with work to do aren't waiting for quiescent threads to > >> wake > >> >> up > >> >> >> and > >> >> >> > timeout already. > >> >> >> > > >> >> >> > I have a configuration like this. I use a default commit cycle > of > >> 60 > >> >> >> > seconds, and use 100ms timeout on the consumer iterator. > >> >> >> > > >> >> >> > The auto-commit mode built in has the fatal flaw, as you have > >> pointed > >> >> >> out, > >> >> >> > of possibly dropping a single message on a restart. > >> >> >> > > >> >> >> > Jason > >> >> >> > > >> >> >> > > >> >> >> > On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid < > >> im...@therashids.com> > >> >> >> wrote: > >> >> >> > > >> >> >> >> HI Benjamin, > >> >> >> >> > >> >> >> >> great question, I thought that for a while too. I think that > >> won't > >> >> >> >> work, in my use case, but I could certainly use somebody else > >> >> checking > >> >> >> >> my logic. > >> >> >> >> > >> >> >> >> I think that would work, *if* I was willing to allow arbitrary > >> time > >> >> to > >> >> >> >> pass between my commits. Then I could call commitOffsets > within > >> the > >> >> >> >> while loop like so: > >> >> >> >> while(itr.hasNext()) { > >> >> >> >> val msg = itr.next() > >> >> >> >> doSomething(msg) > >> >> >> >> if(shouldICommit) { > >> >> >> >> consumer.commitOffsets > >> >> >> >> } > >> >> >> >> } > >> >> >> >> > >> >> >> >> (this is what you meant, right?) > >> >> >> >> > >> >> >> >> however, its.hasNext() might block an arbitrary amount of time. > >> The > >> >> >> >> producers for my queues are very bursty -- they will create a > ton > >> of > >> >> >> >> messages, and then go silent for a long time (perhaps days). > >> Since I > >> >> >> >> do want to ensure commits are happening regularly, that means > its > >> got > >> >> >> >> to be triggered by another thread (something that can't get > >> blocked > >> >> at > >> >> >> >> hasNext()). Which is why I've got make sure that trigger > doesn't > >> >> >> >> happen between next() and doSomething(). > >> >> >> >> > >> >> >> >> a second point (much more minor) is that calling > >> >> >> >> consumer.commitOffsets commits the offsets for all partitions > that > >> >> >> >> consumer is reading. Which means I've got to create a separate > >> >> >> >> consumer per thread. (But this is really minor, just a > nuisance, > >> >> >> >> doesn't effect correctness.) > >> >> >> >> > >> >> >> >> You might say that even my first point is just a nuisance, and > I > >> just > >> >> >> >> need to live with it. Then (1) I'd just like to make sure my > >> >> >> >> understanding is correct -- I think this behavior is really not > >> >> >> >> obvious, so its worth clarifying and (2) I'd like to confirm > that > >> I > >> >> >> >> have a workaround and (3) if this problem does exist, and the > >> >> >> >> workaround is tricky, then maybe the api change I'm proposing > >> would > >> >> be > >> >> >> >> good idea? > >> >> >> >> > >> >> >> >> thanks for reading about this and helping me work through it! > >> >> >> >> > >> >> >> >> imran > >> >> >> >> > >> >> >> >> > >> >> >> >> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> > wrote: > >> >> >> >> > why not just disable autocommit and only call commit > offsets() > >> >> after > >> >> >> >> you've > >> >> >> >> > processed a batch? it isn't obvious to me how doing so would > >> allow > >> >> a > >> >> >> >> > message to be processed zero times. > >> >> >> >> > On Nov 21, 2013 5:52 PM, "Imran Rashid" < > im...@therashids.com> > >> >> wrote: > >> >> >> >> > > >> >> >> >> >> Hi Edward, > >> >> >> >> >> > >> >> >> >> >> I think you misunderstand ... I definitely do *not* want to > >> commit > >> >> >> >> >> every message. That would be much too expensive, every few > >> >> minutes > >> >> >> is > >> >> >> >> >> plenty for me. > >> >> >> >> >> > >> >> >> >> >> I want to guarantee that if I commit a message, that my > message > >> >> has > >> >> >> >> >> been processed by *my application* at least once. (In > another > >> >> thread > >> >> >> >> >> I mentioned something about reading exactly once, but that > is > >> not > >> >> >> what > >> >> >> >> >> I am going for here -- more than once is OK, as long as its > at > >> >> least > >> >> >> >> >> once.) That requires me to make sure that a commit never > >> happens > >> >> >> >> >> between a call to next() and doSomething(). Kafka > guarantees > >> that > >> >> >> >> >> messages get read off the queue at least once, but as I > >> outlined > >> >> >> >> >> above, a naive solution allows for messages to get read off > the > >> >> >> queue, > >> >> >> >> >> but not make it into my app. > >> >> >> >> >> > >> >> >> >> >> Not only do I not want to commit every message -- I'd really > >> like > >> >> to > >> >> >> >> >> have the above guarantee without even acquiring a lock > (most of > >> >> the > >> >> >> >> >> time). That is what I was getting at with the comment about > >> the > >> >> >> >> >> spin-lock, just as an idea of how to prevent a commit > between > >> >> next() > >> >> >> >> >> and doSomething(). I dunno if the spin-lock was really the > >> right > >> >> >> >> >> idea, that was just a random thought, but the point is, we > want > >> >> some > >> >> >> >> >> check that should be very cheap for 99.999% of the time > when a > >> >> commit > >> >> >> >> >> isn't happening, but can still guarantee proper ordering > during > >> >> that > >> >> >> >> >> 0.001% of the time when a commit is happening. otherwise, > >> >> messages > >> >> >> >> >> might never make it to my app. (And this is all just to > >> prevent a > >> >> >> >> >> message getting lost during the 10^-6% of the time when a > >> commit > >> >> >> might > >> >> >> >> >> happen between next() and doSomething(), and the app dies > >> before > >> >> >> >> >> doSomething completes!) > >> >> >> >> >> > >> >> >> >> >> Actually after a bit more thought -- the best way to > guarantee > >> >> this > >> >> >> >> >> would be with a small api change, that would do away with > the > >> need > >> >> >> for > >> >> >> >> >> locks completely. The iterator should have a method > >> applyToNext: > >> >> >> >> >> > >> >> >> >> >> def applyToNext(f: MessageAndData[K,V] => Unit) { > >> >> >> >> >> //all the stuff in next, *except* > >> >> >> >> >> currentTopicInfo.resetConsumeOffset(consumedOffset) > >> >> >> >> >> val item = ... > >> >> >> >> >> f(item) > >> >> >> >> >> //after we've applied the users function, now we can > update > >> the > >> >> >> >> >> offset that should get committed > >> >> >> >> >> currentTopicInfo.resetConsumeOffset(consumedOffset) > >> >> >> >> >> } > >> >> >> >> >> > >> >> >> >> >> I think this way, you could avoid any problems even with > >> >> auto-commit. > >> >> >> >> >> Or, if you don't want to add a new method, so we can stick > to > >> the > >> >> >> >> >> Iterator api, then maybe the iterator could let you > register a > >> >> >> >> >> preCommitFunction, so next() would change to: > >> >> >> >> >> > >> >> >> >> >> override def next(): MessageAndMetadata[K, V] = { > >> >> >> >> >> ... > >> >> >> >> >> val item = ... > >> >> >> >> >> preCommitFuntions.foreach{f => f(item)} > >> >> >> >> >> *currentTopicInfo.resetConsumeOffset(consumedOffset)* > >> >> >> >> >> ... > >> >> >> >> >> item > >> >> >> >> >> } > >> >> >> >> >> > >> >> >> >> >> > >> >> >> >> >> thanks, > >> >> >> >> >> Imran > >> >> >> >> >> > >> >> >> >> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo < > >> >> >> edlinuxg...@gmail.com > >> >> >> >> > > >> >> >> >> >> wrote: > >> >> >> >> >> > You likely need to use a custom offset solution if you > plan > >> on > >> >> >> >> committing > >> >> >> >> >> > every message. With many partitions this puts a large > burden > >> on > >> >> >> >> >> Zookeeper, > >> >> >> >> >> > you end up needing to roll over your zk transaction logs > >> fast as > >> >> >> well > >> >> >> >> or > >> >> >> >> >> > rist filling up disk > >> >> >> >> >> > > >> >> >> >> >> > > >> >> >> >> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang < > >> >> wangg...@gmail.com > >> >> >> > > >> >> >> >> >> wrote: > >> >> >> >> >> > > >> >> >> >> >> >> Hello Imran, > >> >> >> >> >> >> > >> >> >> >> >> >> The offset will only be updated when the next() function > is > >> >> >> called: > >> >> >> >> >> >> > >> >> >> >> >> >> override def next(): MessageAndMetadata[K, V] = { > >> >> >> >> >> >> ... > >> >> >> >> >> >> *currentTopicInfo.resetConsumeOffset(consumedOffset)* > >> >> >> >> >> >> ... > >> >> >> >> >> >> item > >> >> >> >> >> >> } > >> >> >> >> >> >> > >> >> >> >> >> >> instead of in makeNext(), which will just update > >> >> consumedOffset, > >> >> >> but > >> >> >> >> >> that > >> >> >> >> >> >> is not the value that will be committed using the > >> commitOffset > >> >> >> call. > >> >> >> >> So > >> >> >> >> >> as > >> >> >> >> >> >> long as you turn of auto commit and only call > commitOffset > >> >> after > >> >> >> the > >> >> >> >> >> >> process(msg) call, not after the > >> >> >> >> >> >> > >> >> >> >> >> >> b = iter.next() > >> >> >> >> >> >> > >> >> >> >> >> >> is called, at-least-once is guaranteed. > >> >> >> >> >> >> > >> >> >> >> >> >> Does that make sense? > >> >> >> >> >> >> > >> >> >> >> >> >> Guozhang > >> >> >> >> >> >> > >> >> >> >> >> >> > >> >> >> >> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid < > >> >> >> im...@therashids.com> > >> >> >> >> >> >> wrote: > >> >> >> >> >> >> > >> >> >> >> >> >> > sorry to keep bugging the list, but I feel like I am > >> either > >> >> >> missing > >> >> >> >> >> >> > something important, or I'm finding something wrong w/ > the > >> >> >> standard > >> >> >> >> >> >> > consumer api, (or maybe just the docs need some > >> >> clarification). > >> >> >> >> >> >> > > >> >> >> >> >> >> > I started to think that I should probably just accept > at > >> >> least > >> >> >> once > >> >> >> >> >> >> > semantics ... but I eventually realized that I'm not > even > >> >> sure > >> >> >> we > >> >> >> >> >> >> > really get an at least once guarantee. I think it > really > >> >> might > >> >> >> be > >> >> >> >> >> >> > zero-or-more. Or rather, messages will get pulled off > the > >> >> kafka > >> >> >> >> queue > >> >> >> >> >> >> > at least once. but that doesn't mean your app will > >> actually > >> >> >> >> *process* > >> >> >> >> >> >> > those messages at least once -- there might be > messages it > >> >> never > >> >> >> >> >> >> > processes. > >> >> >> >> >> >> > > >> >> >> >> >> >> > Consider a really basic reader of a kafka queue: > >> >> >> >> >> >> > > >> >> >> >> >> >> > while(it.hasNext()){ > >> >> >> >> >> >> > val msg = it.next() > >> >> >> >> >> >> > doSomething(msg) > >> >> >> >> >> >> > } > >> >> >> >> >> >> > > >> >> >> >> >> >> > the question is, do I have any guarantees on how many > >> times > >> >> >> >> >> >> > doSomething() is called on everything in the queue? I > >> think > >> >> the > >> >> >> >> >> >> > "guarantee" is: > >> >> >> >> >> >> > 1) most messages will get processed excatly once > >> >> >> >> >> >> > 2) around a restart, a chunk of msgs will get > processed at > >> >> least > >> >> >> >> once, > >> >> >> >> >> >> > but probably more than once > >> >> >> >> >> >> > 3) around a restart, it is possible that one message > will > >> get > >> >> >> >> >> >> > processed ZERO times > >> >> >> >> >> >> > > >> >> >> >> >> >> > (1) & (2) are probably clear, so lemme explain how I > think > >> >> (3) > >> >> >> >> could > >> >> >> >> >> >> > happen. Lets imagine messages a,b,c,... and two > threads, > >> one > >> >> >> >> reading > >> >> >> >> >> >> > from the stream, and one thread that periodically > commits > >> the > >> >> >> >> offsets. > >> >> >> >> >> >> > Imagine this sequence of events: > >> >> >> >> >> >> > > >> >> >> >> >> >> > > >> >> >> >> >> >> > ==Reader== > >> >> >> >> >> >> > -initializes w/ offset pointing to "a" > >> >> >> >> >> >> > > >> >> >> >> >> >> > -hasNext() > >> >> >> >> >> >> > ---> makeNext() will read "a" > >> >> >> >> >> >> > and update the local offset to "b" > >> >> >> >> >> >> > > >> >> >> >> >> >> > -msg = "a" > >> >> >> >> >> >> > > >> >> >> >> >> >> > -doSomething("a") > >> >> >> >> >> >> > > >> >> >> >> >> >> > -hasNext() > >> >> >> >> >> >> > ----> makeNext() will read "b" > >> >> >> >> >> >> > and update the local offset "c" > >> >> >> >> >> >> > > >> >> >> >> >> >> > ==Commiter== > >> >> >> >> >> >> > > >> >> >> >> >> >> > -commitOffsets stores the current offset as "c" > >> >> >> >> >> >> > > >> >> >> >> >> >> > > >> >> >> >> >> >> > > >> >> >> >> >> >> > =====PROCESS DIES===== > >> >> >> >> >> >> > ===== RESTARTS ===== > >> >> >> >> >> >> > > >> >> >> >> >> >> > ==Reader== > >> >> >> >> >> >> > -initializes w/ offset pointing to "c" > >> >> >> >> >> >> > > >> >> >> >> >> >> > -hasNext() > >> >> >> >> >> >> > --> makeNext() will read "c" > >> >> >> >> >> >> > and update local offset to "d" > >> >> >> >> >> >> > -msg = "c" > >> >> >> >> >> >> > -doSomething("c") > >> >> >> >> >> >> > ... > >> >> >> >> >> >> > > >> >> >> >> >> >> > > >> >> >> >> >> >> > > >> >> >> >> >> >> > note that in this scenario, doSomething("b") was never > >> >> called. > >> >> >> >> >> >> > Probably for a lot of applications this doesn't matter. > >> But > >> >> >> seems > >> >> >> >> >> >> > like it this could be terrible for some apps. I can't > >> think > >> >> of > >> >> >> any > >> >> >> >> >> >> > way of preventing it from user code. unless, maybe > when > >> the > >> >> >> >> offsets > >> >> >> >> >> >> > get committed, it is always *before* the last thing > read? > >> >> eg., > >> >> >> in > >> >> >> >> my > >> >> >> >> >> >> > example, it would store the next offset as "b" or > earlier? > >> >> >> >> >> >> > > >> >> >> >> >> >> > Is there a flaw in my logic? Do committed offsets > always > >> >> >> >> "undershoot" > >> >> >> >> >> >> > to prevent this? > >> >> >> >> >> >> > > >> >> >> >> >> >> > thanks, > >> >> >> >> >> >> > Imran > >> >> >> >> >> >> > > >> >> >> >> >> >> > >> >> >> >> >> >> > >> >> >> >> >> >> > >> >> >> >> >> >> -- > >> >> >> >> >> >> -- Guozhang > >> >> >> >> >> >> > >> >> >> >> >> > >> >> >> >> > >> >> >> > >> >> > >> >