In that case my confusion lies with how or why adding support for an
iterator helps Samza resolve its memory issues. The consistency questions
are about how a Runner chooses to parallelize execution of a straight-line
program. For example, a runner may choose to use optimistic locking and
actually process two concurrent bundles at the same time for the same key
and window. If at most one of them does a write, both bundles can be
successfully committed. If Samza does this, then it may not be able to free
snapshots within a bundle because it may not be able to emulate
straight-line program order consistency. If it always processes key+window
pairs serially, then I'm not sure why Samza needs to use a snapshot in the
first place; it should be able to read data from RocksDb directly.
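
To make the optimistic locking scenario concrete, here is a minimal Java
sketch. It is only an illustration under my own assumptions:
KeyWindowState, Bundle, startBundle() and commit() are hypothetical names
and are not part of Beam or Samza. Each bundle remembers the version it
started from. A read-only bundle always commits, while a bundle with writes
commits only if no other write was committed in the meantime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types exist in Beam or Samza.
final class OptimisticBundleSketch {

    /** State for a single key+window pair, guarded by a version counter. */
    static final class KeyWindowState {
        private final List<String> values = new ArrayList<>();
        private long version = 0;

        /** Starts a bundle that sees the state as of right now. */
        synchronized Bundle startBundle() {
            return new Bundle(this, version, new ArrayList<>(values));
        }

        /** Commits the bundle unless its writes are based on a stale version. */
        synchronized boolean commit(Bundle bundle) {
            if (bundle.pendingWrites.isEmpty()) {
                return true; // read-only: can be ordered before any concurrent writer
            }
            if (bundle.startVersion != version) {
                return false; // another writer committed first; the bundle must be retried
            }
            values.addAll(bundle.pendingWrites);
            version++;
            return true;
        }

        synchronized List<String> committedValues() {
            return new ArrayList<>(values);
        }
    }

    /** One in-flight bundle holding a private snapshot plus buffered appends. */
    static final class Bundle {
        private final KeyWindowState state;
        private final long startVersion;
        private final List<String> snapshot;
        private final List<String> pendingWrites = new ArrayList<>();

        private Bundle(KeyWindowState state, long startVersion, List<String> snapshot) {
            this.state = state;
            this.startVersion = startVersion;
            this.snapshot = snapshot;
        }

        /** Reads the snapshot followed by this bundle's own writes (program order). */
        List<String> read() {
            List<String> result = new ArrayList<>(snapshot);
            result.addAll(pendingWrites);
            return result;
        }

        void append(String value) {
            pendingWrites.add(value);
        }

        boolean commit() {
            return state.commit(this);
        }
    }
}
```

With one reader and one writer running concurrently, both commits succeed;
with two writers, the second one has to be retried. Note that each bundle
in this sketch keeps its own copy of the values for its whole lifetime,
which is exactly the snapshot-retention cost I am asking about.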

The other point of confusion is how a readIterator() is any different
from returning a wrapper that just invokes read().iterator() on a
BagState. If that is the case, then this is a trivial change which doesn't
need to change the portability design. The nuance is likely that the
interface between what a Runner implements to provide support for user
state may give itself more visibility into what is going on than what the
portability framework can provide as is.
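
As a minimal sketch of what I mean by a trivial wrapper: the ReadableState
and BagState interfaces below are simplified stand-ins that I made up for
this example (they only mirror the shape of the Beam interfaces, not their
full definitions), and readIterator() is written as a hypothetical static
helper rather than a real BagState method.

```java
import java.util.Iterator;

// Hypothetical, simplified stand-ins; not the actual Beam interfaces.
final class ReadIteratorSketch {

    /** Something that can be read to produce a value. */
    interface ReadableState<T> {
        T read();
    }

    /** A bag whose read() returns all of its elements. */
    interface BagState<T> extends ReadableState<Iterable<T>> {
    }

    /** The "trivial" readIterator(): defers to read().iterator() on the bag. */
    static <T> ReadableState<Iterator<T>> readIterator(BagState<T> bag) {
        return () -> bag.read().iterator();
    }
}
```

A wrapper like this gives the runner no extra signal about when the
iterator is exhausted or abandoned, which is why I suspect the runner-side
interface would need to expose something more than this.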

On Tue, May 15, 2018 at 8:57 AM Kenneth Knowles <[email protected]> wrote:

> OK, got it. But what consistency are you referring to? I was trying to
> point out that there's nothing but straight-line program order consistency.
> There's only one actor doing all the reads and all the writes.
>
> Kenn
>
> On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik <[email protected]> wrote:
>
>> I misspoke when I said portability semantics and should have said
>> portability design/implementation. This is why I had a follow-up e-mail and
>> clarified that I'm confused on:
>> * I don't understand how you would want close to change the semantics of
>> a user state specification and how it affects the lifetime of user state.
>> ** Does it represent committing information within a bundle?
>> ** Does it mean that user state can ignore the replayable and consistent
>> semantics for the lifetime of a bundle?
>>
>> I'm trying to tie back what a `ReadableState<Iterator>
>> readIterator()` means for Runner authors and how it solves the
>> memory/close() problem for Samza. Based upon
>> https://s.apache.org/beam-state, reading and writing of state must be
>> consistent. Does this mean that Samza must use snapshots for the lifetime
>> of a bundle? If so, I don't see how adding a `ReadableState<Iterator>
>> readIterator()` allows Samza to ignore the consistency requirement and be
>> allowed to free snapshots.
>>
>> It might be worthwhile to set up a three way hangouts call to help me, as I
>> don't have the same level of context; the outcome can be shared back to
>> this thread. Xinyu / Kenn, how about we set up a time using Slack?
>>
>> On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> I feel like this discussion is kind of far from the primary intention.
>>> The point of ParDo(stateful DoFn) is to enable naive single-threaded code
>>> in a style intuitive to a beginning imperative programmer. So:
>>>
>>>  - the return value of read() should act like an immutable value
>>>  - if there is a read after a write, that read should reflect the
>>> changes written, because there's a trivial happens-before relationship
>>> induced by program order
>>>  - there are no non-trivial happens-before relationships
>>>  - a write after a read should not affect the value read before
>>>
>>> From that starting point, the limitations on expressiveness (single
>>> threaded per-key-and-window-and-step) give rise to embarrassingly parallel
>>> computation and GC.
>>>
>>> So when you say "portability semantics" it makes me concerned. The word
>>> "semantics" refers to the mapping between what a user writes and what it
>>> means. Primarily, that means the conceptual translation between a primitive
>>> PTransform and the corresponding PCollection-to-PCollection operation. It
>>> is true that portability complicates things because a user's code can only
>>> be given meaning relative to an SDK harness. But the end-user intention is
>>> unchanged. If we had time and resources to give the Fn API a solid spec, it
>>> should be such that the SDK harness has little choice but to implement the
>>> primitives as intended.
>>>
>>> In other words, it is the job of
>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing* to
>>> implement https://s.apache.org/beam-state. The discussion of adding
>>> `ReadableState<Iterator> readIterator()` to BagState seems consistent with
>>> the latter.
>>>
>>> Kenn
>>>
>>> *IIUC you've chosen to use the same underlying proto service for side
>>> inputs and by-reference values. That's an implementation detail that I have
>>> no particular opinion about except that if it complicates implementing the
>>> primary motivator for state, it may not be a great fit.
>>>
>>>
>>>
>>> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> I don't follow why allowing freeing resources would be counter to the
>>>> spec. I don't really know what you mean by consistent for a bundle. State,
>>>> in the sense of the user-facing per-key-and-window state API, is single
>>>> threaded and scoped to a single DoFn. There's no one else who can write the
>>>> state. If a BagState is read and written and read again, the user-facing
>>>> logic should be unaware of the resources and not have any logic to deal
>>>> with consistency.
>>>>
>>>> Kenn
>>>>
>>>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Hmm, some of the problem I'm dealing with is:
>>>>> * I don't understand how you would want close to change the semantics
>>>>> of a user state specification and how it affects the lifetime of user
>>>>> state.
>>>>> ** Does it represent committing information within a bundle?
>>>>> ** Does it mean that user state can ignore the replayable and
>>>>> consistent semantics for the lifetime of a bundle?
>>>>> * I do understand that close semantics may make it easier for the Samza
>>>>> runner to support state that is greater than memory, but what does it
>>>>> provide to users and how would they benefit (like what new scenarios would
>>>>> it support)?
>>>>> * I don't understand why you need a snapshot in RocksDb in the first
>>>>> place, and whether RocksDb can support seeking inside or outside a
>>>>> snapshot relatively efficiently.
>>>>>
>>>>> There seems to be some context lost from an initial discussion that
>>>>> you and Kenn had. Is there a copy of that which could be shared? It may
>>>>> help me get up to speed on the problem that is being solved.
>>>>>
>>>>>
>>>>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I will take a look at the docs to understand the problem better. A
>>>>>> minor comment to 2) is that I don't intend to change the existing
>>>>>> iterable API. I plan to implement it similar to Flink, loading the data
>>>>>> into memory and closing the underlying snapshot after that. So the
>>>>>> changes should be backward compatible.
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I believe adding support for a state spec to be 'closed' or 'freed'
>>>>>>> is counter to the requirement of a state spec being consistent for the
>>>>>>> lifetime of a bundle. Are we willing to change this requirement for the
>>>>>>> lifetime of a bundle or say that runners can arbitrarily say that a
>>>>>>> StateSpec can't be accessed anymore?
>>>>>>>
>>>>>>> If not, I'm having trouble thinking of a good way to integrate the
>>>>>>> 'close/free' API with portability semantics because:
>>>>>>> 1) Runners do control bundle size but other than the trivial cases
>>>>>>> where a runner specifically chooses:
>>>>>>>   a) to only process a single element at a time (negating the need
>>>>>>> for a free/close() method).
>>>>>>>   b) to optimistically process elements and, if it runs out of memory,
>>>>>>> kill the bundle and try again with a smaller bundle.
>>>>>>> 2) Adding an API to provide Iterators doesn't mean that users can't
>>>>>>> still use Iterable, which can't be force closed. (Dropping Iterable in
>>>>>>> exchange for Iterator in the API is backwards incompatible so likely to
>>>>>>> be deferred till the next major version of Apache Beam.)
>>>>>>> 3) SDKs are able to process as many elements as they want in
>>>>>>> parallel; nothing requires them to execute elements serially throughout
>>>>>>> the pipeline graph.
>>>>>>> 4) The runner has limited information into what an SDK is doing. The
>>>>>>> SDK provides a lower bound on how many elements it has processed but
>>>>>>> SDKs aren't required to implement this, meaning that they could
>>>>>>> technically be processing everything in random order after they have
>>>>>>> seen all input for a bundle.
>>>>>>>
>>>>>>> We'll need to work through the scenarios. Xinyu, it would be useful
>>>>>>> for you to take a look at these two docs for context into the problem
>>>>>>> space:
>>>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to
>>>>>>> process a bundle)
>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How
>>>>>>> to access side inputs, access remote references, and support user state)
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Before you go on and update the user facing API, we should discuss
>>>>>>>>> the last point I made, since the change you're making will have limited
>>>>>>>>> usability: the portability effort won't realistically allow you to see
>>>>>>>>> such low level things like when processElement finished, and supporting
>>>>>>>>> user state will be modeled using the following three operations: read
>>>>>>>>> (with continuation tokens), append (blind write), clear. It may be moot
>>>>>>>>> to discuss how to integrate Samza into the existing framework; we should
>>>>>>>>> work towards it being a portability based runner. There are more details
>>>>>>>>> here about supporting user state during pipeline execution:
>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top
>>>>>>>>> level docs about portability are
>>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>>
>>>>>>>>> Do we really want to make a change to the user facing API for user
>>>>>>>>> state in the Java SDK when that code path will be phased out in
>>>>>>>>> exchange for using the portability APIs?
>>>>>>>>>
>>>>>>>>
>>>>>>>> To be clear, the user API will remain the same, but the runner's
>>>>>>>> implementation path will be phased out. So should we expand this
>>>>>>>> discussion to how the portability APIs enable the SDK and runner to
>>>>>>>> collaborate to achieve this use case? It seems like the interaction you
>>>>>>>> need is that the runner can tell that the SDK can close the connection
>>>>>>>> on the read(), and the SDK needs to do so promptly, right?
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> We discussed the proposed approaches internally. It seems that if
>>>>>>>>>> the State API can also expose another method to return a
>>>>>>>>>> ReadableState<Iterator>, it will cover our cases of iterating over a
>>>>>>>>>> bigger-than-memory state and closing the underlying rocksDb snapshot
>>>>>>>>>> immediately after the iterator is fully consumed (most cases), and
>>>>>>>>>> also safely closing the rest of the iterators after processElement
>>>>>>>>>> (minor cases). If this sounds good to you guys, I am going to create
>>>>>>>>>> a JIRA ticket and open a PR for it.
>>>>>>>>>> Thanks a lot for the discussions here.
>>>>>>>>>>
>>>>>>>>>> The discussion about async processing is very interesting, and I
>>>>>>>>>> think it's definitely worth its own thread. I believe we do need the
>>>>>>>>>> support from the Beam API/model so the users can take advantage of it
>>>>>>>>>> (besides Samza, Flink also has an async operator that helps a lot in
>>>>>>>>>> Alibaba's use cases). Yes, it will add complexity to the underlying
>>>>>>>>>> framework, but it's going to be great for the users to do remote IO.
>>>>>>>>>> In practice we found it actually avoids threads+locks issues, as Kenn
>>>>>>>>>> mentioned above. I am not sure whether this feature can be a
>>>>>>>>>> runner-specific thing. I will probably create another email thread for
>>>>>>>>>> this discussion in the future and hopefully I can move this forward.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
>>>>>>>>>>
>>>>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Users typically want to do that async operation and then
>>>>>>>>>>>> produce output with it. Adding asynchronous execution is difficult
>>>>>>>>>>>> within the framework because a lot of code currently does not need to
>>>>>>>>>>>> be thread safe, and writing code that is fast and handles asynchronous
>>>>>>>>>>>> execution is quite difficult. Adding async operations typically leads
>>>>>>>>>>>> to code with too many locks/synchronization blocks.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Just a small point here, as it is very unusual to use locks with
>>>>>>>>>>> futures. The essential innovation of futures is that it is a new way
>>>>>>>>>>> to program that avoids threads+locks style. In part, you get this for
>>>>>>>>>>> free from functional programming style, and on the other hand you reify
>>>>>>>>>>> asynchronous side effects as data dependencies.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Note that with the portability effort, the Runner won't have
>>>>>>>>>>>> visibility into such low level things like when an object is garbage
>>>>>>>>>>>> collected, and supporting user state will be modeled using the
>>>>>>>>>>>> following three operations: read (with continuation tokens), append
>>>>>>>>>>>> (blind write), clear. It may be moot to discuss how to integrate Samza
>>>>>>>>>>>> into the existing framework; we should work towards it being a
>>>>>>>>>>>> portability based runner. There are more details here about supporting
>>>>>>>>>>>> user state during pipeline execution:
>>>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing,
>>>>>>>>>>>> top level docs about portability are
>>>>>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> At least one API that has been discussed in the past is to
>>>>>>>>>>>>> use Java 8 CompletionStage, e.g.
>>>>>>>>>>>>>
>>>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>>>       element.thenApply(...);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>> The framework will automatically create the CompletionStage,
>>>>>>>>>>>>> and the process method can specify a pipeline of asynchronous
>>>>>>>>>>>>> operations to perform on the element. When all of them are done, the
>>>>>>>>>>>>> element will be marked as successfully processed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for all the pointers. I looked through the discussion
>>>>>>>>>>>>>> over BEAM-2975 and BEAM-2980 about having snapshot or live
>>>>>>>>>>>>>> views of iterable, and the current semantics make a lot of sense to
>>>>>>>>>>>>>> me. For your question: it does not require an explicit snapshot when
>>>>>>>>>>>>>> we create a RocksDb iterator directly. The iterator will read from an
>>>>>>>>>>>>>> implicit snapshot as of the time the iterator is created [1], and the
>>>>>>>>>>>>>> snapshot will be released after the iterator is closed. If we can have
>>>>>>>>>>>>>> another method to return ReadableState<Iterator>, we might be able to
>>>>>>>>>>>>>> apply the auto-closing approaches as we discussed and solve the problem
>>>>>>>>>>>>>> here :).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It's very interesting that you bring up the discussion about
>>>>>>>>>>>>>> async API! Async IO has been widely adopted here among our users: they
>>>>>>>>>>>>>> use netty for async calls with a library named ParSeq [2] to help
>>>>>>>>>>>>>> manage the calls. Samza provides a primitive callback style API [3],
>>>>>>>>>>>>>> in which the user will invoke the callback after the remote calls are
>>>>>>>>>>>>>> complete. Currently in a Samza job our users use this API with the
>>>>>>>>>>>>>> ParSeq lib for remote IO. It seems we might have to do blocking calls
>>>>>>>>>>>>>> (thus the poor resource utilization you mentioned) when using the Beam
>>>>>>>>>>>>>> API for now. It'll be great if you can send a few more details about
>>>>>>>>>>>>>> the discussion about async API. I would like to add our use case and
>>>>>>>>>>>>>> help move this forward.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>>>>> [3]:
>>>>>>>>>>>>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't have any further suggestions, but want to call out
>>>>>>>>>>>>>>> how this hits a lot of interesting points.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1]
>>>>>>>>>>>>>>> and BEAM-2980 [2] where we debated things a bit. I think the strongest
>>>>>>>>>>>>>>> case is for what you describe - it should be a snapshot. Perhaps they
>>>>>>>>>>>>>>> should both be closed as fixed...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> And you also bring up long blocking calls - we have also
>>>>>>>>>>>>>>> deliberately decided that long synchronous blocking calls in
>>>>>>>>>>>>>>> @ProcessElement can be embraced for simple programming and compensated
>>>>>>>>>>>>>>> with autoscaling smarts (e.g. expand the thread pool by noticing poor
>>>>>>>>>>>>>>> utilization). The alternative is a more future-istic API where the
>>>>>>>>>>>>>>> calls can be explicitly asynchronous. We've had some interesting dev@
>>>>>>>>>>>>>>> list discussions about that, too.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is another possibility to perhaps have read() return a
>>>>>>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two methods
>>>>>>>>>>>>>>> with different names, one for iterator, one for snapshot iterable. But
>>>>>>>>>>>>>>> wouldn't the Iterator also require a snapshot? Doesn't a native
>>>>>>>>>>>>>>> RocksDb iterator require a snapshot to have well-defined contents? As
>>>>>>>>>>>>>>> you can tell, I don't know enough about RocksDb details to be sure of
>>>>>>>>>>>>>>> my suggestions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I
>>>>>>>>>>>>>>>> posted the question here, we discussed internally about releasing the
>>>>>>>>>>>>>>>> underlying resources after consuming the whole iterator. This probably
>>>>>>>>>>>>>>>> covers quite a lot of use cases. For some special cases where the user
>>>>>>>>>>>>>>>> only consumes part of the iterator, Luke and Kenn's suggestion about
>>>>>>>>>>>>>>>> releasing after processElement() might work (I need to confirm this
>>>>>>>>>>>>>>>> with our use cases). So based on what we discussed so far, we might
>>>>>>>>>>>>>>>> have a good way to automatically close an iterator for the store.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> There is another issue though: right now the state API
>>>>>>>>>>>>>>>> returns an iterable for entries(), keys() and values(), and we can
>>>>>>>>>>>>>>>> create an iterator from it. From my understanding, the iterable holds
>>>>>>>>>>>>>>>> a snapshot of the underlying store. In the case of rocksDb, it's going
>>>>>>>>>>>>>>>> to be a db.snapshot(). Then when can we release the snapshot? It's not
>>>>>>>>>>>>>>>> like an iterator, where we can use some heuristics to automatically
>>>>>>>>>>>>>>>> release it. The user can hold on to the iterable and create iterators
>>>>>>>>>>>>>>>> throughout the whole processElement(). But if we only close the
>>>>>>>>>>>>>>>> iterable after processElement(), I am quite concerned about the
>>>>>>>>>>>>>>>> limitations this will bring. If the user is doing some remote call
>>>>>>>>>>>>>>>> during the process, then the snapshot might be held for a long time
>>>>>>>>>>>>>>>> before releasing, and might cause performance problems. And if the
>>>>>>>>>>>>>>>> user happens to create multiple iterables, then there will be multiple
>>>>>>>>>>>>>>>> snapshots loaded during the process. Luke suggested being aggressive
>>>>>>>>>>>>>>>> at closing the resources and recreating them when needed again. But in
>>>>>>>>>>>>>>>> this case it might not work, since we won't be able to recreate the
>>>>>>>>>>>>>>>> same snapshot given the store might have been updated (and creating a
>>>>>>>>>>>>>>>> rocksDb snapshot is not cheap either). I am running out of ideas other
>>>>>>>>>>>>>>>> than exposing the iterator itself somehow (and adding close() if
>>>>>>>>>>>>>>>> needed?). Any further suggestions?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>>>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in some
>>>>>>>>>>>>>>>> sort of StateResource in the fear that people might reject the
>>>>>>>>>>>>>>>> proposal immediately after seeing the close() on the iterator. I guess
>>>>>>>>>>>>>>>> our users are familiar with rocksDb state, so it's pretty normal to
>>>>>>>>>>>>>>>> close the iterator/snapshot after using it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that
>>>>>>>>>>>>>>>>> resources are freed for Java developers (hence the weak/phantom
>>>>>>>>>>>>>>>>> reference suggestion).
>>>>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like
>>>>>>>>>>>>>>>>> file streams) leads to leaked resources.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote.
>>>>>>>>>>>>>>>>>> But then I realized a few things:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>  - usually an Iterable does not allocate resources, only
>>>>>>>>>>>>>>>>>> its Iterators
>>>>>>>>>>>>>>>>>>  - if you consume the whole iterator, I hope the user
>>>>>>>>>>>>>>>>>> would not have to do any extra work
>>>>>>>>>>>>>>>>>>  - you can also automatically free it up at the end of
>>>>>>>>>>>>>>>>>> the call to @ProcessElement so that is easy too (but you might not
>>>>>>>>>>>>>>>>>> want to)
>>>>>>>>>>>>>>>>>>  - so the main use is when the iterator terminates early
>>>>>>>>>>>>>>>>>> and is not fully consumed and you can't wait to finish the method
>>>>>>>>>>>>>>>>>>  - the scoping in Java will force a bunch of
>>>>>>>>>>>>>>>>>> uninitialized declarations outside the try-with-resources block, kind
>>>>>>>>>>>>>>>>>> of a lot of boilerplate LOC
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> One thing that is good about your proposal is that the
>>>>>>>>>>>>>>>>>> iterable could have some transparent caches that all free up together.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for drafting the details about the two
>>>>>>>>>>>>>>>>>>> approaches, Kenn. Now I understand Luke's proposal better. The
>>>>>>>>>>>>>>>>>>> approach looks neat, but the uncertainty of *when* GC is going
>>>>>>>>>>>>>>>>>>> to kick in will make users' lives hard. If the user happens to
>>>>>>>>>>>>>>>>>>> configure a large JVM heap size, and since rocksDb uses off-heap
>>>>>>>>>>>>>>>>>>> memory, GC might start very late and less frequently than what we
>>>>>>>>>>>>>>>>>>> want. If we don't have a *definitive* way to let the user close the
>>>>>>>>>>>>>>>>>>> underlying resources, then there is no good way to handle such
>>>>>>>>>>>>>>>>>>> failures of a critical application in production.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Having a close method in the iterator might be a little
>>>>>>>>>>>>>>>>>>> unorthodox :). To some degree, this is actually a resource we are
>>>>>>>>>>>>>>>>>>> holding underneath, and I think it's pretty common to have close() for
>>>>>>>>>>>>>>>>>>> a resource in Java, e.g. BufferedReader and BufferedWriter. So I would
>>>>>>>>>>>>>>>>>>> imagine that we also define a resource for the state iterator and make
>>>>>>>>>>>>>>>>>>> the interface implement *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>>>>     Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>>>>     while (iter.hasNext()) {
>>>>>>>>>>>>>>>>>>>        .. do stuff ...
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>>>>     ... user code
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The type/method names are just for illustration here, so
>>>>>>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and let me
>>>>>>>>>>>>>>>>>>> know if you have thoughts about the programming patterns here.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or
>>>>>>>>>>>>>>>>>>>> not. There has been no specific API proposed.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be
>>>>>>>>>>>>>>>>>>>> able to read and write bigger-than-memory state. It seems we have
>>>>>>>>>>>>>>>>>>>> multiple runners that don't support it, perhaps because of our API.
>>>>>>>>>>>>>>>>>>>> You might be able to build something good enough with phantom
>>>>>>>>>>>>>>>>>>>> references, but you might not.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like
>>>>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>        @StateId("foo")
>>>>>>>>>>>>>>>>>>>>        private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>        @ProcessElement
>>>>>>>>>>>>>>>>>>>>        public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>>>>>>>>>>>>>>>>          CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>>>>>>>>>>>>>>>>>          while (iterator.hasNext() && ... special condition ...) {
>>>>>>>>>>>>>>>>>>>>            ... do stuff ...
>>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>>          iterator.close();
>>>>>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to
>>>>>>>>>>>>>>>>>>>> close() since they iterate with for ( : ) as usual. And a runner that
>>>>>>>>>>>>>>>>>>>> has the 10x funding to try out a ReferenceQueue can be resilient to
>>>>>>>>>>>>>>>>>>>> users that forget. On the other hand, I haven't seen this pattern much
>>>>>>>>>>>>>>>>>>>> in the wild, so I think it is valuable to discuss other methods.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> While Luke's proposal is something like this if I
>>>>>>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with PhantomReference
>>>>>>>>>>>>>>>>>>>> seems to be what you really want):
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>>>>>     class RocksDbBagState {
>>>>>>>>>>>>>>>>>>>>       static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>       class Iterator {
>>>>>>>>>>>>>>>>>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>>>>          .next() {
>>>>>>>>>>>>>>>>>>>>            return cIter.next();
>>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>       class Iterable {
>>>>>>>>>>>>>>>>>>>>          .iterator() {
>>>>>>>>>>>>>>>>>>>>            return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     ... on another thread ...
>>>>>>>>>>>>>>>>>>>>     while (true) {
>>>>>>>>>>>>>>>>>>>>       RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>>>>       deadRef.close();
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop onto the
>>>>>>>>>>>>>>>>>>>> queue so it can be closed. This might not be too bad. You'll have
>>>>>>>>>>>>>>>>>>>> delayed resource release, and potentially masked errors that are hard
>>>>>>>>>>>>>>>>>>>> to debug. It is less error-prone than WeakReference, which is asking
>>>>>>>>>>>>>>>>>>>> for trouble when objects are collected en masse. Anecdotally I have
>>>>>>>>>>>>>>>>>>>> heard that performance of this kind of approach is poor, but I haven't
>>>>>>>>>>>>>>>>>>>> experienced it myself and I can't find good data.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
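A minimal, runnable sketch of the PhantomReference approach discussed above, under stated assumptions: `FakeNativeIterator` is a stand-in for the JNI-backed iterator (it is not the RocksDB API), and `IteratorReaper`, `Ref`, `register`, `wrap`, and `drain` are hypothetical names chosen for illustration. The point it shows is that the phantom reference tracks the Java wrapper while separately holding the native handle, since `PhantomReference.get()` always returns null.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for a JNI-backed iterator; hypothetical, not the RocksDB API. */
class FakeNativeIterator implements AutoCloseable {
  private final Iterator<String> values;
  private boolean closed = false;

  FakeNativeIterator(List<String> data) { this.values = data.iterator(); }
  boolean hasNext() { return !closed && values.hasNext(); }
  String next() { return values.next(); }
  boolean isClosed() { return closed; }
  @Override public void close() { closed = true; }
}

/** Closes native iterators whose Java wrappers have become unreachable. */
class IteratorReaper {
  /** Phantom reference that remembers which native resource to release. */
  static final class Ref extends PhantomReference<Object> {
    final FakeNativeIterator nativeIter;
    Ref(Object wrapper, FakeNativeIterator nativeIter, ReferenceQueue<Object> queue) {
      super(wrapper, queue);
      this.nativeIter = nativeIter;
    }
  }

  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // Keeps Ref objects strongly reachable until they are dequeued;
  // an unreachable Ref would never be enqueued.
  private final Set<Ref> live = ConcurrentHashMap.newKeySet();

  /** Associates a wrapper object with the native iterator to close after it dies. */
  Ref register(Object wrapper, FakeNativeIterator nativeIter) {
    Ref ref = new Ref(wrapper, nativeIter, queue);
    live.add(ref);
    return ref;
  }

  /** Returns a Java iterator over the native one and tracks it for cleanup. */
  Iterator<String> wrap(FakeNativeIterator nativeIter) {
    Iterator<String> wrapper = new Iterator<String>() {
      @Override public boolean hasNext() { return nativeIter.hasNext(); }
      @Override public String next() { return nativeIter.next(); }
    };
    register(wrapper, nativeIter);
    return wrapper;
  }

  /** Closes the native iterator of every enqueued reference; returns how many. */
  int drain() {
    int closed = 0;
    Reference<?> ref;
    while ((ref = queue.poll()) != null) {
      Ref dead = (Ref) ref;
      dead.nativeIter.close();
      live.remove(dead);
      closed++;
    }
    return closed;
  }
}
```

A runner could call `drain()` periodically, or run a dedicated thread that blocks on `ReferenceQueue.remove()` instead of polling. Either way, release is delayed until the collector notices the wrapper is unreachable, which is the trade-off described above.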
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will help clean up
>>>>>>>>>>>>>>>>>>>>> the Java objects once GC kicks in. In the case of kv-stores like
>>>>>>>>>>>>>>>>>>>>> rocksDb, the Java iterator is just a JNI interface to the underlying C
>>>>>>>>>>>>>>>>>>>>> iterator, so we need to explicitly invoke close to release the
>>>>>>>>>>>>>>>>>>>>> in-memory snapshot data, which can be large and can accumulate quickly
>>>>>>>>>>>>>>>>>>>>> if it's not released when not in use. Maybe I am missing something in
>>>>>>>>>>>>>>>>>>>>> what you suggested here, but it looks to me like using weak references
>>>>>>>>>>>>>>>>>>>>> might not help in this case.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might have misunderstood
>>>>>>>>>>>>>>>>>>>>> what I meant. I am totally for working hard for the best user
>>>>>>>>>>>>>>>>>>>>> experience, and I think the current API provides a good example of that.
>>>>>>>>>>>>>>>>>>>>> That's also the reason I am implementing a runner here. I 
>>>>>>>>>>>>>>>>>>>>> am just proposing
>>>>>>>>>>>>>>>>>>>>> an extra API to expose an iterator that can be closed 
>>>>>>>>>>>>>>>>>>>>> when not needed, that
>>>>>>>>>>>>>>>>>>>>> way the users can use this feature to iterate through 
>>>>>>>>>>>>>>>>>>>>> large state that
>>>>>>>>>>>>>>>>>>>>> doesn't fit into memory. I believe this is also a pretty 
>>>>>>>>>>>>>>>>>>>>> general use case
>>>>>>>>>>>>>>>>>>>>> and it's better to have support for it. I am actually arguing that
>>>>>>>>>>>>>>>>>>>>> adding this extra API will make for a better user experience, since
>>>>>>>>>>>>>>>>>>>>> more users can benefit from it.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>
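To make the proposal above concrete, here is a small sketch of what a closeable iterator could look like from the user's side. It is only an illustration under assumptions: `CloseableIterator`, `ToyBagState`, and the in-memory list are hypothetical, and `readIterator()` is used as the method name only because it is the one floated elsewhere in this thread. Users who never call `close()` are not covered by this sketch.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** An iterator that owns a resource and can release it early (hypothetical name). */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override void close(); // narrowed so callers need no checked-exception handling
}

/** Toy bag state; a real runner would back this with a store iterator. */
class ToyBagState {
  private final List<String> values;
  private int openIterators = 0;

  ToyBagState(String... values) { this.values = Arrays.asList(values); }

  /** Number of iterators handed out and not yet closed. */
  int openIterators() { return openIterators; }

  CloseableIterator<String> readIterator() {
    openIterators++;
    final Iterator<String> delegate = values.iterator();
    return new CloseableIterator<String>() {
      private boolean closed = false;

      @Override public boolean hasNext() { return !closed && delegate.hasNext(); }

      @Override public String next() {
        if (closed) throw new IllegalStateException("iterator is closed");
        return delegate.next();
      }

      // Idempotent: closing twice releases the resource only once.
      @Override public void close() {
        if (!closed) {
          closed = true;
          openIterators--;
        }
      }
    };
  }
}

class CloseableIteratorDemo {
  /** Returns the first value with the given prefix, releasing the iterator early. */
  static String firstWithPrefix(ToyBagState state, String prefix) {
    try (CloseableIterator<String> it = state.readIterator()) {
      while (it.hasNext()) {
        String v = it.next();
        if (v.startsWith(prefix)) return v;
      }
      return null;
    }
  }

  public static void main(String[] args) {
    ToyBagState state = new ToyBagState("apple", "banana", "blueberry");
    System.out.println(firstWithPrefix(state, "b"));
    System.out.println(state.openIterators());
  }
}
```

With try-with-resources the iterator is released as soon as the block exits, including on the early return, so the store does not need to hold its snapshot for the rest of the bundle.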
>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the
>>>>>>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by using 
>>>>>>>>>>>>>>>>>>>>>> weak references and
>>>>>>>>>>>>>>>>>>>>>> reference queues (or other methods). Having a few people 
>>>>>>>>>>>>>>>>>>>>>> work 10x as hard
>>>>>>>>>>>>>>>>>>>>>> to provide a good implementation is much better than
>>>>>>>>>>>>>>>>>>>>>> having 100s or 1000s
>>>>>>>>>>>>>>>>>>>>>> of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Load/evict blocks will help reduce the cache memory
>>>>>>>>>>>>>>>>>>>>>>> footprint, but we still won't be able to release the 
>>>>>>>>>>>>>>>>>>>>>>> underlying resources.
>>>>>>>>>>>>>>>>>>>>>>> We can definitely add heuristics to help release the resources as you
>>>>>>>>>>>>>>>>>>>>>>> mentioned, but there is no accurate way to track all the
>>>>>>>>>>>>>>>>>>>>>>> iterators/iterables created and free them up once they are not needed.
>>>>>>>>>>>>>>>>>>>>>>> I think that while the API is aimed at a nice user experience, we
>>>>>>>>>>>>>>>>>>>>>>> should have the option to let users optimize their performance if they
>>>>>>>>>>>>>>>>>>>>>>> choose to. Do you agree?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources
>>>>>>>>>>>>>>>>>>>>>>>> and forcing them to will make the user experience 
>>>>>>>>>>>>>>>>>>>>>>>> worse.
>>>>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format
>>>>>>>>>>>>>>>>>>>>>>>> which allows random access and use a cache to 
>>>>>>>>>>>>>>>>>>>>>>>> load/evict blocks of the
>>>>>>>>>>>>>>>>>>>>>>>> state from memory.
>>>>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which
>>>>>>>>>>>>>>>>>>>>>>>> frees the resource after a certain amount of 
>>>>>>>>>>>>>>>>>>>>>>>> inactivity or uses weak
>>>>>>>>>>>>>>>>>>>>>>>> references.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing MapState and SetState in our Samza
>>>>>>>>>>>>>>>>>>>>>>>>> runner. We noticed that the state returns a Java Iterable for reading
>>>>>>>>>>>>>>>>>>>>>>>>> entries, keys, etc. For state backed by a file-based kv store like
>>>>>>>>>>>>>>>>>>>>>>>>> rocksDb, we need to be able to let users explicitly close the
>>>>>>>>>>>>>>>>>>>>>>>>> iterator/iterable to release the resources. Otherwise we have to load
>>>>>>>>>>>>>>>>>>>>>>>>> the iterable into memory so we can safely close the underlying rocksDb
>>>>>>>>>>>>>>>>>>>>>>>>> iterator, similar to Flink's implementation. But this won't work for
>>>>>>>>>>>>>>>>>>>>>>>>> states that don't fit into memory. I chatted with Kenn and he also
>>>>>>>>>>>>>>>>>>>>>>>>> agrees we need this capability to avoid bulk read/write. This seems to
>>>>>>>>>>>>>>>>>>>>>>>>> be a general use case and I'm wondering if we can add support for it?
>>>>>>>>>>>>>>>>>>>>>>>>> I am happy to contribute to this if needed. Any feedback is highly
>>>>>>>>>>>>>>>>>>>>>>>>> appreciated.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>
