I misspoke when I said "portability semantics"; I should have said "portability design/implementation". This is why I sent a follow-up e-mail clarifying what I'm confused about:
* I don't understand how you would want close() to change the semantics of a user state specification, and how it affects the lifetime of user state.
** Does it represent committing information within a bundle?
** Does it mean that user state can ignore the replayable and consistent semantics for the lifetime of a bundle?

I'm trying to understand what a `ReadableState<Iterator> readIterator()` means for runner authors and how it solves the memory/close() problem for Samza. Based upon https://s.apache.org/beam-state, reading and writing of state must be consistent. Does this mean that Samza must use snapshots for the lifetime of a bundle? If so, I don't see how adding a `ReadableState<Iterator> readIterator()` allows Samza to ignore the consistency requirement and free snapshots.

It might be worthwhile to set up a three-way Hangouts call to help me, since I don't have the same level of context; we can share the outcome back to this thread. Xinyu / Kenn, how about we set up a time using Slack?

On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles wrote:
> I feel like this discussion is kind of far from the primary intention. The point of ParDo(stateful DoFn) is to enable naive single-threaded code in a style intuitive to a beginning imperative programmer. So:
>
> - the return value of read() should act like an immutable value
> - if there is a read after a write, that read should reflect the changes written, because there's a trivial happens-before relationship induced by program order
> - there are no non-trivial happens-before relationships
> - a write after a read should not affect the value read before
>
> From that starting point, the limitations on expressiveness (single-threaded per-key-and-window-and-step) give rise to embarrassingly parallel computation and GC.
>
> So when you say "portability semantics" it makes me concerned. The word "semantics" refers to the mapping between what a user writes and what it means. Primarily, that means the conceptual translation between a primitive PTransform and the corresponding PCollection-to-PCollection operation. It is true that portability complicates things because a user's code can only be given meaning relative to an SDK harness. But the end-user intention is unchanged.
> If we had time and resources to give the Fn API a solid spec, it should be such that the SDK harness has little choice but to implement the primitives as intended.
>
> In other words, it is the job of https://s.apache.org/beam-fn-state-api-and-bundle-processing* to implement https://s.apache.org/beam-state. The discussion of adding `ReadableState<Iterator> readIterator()` to BagState seems consistent with the latter.
>
> Kenn
>
> *IIUC you've chosen to use the same underlying proto service for side inputs and by-reference values. That's an implementation detail that I have no particular opinion about, except that if it complicates implementing the primary motivator for state, it may not be a great fit.
>
> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles wrote:
>> I don't follow why allowing resources to be freed would be counter to the spec. I don't really know what you mean by consistent for a bundle. State, in the sense of the user-facing per-key-and-window state API, is single-threaded and scoped to a single DoFn. There's no one else who can write the state. If a BagState is read and written and read again, the user-facing logic should be unaware of the resources and not have any logic to deal with consistency.
>>
>> Kenn
>>
>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik wrote:
>>> Hmm, some of the problems I'm dealing with are:
>>> * I don't understand how you would want close() to change the semantics of a user state specification, and how it affects the lifetime of user state.
>>> ** Does it represent committing information within a bundle?
>>> ** Does it mean that user state can ignore the replayable and consistent semantics for the lifetime of a bundle?
>>> * I do understand that close semantics may make it easier for the Samza runner to support state that is larger than memory, but what does it provide to users and how would they benefit (e.g., what new scenarios would it support)?
>>> * I don't understand why you need a snapshot in RocksDb in the first place, and whether RocksDb can support seeking inside or outside a snapshot relatively efficiently.
>>>
>>> There seems to be some context lost from an initial discussion that you and Kenn had; is there a copy of it that could be shared? It may help me get up to speed on the problem being solved.
>>>
>>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu wrote:
>>>> I will take a look at the docs to understand the problem better. A minor comment on 2) is that I don't intend to change the existing iterable API. I plan to implement it similarly to Flink, loading the data into memory and closing the underlying snapshot after that. So the changes should be backward compatible.
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik wrote:
>>>>> I believe adding support for a state spec to be 'closed' or 'freed' is counter to the requirement of a state spec being consistent for the lifetime of a bundle. Are we willing to change this requirement for the lifetime of a bundle, or to say that runners can arbitrarily decide that a StateSpec can't be accessed anymore?
>>>>>
>>>>> If not, I'm having trouble thinking of a good way to integrate the 'close/free' API with portability semantics because:
>>>>> 1) Runners do control bundle size, but other than the trivial cases where a runner specifically chooses:
>>>>> a) to only process a single element at a time (negating the need for a free/close() method).
>>>>> b) to optimistically process elements and, on running out of memory, kill the bundle and try again with a smaller bundle.
>>>>> 2) Adding an API to provide Iterators doesn't stop users from still using an Iterable, which can't be force-closed. (Dropping Iterable in exchange for Iterator in the API is backwards incompatible, so it would likely be deferred until the next major version of Apache Beam.)
>>>>> 3) SDKs are able to process as many elements as they want in parallel; nothing requires them to execute elements serially throughout the pipeline graph.
>>>>> 4) The runner has limited insight into what an SDK is doing. The SDK provides a lower bound on how many elements it has processed, but SDKs aren't required to implement this, meaning that they could technically be processing everything in random order after they have seen all input for a bundle.
>>>>>
>>>>> We'll need to work through the scenarios. Xinyu, it would be useful for you to take a look at these two docs for context on the problem space:
>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process a bundle)
>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to access side inputs, access remote references, and support user state)
>>>>>
>>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles wrote:
>>>>>>
>>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik wrote:
>>>>>>> Before you go on and update the user-facing API, we should discuss the last point I made, since the change you're making will have limited usability: the portability effort won't realistically allow you to see such low-level things as when processElement finished, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), clear.
>>>>>>> It may be moot to discuss how to integrate Samza into the existing framework; it should instead work towards being a portability-based runner. There are more details about supporting user state during pipeline execution here: https://s.apache.org/beam-fn-state-api-and-bundle-processing; top-level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
>>>>>>>
>>>>>>> Do we really want to make a change to the user-facing API for user state in the Java SDK when that code path will be phased out in favor of the portability APIs?
>>>>>>
>>>>>> To be clear, the user API will remain the same, but the runner's implementation path will be phased out. So should we expand this discussion to how the portability APIs enable the SDK and runner to collaborate to achieve this use case? It seems like the interaction you need is that the runner can tell the SDK to close the connection on the read(), and the SDK needs to do so promptly, right?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu wrote:
>>>>>>>> We discussed the proposed approaches internally. It seems that if the State API can also expose another method returning a ReadableState<Iterator>, it will cover our cases of iterating over bigger-than-memory state: closing the underlying rocksDb snapshot immediately after the iterator is fully consumed (most cases), and safely closing the rest of the iterators after processElement (minor cases). If this sounds good to you guys, I am going to create a JIRA ticket for it and open a PR. Thanks a lot for the discussions here.
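The two-part release policy Xinyu proposes above (release the snapshot as soon as the iterator is fully consumed, and close any partially consumed iterators after processElement) might be sketched as follows. This is a minimal, hypothetical illustration, not a Beam or Samza API: `NativeCursor`, `AutoClosingIterator` and `IteratorRegistry` are invented names, and the RocksDB cursor is simulated with an in-memory list so the code runs standalone.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a native cursor (e.g. a RocksDB iterator) that pins a snapshot. */
final class NativeCursor<T> {
  private final Iterator<T> data;
  private boolean closed;

  NativeCursor(List<T> snapshot) { this.data = snapshot.iterator(); }

  boolean hasNext() { ensureOpen(); return data.hasNext(); }
  T next() { ensureOpen(); return data.next(); }
  void close() { closed = true; }  // a real store would release the snapshot here
  boolean isClosed() { return closed; }

  private void ensureOpen() {
    if (closed) throw new IllegalStateException("cursor already closed");
  }
}

/** Releases its cursor as soon as it is exhausted (the common case) or when closed. */
final class AutoClosingIterator<T> implements Iterator<T>, AutoCloseable {
  private final NativeCursor<T> cursor;
  private boolean exhausted;

  AutoClosingIterator(NativeCursor<T> cursor) { this.cursor = cursor; }

  @Override public boolean hasNext() {
    if (exhausted) return false;
    if (cursor.hasNext()) return true;  // throws if the runner already force-closed it
    exhausted = true;
    cursor.close();                     // fully consumed: release right away
    return false;
  }

  @Override public T next() {
    if (!hasNext()) throw new NoSuchElementException();
    return cursor.next();
  }

  @Override public void close() { cursor.close(); }  // idempotent

  boolean isReleased() { return cursor.isClosed(); }
}

/** Tracks iterators opened during one processElement call so leftovers can be closed. */
final class IteratorRegistry {
  private final List<AutoClosingIterator<?>> opened = new ArrayList<>();

  <T> AutoClosingIterator<T> open(List<T> snapshot) {
    AutoClosingIterator<T> it = new AutoClosingIterator<>(new NativeCursor<>(snapshot));
    opened.add(it);
    return it;
  }

  /** Intended to be called by the runner after processElement returns. */
  void closeAll() {
    for (AutoClosingIterator<?> it : opened) it.close();
    opened.clear();
  }
}
```

In this sketch, using an iterator after the runner force-closed it fails loudly with `IllegalStateException` rather than silently returning fewer elements, which seems the safer choice for the "minor cases".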
>>>>>>>>
>>>>>>>> The discussion about async processing is very interesting, and I think it's definitely worth its own thread. I believe we do need support from the Beam API/model so that users can take advantage of it (besides Samza, Flink also has an async operator that helps a lot in Alibaba's use cases). Yes, it will add complexity to the underlying framework, but it's going to be great for users doing remote IO. In practice we found it actually avoids thread+locks issues, as Kenn mentioned above. I am not sure whether this feature can be supported in a runner-specific way. I will probably create another email thread for this discussion in the future, and hopefully I can move this forward.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles wrote:
>>>>>>>>>
>>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik wrote:
>>>>>>>>>> Users typically want to do that async operation and then produce output with it. Adding asynchronous execution is difficult within the framework because a lot of code currently doesn't need to be thread-safe, and writing code that is fast and handles asynchronous execution is quite difficult. Adding async operations typically leads to code with too many locks/synchronization blocks.
>>>>>>>>>
>>>>>>>>> Just a small point here, as it is very unusual to use locks with futures. The essential innovation of futures is that they are a new way to program that avoids the threads+locks style. In part, you get this for free from the functional programming style, and on the other hand you reify asynchronous side effects as data dependencies.
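To make the point about futures concrete: ordering between asynchronous steps is expressed as data dependencies between stages rather than guarded by locks. The sketch below uses only `java.util.concurrent`; `lookupUser` and `lookupScore` are hypothetical stand-ins for remote calls, and nothing here is a Beam API (the CompletionStage-based DoFn Reuven describes later in the thread is only a proposal).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncEnrichment {
  private final ExecutorService io;

  AsyncEnrichment(ExecutorService io) { this.io = io; }

  // Hypothetical remote calls, simulated on the IO executor.
  CompletionStage<String> lookupUser(int id) {
    return CompletableFuture.supplyAsync(() -> "user-" + id, io);
  }

  CompletionStage<Integer> lookupScore(int id) {
    return CompletableFuture.supplyAsync(() -> id * 10, io);
  }

  /**
   * Both lookups run concurrently; the combining step waits on both results.
   * There is no shared mutable state, so no locks: ordering comes from the data dependency.
   */
  CompletionStage<String> enrich(int id) {
    return lookupUser(id)
        .thenCombine(lookupScore(id), (user, score) -> user + ":" + score)
        .thenApply(String::toUpperCase);
  }
}

final class AsyncEnrichmentDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // Blocks only here, at the edge of the program, to print "USER-7:70".
      System.out.println(new AsyncEnrichment(pool).enrich(7).toCompletableFuture().join());
    } finally {
      pool.shutdown();
    }
  }
}
```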
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>> Note that with the portability effort, the runner won't have visibility into such low-level things as when an object is garbage collected, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), clear. It may be moot to discuss how to integrate Samza into the existing framework; it should instead work towards being a portability-based runner. There are more details about supporting user state during pipeline execution here: https://s.apache.org/beam-fn-state-api-and-bundle-processing; top-level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
>>>>>>>>>>
>>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax wrote:
>>>>>>>>>>> At least one API that has been discussed in the past is to use Java 8 CompletionStage, e.g.
>>>>>>>>>>>
>>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>     element.thenApply(...);
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> The framework will automatically create the CompletionStage, and the process method can specify a pipeline of asynchronous operations to perform on the element. When all of them are done, the element will be marked as successfully processed.
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu wrote:
>>>>>>>>>>>> Thanks for all the pointers.
>>>>>>>>>>>> I looked through the discussion on BEAM-2975 and BEAM-2980 about having snapshot or live views of an iterable, and the current semantics make a lot of sense to me. For your question: it does not require an explicit snapshot when we create a RocksDb iterator directly. The iterator will read from an implicit snapshot as of the time the iterator is created [1], and the snapshot will be released after the iterator is closed. If we can have another method to return ReadableState<Iterator>, we might be able to apply the auto-closing approaches we discussed and solve the problem here :).
>>>>>>>>>>>>
>>>>>>>>>>>> It's very interesting that you bring up the discussion about an async API! Async IO has been widely adopted among our users here: they use netty for async calls with a library named ParSeq [2] to help manage the calls. Samza provides a primitive callback-style API [3], in which the user invokes the callback after the remote calls are complete. Currently, in a Samza job, our users use this API with the ParSeq lib for remote IO. It seems we might have to do blocking calls (hence the poor resource utilization you mentioned) when using the Beam API for now. It'd be great if you could send a few more details about the async API discussion. I would like to add our use case and help move this forward.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>
>>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>>> [3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles wrote:
>>>>>>>>>>>>> I don't have any further suggestions, but want to call out how this hits a lot of interesting points.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980 [2] where we debated things a bit. I think the strongest case is for what you describe: it should be a snapshot. Perhaps they should both be closed as fixed...
>>>>>>>>>>>>>
>>>>>>>>>>>>> And you also bring up long blocking calls: we have also deliberately decided that long synchronous blocking calls in @ProcessElement can be embraced for simple programming and compensated for with autoscaling smarts (e.g. expanding the thread pool upon noticing poor utilization). The alternative is a more future-istic API where the calls can be explicitly asynchronous. We've had some interesting dev@ list discussions about that, too.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is another possibility to have read() return a ReadableState<Iterator> instead? We could, of course, have two methods with different names, one for an iterator and one for a snapshot iterable. But wouldn't the Iterator also require a snapshot? Doesn't a native RocksDb iterator require a snapshot to have well-defined contents?
>>>>>>>>>>>>> As you can tell, I don't know enough about RocksDb details to be sure of my suggestions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu wrote:
>>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question here, we discussed internally releasing the underlying resources after consuming the whole iterator. This probably covers quite a lot of use cases. For some special cases where the user only consumes part of the iterator, Luke and Kenn's suggestion of releasing after processElement() might work (I need to confirm this with our use cases). So based on what we have discussed so far, we might have a good way to automatically close an iterator for the store.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There is another issue, though: right now the state API returns an iterable for entries(), keys() and values(), and we can create an iterator from it. From my understanding, the iterable holds a snapshot of the underlying store. In the case of rocksDb, it's going to be a db.snapshot(). Then when can we release the snapshot? Unlike an iterator, there are no heuristics we can use to release it automatically: the user can hold on to the iterable and create iterators throughout the whole processElement().
>>>>>>>>>>>>>> But if we only close the iterable after processElement(), I am quite concerned about the limitations this will bring. If the user makes some remote call during processing, then the snapshot might be held for a long time before being released, which might cause performance problems. And if the user happens to create multiple iterables, then multiple snapshots will be loaded during processing. Luke suggested being aggressive about closing the resources and recreating them when needed again. But that might not work here, since we won't be able to recreate the same snapshot given that the store might have been updated (and creating a rocksDb snapshot is not cheap either). I am running out of ideas other than exposing the iterator itself somehow (and adding close() if needed?). Any further suggestions?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier (CloseableIterator) in an internal interface. I wrapped it in some sort of StateResource for fear that people might reject the proposal immediately after seeing close() on the iterator. I guess since our users are familiar with rocksDb state, it's pretty normal for them to close the iterator/snapshot after using it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that resources are freed for Java developers (hence the weak/phantom reference suggestion).
>>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file streams) leads to leaked resources.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles wrote:
>>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote. But then I realized a few things:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - usually an Iterable does not allocate resources, only its Iterators do
>>>>>>>>>>>>>>>> - if you consume the whole iterator, I hope the user would not have to do any extra work
>>>>>>>>>>>>>>>> - you can also automatically free it up at the end of the call to @ProcessElement, so that is easy too (but you might not want to)
>>>>>>>>>>>>>>>> - so the main use is when the iterator terminates early, is not fully consumed, and you can't wait for the method to finish
>>>>>>>>>>>>>>>> - the scoping in Java will force a bunch of uninitialized declarations outside the try-with-resources block, which is a lot of boilerplate LOC
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One thing that is good about your proposal is that the iterable could have some transparent caches that all free up together.
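The early-termination case in the list above can be sketched with try-with-resources. This assumes a hypothetical `CloseableIterator` whose `close()` throws no checked exception (the same shape as Xinyu's `StateResource` sketch in the thread); `ResourceBag` is an invented stand-in for state whose iterators pin a resource, and it counts how many iterators are still open so the release can be observed.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical user-facing type: an Iterator whose backing resource can be released early. */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override void close();  // narrowed: no checked exception, so no catch block is needed
}

/** Hypothetical bag state whose iterators hold a resource until closed. */
final class ResourceBag<T> {
  private final List<T> contents;
  private int openIterators;  // iterators that still hold a resource

  ResourceBag(List<T> contents) { this.contents = contents; }

  int openIterators() { return openIterators; }

  CloseableIterator<T> openIterator() {
    Iterator<T> delegate = contents.iterator();
    openIterators++;
    return new CloseableIterator<T>() {
      private boolean closed;

      @Override public boolean hasNext() { return !closed && delegate.hasNext(); }
      @Override public T next() { return delegate.next(); }
      @Override public void close() {
        if (!closed) { closed = true; openIterators--; }
      }
    };
  }
}

final class EarlyExit {
  /** Returns the first element matching p; the iterator is released even on early exit. */
  static <T> T firstMatch(ResourceBag<T> bag, Predicate<T> p) {
    T found = null;  // declared outside so it outlives the try block: the boilerplate Kenn mentions
    try (CloseableIterator<T> it = bag.openIterator()) {
      while (it.hasNext()) {
        T candidate = it.next();
        if (p.test(candidate)) {
          found = candidate;
          break;  // early termination: close() still runs when the block exits
        }
      }
    }
    return found;
  }
}
```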
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu wrote:
>>>>>>>>>>>>>>>>> Thanks for drafting the details of the two approaches, Kenn. Now I understand Luke's proposal better. The approach looks neat, but the uncertainty of *when* GC is going to kick in will make users' lives hard. If the user happens to configure a large JVM heap, then since rocksDb uses off-heap memory, GC might start very late and run less frequently than we want. If we don't have a *definitive* way to let the user close the underlying resources, then there is no good way to handle such failures of a critical application in production.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Having a close method on the iterator might be a little unorthodox :). To some degree, this is actually a resource we are holding underneath, and I think it's pretty common to have close() for a resource in Java, e.g. BufferedReader and BufferedWriter. So I would imagine that we also define a resource for the state iterator and make the interface implement *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st =
>>>>>>>>>>>>>>>>>          bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>>   Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>>   while (iter.hasNext()) {
>>>>>>>>>>>>>>>>>     ... do stuff ...
>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>>   ... user code
user code >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The type/method name are just for illustrating here, so >>>>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and >>>>>>>>>>>>>>>>> let me know if >>>>>>>>>>>>>>>>> you have thoughts about the programming patterns here. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Xinyu >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles < >>>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or not. >>>>>>>>>>>>>>>>>> There has been no specific API proposed. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be >>>>>>>>>>>>>>>>>> able to read and write bigger-than-memory state. It seems we >>>>>>>>>>>>>>>>>> have multiple >>>>>>>>>>>>>>>>>> runners that don't support it, perhaps because of our API. >>>>>>>>>>>>>>>>>> You might be >>>>>>>>>>>>>>>>>> able to build something good enough with phantom references, >>>>>>>>>>>>>>>>>> but you might >>>>>>>>>>>>>>>>>> not. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like >>>>>>>>>>>>>>>>>> this: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> new DoFn<>() { >>>>>>>>>>>>>>>>>> @StateId("foo") >>>>>>>>>>>>>>>>>> private final StateSpec<BagState<Whatever>> >>>>>>>>>>>>>>>>>> myBagSpec = ... >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @ProcessElement >>>>>>>>>>>>>>>>>> public void proc(@StateId("foo") >>>>>>>>>>>>>>>>>> BagState<Whatever> myBag, ...) { >>>>>>>>>>>>>>>>>> CloseableIterator<Whatever> iterator = >>>>>>>>>>>>>>>>>> myBag.get().iterator(); >>>>>>>>>>>>>>>>>> while(iterator.hasNext() && ... special >>>>>>>>>>>>>>>>>> condition ...) { >>>>>>>>>>>>>>>>>> ... do stuff ... 
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>     iterator.close();
>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close(), since they iterate with for ( : ) as usual. And a runner that has the 10x funding to try out a ReferenceQueue can be resilient to users who forget. On the other hand, I haven't seen this pattern much in the wild, so I think it is valuable to discuss other methods.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Luke's proposal, by contrast, is something like this if I understand his sketch (replacing WeakReference with PhantomReference seems to be what you really want):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>>> class RocksDbBagState {
>>>>>>>>>>>>>>>>>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   class Iterator {
>>>>>>>>>>>>>>>>>>     PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>>     .next() {
>>>>>>>>>>>>>>>>>>       return cIter.next();
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   class Iterable {
>>>>>>>>>>>>>>>>>>     .iterator() {
>>>>>>>>>>>>>>>>>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ... on another thread ...
>>>>>>>>>>>>>>>>>> while (true) {
>>>>>>>>>>>>>>>>>>   RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>>   deadRef.close();
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop onto the queue to be closed. This might not be too bad. You'll have delayed resource release, and potentially masked errors that are hard to debug. It is less error-prone than WeakReference, which is asking for trouble when objects are collected en masse. Anecdotally, I have heard that the performance of this kind of approach is poor, but I haven't experienced it myself and I can't find good data.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu wrote:
>>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will help clean up the Java objects once GC kicks in. In the case of kv-stores like rocksDb, the Java iterator is just a JNI interface to the underlying C++ iterator, so we need to explicitly invoke close to release the in-memory snapshot data, which can be large and can accumulate quickly if it's not released when no longer in use. Maybe I am missing something in what you suggested here, but it looks to me like using weak references might not help in this case.
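Two details matter when turning the PhantomReference sketch above into working code: `PhantomReference.get()` always returns `null`, so an iterator cannot call through its phantom reference, and `ReferenceQueue.poll()`/`remove()` return the enqueued `Reference` object, not the collected referent. A common pattern is to subclass `PhantomReference`, keep the native handle in that subclass, and hold the references strongly until they are processed. The sketch below assumes a hypothetical `NativeHandle` standing in for the JNI iterator. It keeps an explicit `close()` for prompt release, which is what Xinyu asks for, and treats GC only as a fallback for users who forget, which is what Luke and Kenn describe. On Java 9+, `java.lang.ref.Cleaner` packages the same idea.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a JNI-backed iterator that must be released explicitly. */
interface NativeHandle {
  void release();
}

/** Releases a native handle when its wrapper is closed, or later when the wrapper is collected. */
final class HandleTracker {
  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // The references themselves must stay strongly reachable until they are processed.
  private final Set<Tracked> pending = ConcurrentHashMap.newKeySet();

  /** Holds the handle, never the wrapper: get() on a PhantomReference always returns null. */
  private static final class Tracked extends PhantomReference<Object> {
    private final NativeHandle handle;
    private final AtomicBoolean released = new AtomicBoolean();

    Tracked(Object wrapper, NativeHandle handle, ReferenceQueue<Object> q) {
      super(wrapper, q);
      this.handle = handle;
    }

    void releaseOnce() {
      if (released.compareAndSet(false, true)) handle.release();
    }
  }

  /** What users would hold; close() gives prompt release, GC is only the fallback. */
  final class TrackedWrapper implements AutoCloseable {
    private final Tracked ref;

    private TrackedWrapper(NativeHandle handle) {
      ref = new Tracked(this, handle, queue);
      pending.add(ref);
    }

    @Override public void close() {
      ref.releaseOnce();
      pending.remove(ref);
    }
  }

  TrackedWrapper wrap(NativeHandle handle) { return new TrackedWrapper(handle); }

  /** Releases handles whose wrappers were collected; a runner might poll this on a background thread. */
  int drain() {
    int count = 0;
    Reference<?> r;
    while ((r = queue.poll()) != null) {
      Tracked t = (Tracked) r;
      if (pending.remove(t)) {
        t.releaseOnce();
        count++;
      }
    }
    return count;
  }
}
```

Note that `drain()` can only release what the collector has already discovered, so with a large heap and off-heap native memory the fallback path can lag badly; that is the timing concern Xinyu raises.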
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might have misunderstood what I meant. I am totally for working hard for the best user experience, and I think the current API provides a good example of that. That's also the reason I am implementing a runner here. I am just proposing an extra API to expose an iterator that can be closed when no longer needed, so that users can use this feature to iterate through large state that doesn't fit into memory. I believe this is also a pretty general use case and it's better to have support for it. I am actually arguing that adding this extra API will be a better user experience, since more users can benefit from it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the iterators/iterables that are created and freed by using weak references and reference queues (or other methods). Having a few people work 10x as hard to provide a good implementation is much better than having 100s or 1000s of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu wrote:
>>>>>>>>>>>>>>>>>>>>> Loading/evicting blocks will help reduce the cache memory footprint, but we still won't be able to release the underlying resources. We can definitely add heuristics to help release the resources as you mentioned, but there is no accurate way to track all the iterators/iterables created and free them once they are not needed. I think that while the API is aimed at a nice user experience, we should have the option to let users optimize their performance if they choose to. Do you agree?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik wrote:
>>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources, and forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format which allows random access and use a cache to load/evict blocks of the state from memory.
>>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees the resource after a certain amount of inactivity or uses weak references.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing MapState and SetState in our Samza runner. We noticed that the state returns a Java Iterable for reading entries, keys, etc. For state backed by a file-based kv store like rocksDb, we need to be able to let users explicitly close the iterator/iterable to release the resources. Otherwise we have to load the iterable into memory so we can safely close the underlying rocksDb iterator, similar to Flink's implementation. But this won't work for state that doesn't fit into memory. I chatted with Kenn and he also agrees we need this capability to avoid bulk read/write. This seems to be a general use case, and I'm wondering if we can add support for it? I am happy to contribute to this if needed. Any feedback is highly appreciated.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>> Xinyu
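For comparison, the fallback Xinyu describes in his first message (load the iterable into memory so the underlying RocksDB iterator can be closed right away, similar to Flink's implementation) might look like the sketch below. Because the copy is taken at read time, it also satisfies the read/write rules Kenn lists at the top of the thread: a write after a read does not change the value already read, and a read after a write sees it. The cost is heap usage proportional to the bag, which is exactly the bigger-than-memory limitation this thread is about. `SnapshotCursor` and `MaterializingBagState` are hypothetical names, and the store is simulated in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical cursor over a point-in-time view of the store; must be closed after use. */
final class SnapshotCursor<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<T> view;
  private boolean closed;

  SnapshotCursor(List<T> storeContents) {
    // Copying here simulates a store snapshot taken when the cursor is opened.
    this.view = new ArrayList<>(storeContents).iterator();
  }

  @Override public boolean hasNext() { return !closed && view.hasNext(); }
  @Override public T next() { return view.next(); }
  @Override public void close() { closed = true; }
  boolean isClosed() { return closed; }
}

/** Bag state that materializes each read in memory so the cursor can be closed immediately. */
final class MaterializingBagState<T> {
  private final List<T> store = new ArrayList<>();
  private SnapshotCursor<T> lastCursor;  // kept only so the example can check it was closed

  void add(T value) { store.add(value); }

  /** Returns an immutable copy; the cursor is released before read() returns. */
  Iterable<T> read() {
    List<T> copy = new ArrayList<>();
    try (SnapshotCursor<T> cursor = new SnapshotCursor<>(store)) {
      lastCursor = cursor;
      while (cursor.hasNext()) copy.add(cursor.next());
    }
    // O(size of bag) heap usage: this is what breaks for bigger-than-memory state.
    return Collections.unmodifiableList(copy);
  }

  boolean lastCursorClosed() { return lastCursor != null && lastCursor.isClosed(); }
}
```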
