I don't follow why allowing resources to be freed would be counter to the spec. I don't really know what you mean by "consistent for a bundle". State, in the sense of the user-facing per-key-and-window state API, is single-threaded and scoped to a single DoFn. No one else can write the state. If a BagState is read, written, and read again, the user-facing logic should be unaware of the resources and should not need any logic to deal with consistency.
Kenn On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <[email protected]> wrote: > Hmm, some of the problems I'm dealing with are: > * I don't understand how you would want close to change the semantics of a > user state specification and how it affects the lifetime of user state? > ** Does it represent committing information within a bundle? > ** Does it mean that user state can ignore the replayable and consistent > semantics for the lifetime of a bundle? > * I do understand that close semantics may make it easier for the Samza runner > to support state that is greater than memory, but what does it provide to > users and how would they benefit (like what new scenarios would it support)? > * I don't understand why in RocksDb you need a snapshot in the first > place, and whether RocksDb can support seeking inside or outside a snapshot > relatively efficiently? > > There seems to be some context lost initially from a discussion that you > and Kenn had; is there a copy of that which could be shared? It may help me > get up to speed on the problem that is being solved. > > > On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <[email protected]> wrote: > >> I will take a look at the docs to understand the problem better. A minor >> comment on 2) is that I don't intend to change the existing iterable API. I >> plan to implement it similarly to Flink, loading the data into memory and >> closing the underlying snapshot after that. So the changes should be >> backward compatible. >> >> Thanks, >> Xinyu >> >> >> >> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <[email protected]> wrote: >> >>> I believe adding support for a state spec to be 'closed' or 'freed' is >>> counter to the requirement of a state spec being consistent for the >>> lifetime of a bundle. Are we willing to change this requirement for the >>> lifetime of a bundle, or say that runners can arbitrarily decide that a StateSpec >>> can't be accessed anymore?
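Xinyu's plan above (keep the existing Iterable API but, as Flink does, load the data into memory and close the underlying snapshot right away) can be sketched as follows. `CloseableIterator`, `FakeStoreIterator`, and `EagerStateRead` are hypothetical names, not Beam or RocksDB APIs, and the approach only works when the state fits in memory, which is the limitation that started this thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical iterator over a native store that must be closed to release its snapshot. */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close(); // narrowed: throws no checked exception
}

/** Stand-in for a RocksDB-backed iterator; it only records whether close() was called. */
final class FakeStoreIterator<T> implements CloseableIterator<T> {
  private final Iterator<T> delegate;
  private boolean closed;

  FakeStoreIterator(List<T> contents) {
    this.delegate = contents.iterator();
  }

  @Override public boolean hasNext() { return delegate.hasNext(); }
  @Override public T next() { return delegate.next(); }
  @Override public void close() { closed = true; }

  boolean isClosed() { return closed; }
}

final class EagerStateRead {
  private EagerStateRead() {}

  /**
   * Copies every element into an in-memory list, then closes the source, so the
   * snapshot is released before user code ever sees the Iterable.
   */
  static <T> List<T> readAllAndClose(CloseableIterator<T> source) {
    try (CloseableIterator<T> it = source) {
      List<T> copy = new ArrayList<>();
      while (it.hasNext()) {
        copy.add(it.next());
      }
      return copy;
    }
  }
}
```

User code then iterates the returned list with a plain for-each loop and never sees a handle, which is why the change can stay backward compatible.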
>>> >>> If not, I'm having trouble thinking of a good way to integrate >>> the 'close/free' API with portability semantics because: >>> 1) Runners do control bundle size, but other than the trivial cases where >>> a runner specifically chooses: >>> a) to only process a single element at a time (negating the need for a >>> free/close() method). >>> b) to optimistically process elements and, if it runs out of memory, kill >>> the bundle and try again with a smaller bundle. >>> 2) Adding an API to provide Iterators doesn't mean that users can't >>> still use Iterable, which can't be force-closed. (Dropping Iterable in >>> exchange for Iterator in the API is backwards incompatible, so it is likely to be >>> deferred until the next major version of Apache Beam.) >>> 3) SDKs are able to process as many elements as they want in parallel; >>> nothing requires them to execute elements serially throughout the pipeline >>> graph. >>> 4) The runner has limited insight into what an SDK is doing. The SDK >>> provides a lower bound on how many elements it has processed, but SDKs >>> aren't required to implement this, meaning that they could technically be >>> processing everything in random order after they have seen all input for a >>> bundle.
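Strategy 1(b) in Lukasz's list (optimistically process a bundle and, if memory runs out, discard it and retry with a smaller one) might look roughly like this. `ShrinkingBundler`, `BundleProcessor`, and `BundleTooLargeException` are hypothetical; the sketch assumes each bundle commits atomically, so a failed bundle leaves no partial output, and it halves the bundle size after each failure.

```java
import java.util.ArrayList;
import java.util.List;

final class ShrinkingBundler {
  private ShrinkingBundler() {}

  /** Hypothetical signal that a bundle exceeded the memory budget. */
  static final class BundleTooLargeException extends RuntimeException {
    BundleTooLargeException(String message) { super(message); }
  }

  /** Processes one bundle atomically: either all of it commits or it throws. */
  interface BundleProcessor<T> {
    void process(List<T> bundle);
  }

  /** Processes every element; returns the bundle size in use when the last bundle committed. */
  static <T> int processAll(List<T> elements, int initialSize, BundleProcessor<T> processor) {
    if (initialSize < 1) {
      throw new IllegalArgumentException("initialSize must be >= 1");
    }
    int size = initialSize;
    int next = 0;
    while (next < elements.size()) {
      int end = Math.min(elements.size(), next + size);
      List<T> bundle = new ArrayList<>(elements.subList(next, end));
      try {
        processor.process(bundle);
        next = end; // bundle committed, move on
      } catch (BundleTooLargeException e) {
        if (size == 1) {
          throw e; // a single element does not fit; give up
        }
        size = Math.max(1, size / 2); // retry the same elements in a smaller bundle
      }
    }
    return size;
  }
}
```

Note that this only works because the runner can discard a failed bundle wholesale; it says nothing about how the SDK uses memory inside a bundle, which is Lukasz's point 4.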
>>> >>> We'll need to work through the scenarios. Xinyu, it would be useful for >>> you to take a look at these two docs for context on the problem space: >>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process a >>> bundle) >>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to >>> access side inputs, access remote references, and support user state) >>> >>> >>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <[email protected]> wrote: >>> >>>> >>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <[email protected]> wrote: >>>> >>>>> Before you go on and update the user-facing API, we should discuss the >>>>> last point I made, since the change you're making will have limited usability: >>>>> the portability effort won't realistically allow you to see such >>>>> low-level things as when processElement finished, and supporting user state >>>>> will be modeled using the following three operations: read (with >>>>> continuation tokens), append (blind write), clear. It may be moot to >>>>> discuss how to integrate Samza into the existing framework; it should instead work >>>>> towards being a portability-based runner. There are more details >>>>> about >>>>> supporting user state during pipeline execution here: >>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing; the top >>>>> level docs about portability are https://s.apache.org/beam-runner-api >>>>> and https://s.apache.org/beam-fn-api. >>>>> >>>>> Do we really want to make a change to the user-facing API for user >>>>> state in the Java SDK when that code path will be phased out in favor >>>>> of the portability APIs? >>>>> >>>> >>>> To be clear, the user API will remain the same, but the runner's >>>> implementation path will be phased out. So should we expand this discussion >>>> to how the portability APIs enable the SDK and runner to collaborate to >>>> achieve this use case?
It seems like the interaction you need is that the >>>> runner can tell the SDK to close the connection on the read(), and >>>> the SDK needs to do so promptly, right? >>>> >>>> Kenn >>>> >>>> >>>> >>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <[email protected]> >>>>> wrote: >>>>> >>>>>> We discussed the proposed approaches internally. It seems that if the >>>>>> State API can also expose another method to return a >>>>>> ReadableState<Iterator>, >>>>>> it will cover our cases of iterating over a bigger-than-memory state, >>>>>> closing the underlying rocksDb snapshot immediately after the iterator is >>>>>> fully consumed (most cases), and also safely closing the rest of the >>>>>> iterators after processElement (minor cases). If this sounds good to you >>>>>> guys, I am going to create a JIRA ticket for it and open a PR for it. >>>>>> Thanks a lot for the discussions here. >>>>>> >>>>>> The discussions about async processing are very interesting, and I >>>>>> think they are definitely worth their own thread. I believe we do need >>>>>> support from the Beam API/model so users can take advantage of it >>>>>> (besides Samza, Flink also has an async operator that helps a lot in >>>>>> Alibaba's use cases). Yes, it will add complexity to the underlying >>>>>> framework, but it's going to be great for users doing remote IO. In >>>>>> practice we found it actually avoids thread+locks issues, as Kenn >>>>>> mentioned >>>>>> above. I am not sure whether this feature can be supported in a >>>>>> runner-specific way. I will probably create another email thread for this discussion in >>>>>> the future and hopefully I can move this forward.
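The two release points in Xinyu's proposal could be implemented on the runner side roughly as below. The first releases the resource as soon as the iterator is fully consumed; the second releases anything still open when processElement returns. `NativeCursor`, `AutoClosingIterator`, `PerCallResources`, and `ListCursor` are hypothetical names. In a real runner, `release()` would close the RocksDB iterator and, with it, the implicit snapshot it reads from.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical handle to a native cursor, e.g. a RocksDB iterator. */
interface NativeCursor<T> {
  boolean hasNext();
  T next();
  void release(); // frees the native iterator and its snapshot
}

/** Releases its cursor the moment it is exhausted; release() is idempotent. */
final class AutoClosingIterator<T> implements Iterator<T> {
  private final NativeCursor<T> cursor;
  private boolean released;

  AutoClosingIterator(NativeCursor<T> cursor) { this.cursor = cursor; }

  @Override
  public boolean hasNext() {
    if (released) return false;
    if (cursor.hasNext()) return true;
    release(); // fully consumed: free the resource immediately
    return false;
  }

  @Override
  public T next() {
    if (!hasNext()) throw new NoSuchElementException();
    return cursor.next();
  }

  void release() {
    if (!released) {
      released = true;
      cursor.release();
    }
  }
}

/** Tracks the iterators handed out during one processElement call. */
final class PerCallResources {
  private final List<AutoClosingIterator<?>> openIterators = new ArrayList<>();

  <T> Iterator<T> open(NativeCursor<T> cursor) {
    AutoClosingIterator<T> it = new AutoClosingIterator<>(cursor);
    openIterators.add(it);
    return it;
  }

  /** Called by the runner after processElement returns. */
  void closeAll() {
    for (AutoClosingIterator<?> it : openIterators) it.release();
    openIterators.clear();
  }
}

/** Test double: a list-backed cursor that counts releases. */
final class ListCursor<T> implements NativeCursor<T> {
  private final Iterator<T> items;
  int releaseCount;

  ListCursor(List<T> items) { this.items = items.iterator(); }

  public boolean hasNext() { return items.hasNext(); }
  public T next() { return items.next(); }
  public void release() { releaseCount++; }
}
```

Users never call close() in this design. The trade-off is that a partially consumed iterator keeps its snapshot pinned until the end of the call, which Xinyu expects to be the rare case.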
>>>>>> >>>>>> Thanks, >>>>>> Xinyu >>>>>> >>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> >>>>>>> >>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> Users typically want to do that async operation and then produce >>>>>>>> output with it. Adding asynchronous execution is difficult within the >>>>>>>> framework because a lot of code currently does not need to be thread >>>>>>>> safe, >>>>>>>> and writing code that is fast and handles asynchronous execution is quite >>>>>>>> difficult. Adding async operations typically leads to code with too >>>>>>>> many >>>>>>>> locks/synchronization blocks. >>>>>>>> >>>>>>> >>>>>>> Just a small point here: it is very unusual to use locks with >>>>>>> futures. The essential innovation of futures is that they are a new way to >>>>>>> program that avoids the threads+locks style. In part, you get this for free >>>>>>> from the functional programming style, and on the other hand you reify >>>>>>> asynchronous side effects as data dependencies. >>>>>>> >>>>>>> Kenn >>>>>>> >>>>>>> >>>>>>> >>>>>>>> Note that with the portability effort, the Runner won't have >>>>>>>> visibility into such low-level things as when an object is garbage >>>>>>>> collected, and supporting user state will be modeled using the following >>>>>>>> three operations: read (with continuation tokens), append (blind >>>>>>>> write), >>>>>>>> clear. It may be moot to discuss how to integrate Samza into the >>>>>>>> existing >>>>>>>> framework; it should instead work towards being a portability-based runner. >>>>>>>> There >>>>>>>> are more details about supporting user state during pipeline >>>>>>>> execution here: >>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing; the top >>>>>>>> level docs about portability are >>>>>>>> https://s.apache.org/beam-runner-api and >>>>>>>> https://s.apache.org/beam-fn-api.
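To make the three portable state operations concrete (read with continuation tokens, blind append, clear), here is a toy in-memory model with an SDK-side iterator that holds one page at a time. The class names, the page size, and the integer-offset token are assumptions for illustration only. They do not reflect the actual Fn API messages, and a real runner would choose tokens that stay valid under its own storage layout.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

/** Toy runner-side bag state: paged read, blind append, clear. */
final class ToyStateService {
  /** One page of a read; token == null means the bag has been read to the end. */
  static final class Page {
    final List<String> values;
    final Integer token;
    Page(List<String> values, Integer token) { this.values = values; this.token = token; }
  }

  private final Map<String, List<String>> bags = new HashMap<>();
  private final int pageSize;

  ToyStateService(int pageSize) {
    if (pageSize < 1) throw new IllegalArgumentException("pageSize must be >= 1");
    this.pageSize = pageSize;
  }

  void append(String key, List<String> values) {
    bags.computeIfAbsent(key, k -> new ArrayList<>()).addAll(values);
  }

  void clear(String key) { bags.remove(key); }

  /** A null token starts a read; otherwise resume at the offset the token encodes. */
  Page read(String key, Integer token) {
    List<String> bag = bags.getOrDefault(key, Collections.emptyList());
    int from = Math.min(token == null ? 0 : token, bag.size());
    int to = Math.min(bag.size(), from + pageSize);
    List<String> values = new ArrayList<>(bag.subList(from, to));
    return new Page(values, to < bag.size() ? Integer.valueOf(to) : null);
  }
}

/** SDK-side iterator: fetches pages lazily and keeps only the current page in memory. */
final class PagedBagIterator implements Iterator<String> {
  private final ToyStateService service;
  private final String key;
  private Iterator<String> current;
  private Integer token;
  private boolean lastPageFetched;
  int pagesFetched;

  PagedBagIterator(ToyStateService service, String key) {
    this.service = service;
    this.key = key;
  }

  @Override
  public boolean hasNext() {
    while ((current == null || !current.hasNext()) && !lastPageFetched) {
      ToyStateService.Page page = service.read(key, token);
      pagesFetched++;
      current = page.values.iterator();
      token = page.token;
      lastPageFetched = (token == null);
    }
    return current != null && current.hasNext();
  }

  @Override
  public String next() {
    if (!hasNext()) throw new NoSuchElementException();
    return current.next();
  }
}
```

In this model the SDK keeps only a token between pages, so no native iterator handle has to stay open on the SDK side between requests.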
>>>>>>>> >>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> At least one API that has been discussed in the past is to use >>>>>>>>> Java 8 CompletionStage, e.g. >>>>>>>>> >>>>>>>>> new DoFn<InputT, OutputT>() { >>>>>>>>> @ProcessElement >>>>>>>>> public void process(@Element CompletionStage<InputT> element, >>>>>>>>> ...) { >>>>>>>>> element.thenApply(...) >>>>>>>>> } >>>>>>>>> } >>>>>>>>> >>>>>>>>> The framework will automatically create the CompletionStage, and >>>>>>>>> the process method can specify a pipeline of asynchronous operations >>>>>>>>> to >>>>>>>>> perform on the element. When all of them are done, the element will be >>>>>>>>> marked as successfully processed. >>>>>>>>> >>>>>>>>> Reuven >>>>>>>>> >>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Thanks for all the pointers. I looked through the discussion on >>>>>>>>>> BEAM-2975 >>>>>>>>>> and BEAM-2980 about having snapshot or live views of an iterable, >>>>>>>>>> and the current semantics make a lot of sense to me. As for your >>>>>>>>>> question: >>>>>>>>>> it does not require an explicit snapshot when we create a RocksDb >>>>>>>>>> iterator >>>>>>>>>> directly. The iterator will read from an implicit snapshot as of the >>>>>>>>>> time >>>>>>>>>> the iterator is created [1], and the snapshot will be released after >>>>>>>>>> the >>>>>>>>>> iterator is closed. If we can have another method to return >>>>>>>>>> ReadableState<Iterator>, we might be able to apply the auto-closing >>>>>>>>>> approaches we discussed and solve the problem here :). >>>>>>>>>> >>>>>>>>>> It's very interesting that you bring up the discussion about an >>>>>>>>>> async API! Async IO has been widely adopted among our users here: >>>>>>>>>> they use >>>>>>>>>> netty for async calls with a library named ParSeq [2] to help manage >>>>>>>>>> the >>>>>>>>>> calls.
Samza provides a primitive callback-style API [3], in which >>>>>>>>>> the user >>>>>>>>>> will invoke the callback after the remote calls are complete. >>>>>>>>>> Currently in >>>>>>>>>> a Samza job our users use this API with the ParSeq lib for remote >>>>>>>>>> IO. It seems >>>>>>>>>> we might have to do blocking calls (thus the poor resource >>>>>>>>>> utilization you >>>>>>>>>> mentioned) when using the Beam API for now. It would be great if you could >>>>>>>>>> send a >>>>>>>>>> few more details about the discussion of an async API. I would like >>>>>>>>>> to add >>>>>>>>>> our use case and help move this forward. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Xinyu >>>>>>>>>> >>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator >>>>>>>>>> [2]: https://github.com/linkedin/parseq >>>>>>>>>> [3]: >>>>>>>>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> I don't have any further suggestions, but want to call out how >>>>>>>>>>> this hits a lot of interesting points. >>>>>>>>>>> >>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1] and >>>>>>>>>>> BEAM-2980 [2], where we debated things a bit. I think the strongest >>>>>>>>>>> case is >>>>>>>>>>> for what you describe: it should be a snapshot. Perhaps they >>>>>>>>>>> should both >>>>>>>>>>> be closed as fixed... >>>>>>>>>>> >>>>>>>>>>> And you also bring up long blocking calls: we have also >>>>>>>>>>> deliberately decided that long synchronous blocking calls in >>>>>>>>>>> @ProcessElement can be embraced for simple programming and >>>>>>>>>>> compensated for with >>>>>>>>>>> autoscaling smarts (e.g. expand the thread pool by noticing poor >>>>>>>>>>> utilization). The alternative is a more future-istic API where the >>>>>>>>>>> calls >>>>>>>>>>> can be explicitly asynchronous.
We've had some interesting dev@ >>>>>>>>>>> list discussions about that, too. >>>>>>>>>>> >>>>>>>>>>> Is another possibility to perhaps have read() return a >>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two >>>>>>>>>>> methods with >>>>>>>>>>> different names, one for an iterator and one for a snapshot iterable. But >>>>>>>>>>> wouldn't >>>>>>>>>>> the Iterator also require a snapshot? Doesn't a native RocksDb >>>>>>>>>>> iterator >>>>>>>>>>> require a snapshot to have well-defined contents? As you can tell, >>>>>>>>>>> I don't >>>>>>>>>>> know enough about RocksDb details to be sure of my suggestions. >>>>>>>>>>> >>>>>>>>>>> Kenn >>>>>>>>>>> >>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975 >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980 >>>>>>>>>>> >>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted >>>>>>>>>>>> the question here, we discussed internally releasing the >>>>>>>>>>>> underlying >>>>>>>>>>>> resources after consuming the whole iterator. This probably covers >>>>>>>>>>>> quite a >>>>>>>>>>>> lot of use cases. For some special cases where the user only >>>>>>>>>>>> consumes part of >>>>>>>>>>>> the iterator, Luke and Kenn's suggestion about releasing after >>>>>>>>>>>> processElement() might work (I need to confirm this with our >>>>>>>>>>>> use >>>>>>>>>>>> cases). So based on what we discussed so far, we might have a good >>>>>>>>>>>> way to >>>>>>>>>>>> automatically close an iterator for the store. >>>>>>>>>>>> >>>>>>>>>>>> There is another issue, though: right now the state API returns >>>>>>>>>>>> an iterable for entries(), keys() and values(), and we can create an >>>>>>>>>>>> iterator >>>>>>>>>>>> from it. From my understanding, the iterable holds a snapshot of >>>>>>>>>>>> the >>>>>>>>>>>> underlying store.
In the case of rocksDb, it's going to be a >>>>>>>>>>>> db.snapshot(). >>>>>>>>>>>> Then when can we release the snapshot? It's not like an iterator, >>>>>>>>>>>> where we can >>>>>>>>>>>> use some heuristics to automatically release it. The user can hold >>>>>>>>>>>> on to >>>>>>>>>>>> the iterable and create iterators throughout the whole >>>>>>>>>>>> processElement(). >>>>>>>>>>>> But if we only close the iterable after processElement(), I am >>>>>>>>>>>> quite >>>>>>>>>>>> concerned about the limitations this will bring. If the user is >>>>>>>>>>>> doing some >>>>>>>>>>>> remote call during processing, then the snapshot might be held >>>>>>>>>>>> for a long >>>>>>>>>>>> time before being released, which might cause performance problems. And >>>>>>>>>>>> if the >>>>>>>>>>>> user happens to create multiple iterables, then there will be >>>>>>>>>>>> multiple >>>>>>>>>>>> snapshots loaded during processing. Luke suggested being aggressive >>>>>>>>>>>> about closing >>>>>>>>>>>> the resources and recreating them when needed again. But in this case >>>>>>>>>>>> it might >>>>>>>>>>>> not work, since we won't be able to recreate the same snapshot >>>>>>>>>>>> given that the >>>>>>>>>>>> store might have been updated (and creating a rocksDb snapshot is >>>>>>>>>>>> not cheap >>>>>>>>>>>> either). I am running out of ideas other than exposing the iterator >>>>>>>>>>>> itself >>>>>>>>>>>> somehow (and adding close() if needed?). Any further suggestions? >>>>>>>>>>>> >>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier >>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in some >>>>>>>>>>>> sort of >>>>>>>>>>>> StateResource for fear that people might reject the proposal >>>>>>>>>>>> immediately >>>>>>>>>>>> after seeing the close() on the iterator. I guess since our users are >>>>>>>>>>>> familiar >>>>>>>>>>>> with rocksDb state, it's pretty normal for them to close the >>>>>>>>>>>> iterator/snapshot after >>>>>>>>>>>> using it.
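Xinyu's point that a released snapshot cannot simply be recreated later can be seen in a toy append-only store, where a snapshot remembers the log length at the moment it was created. `ToyVersionedBag` is hypothetical and far simpler than RocksDB snapshots, which also have to cover overwrites and deletes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy append-only bag with point-in-time snapshots. */
final class ToyVersionedBag {
  private final List<String> log = new ArrayList<>();

  void add(String value) { log.add(value); }

  /** A snapshot only remembers how long the log was when it was taken. */
  Snapshot snapshot() { return new Snapshot(log.size()); }

  final class Snapshot {
    private final int visibleLength;

    private Snapshot(int visibleLength) { this.visibleLength = visibleLength; }

    /** Contents as of snapshot creation; later appends stay invisible. */
    List<String> contents() {
      return Collections.unmodifiableList(new ArrayList<>(log.subList(0, visibleLength)));
    }
  }
}
```

If the first snapshot is closed and a new one is taken after a write, the new snapshot sees the write. An iterable that transparently "reopens" would therefore change contents between iterations.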
>>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Xinyu >>>>>>>>>>>> >>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <[email protected] >>>>>>>>>>>> > wrote: >>>>>>>>>>>> >>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that >>>>>>>>>>>>> resources are freed for Java developers (hence the weak/phantom >>>>>>>>>>>>> reference >>>>>>>>>>>>> suggestion). >>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file >>>>>>>>>>>>> streams) leads to leaked resources. >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks Xinyu, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I actually had first sketched out just what you wrote. But >>>>>>>>>>>>>> then I realized a few things: >>>>>>>>>>>>>> >>>>>>>>>>>>>> - usually an Iterable does not allocate resources, only its >>>>>>>>>>>>>> Iterators >>>>>>>>>>>>>> - if you consume the whole iterator, I hope the user would >>>>>>>>>>>>>> not have to do any extra work >>>>>>>>>>>>>> - you can also automatically free it up at the end of the >>>>>>>>>>>>>> call to @ProcessElement, so that is easy too (but you might not >>>>>>>>>>>>>> want to) >>>>>>>>>>>>>> - so the main use is when the iterator terminates early, >>>>>>>>>>>>>> is not fully consumed, and you can't wait for the method to finish >>>>>>>>>>>>>> - the scoping in Java will force a bunch of uninitialized >>>>>>>>>>>>>> declarations outside the try-with-resources block, kind of a lot >>>>>>>>>>>>>> of >>>>>>>>>>>>>> boilerplate LOC >>>>>>>>>>>>>> >>>>>>>>>>>>>> One thing that is good about your proposal is that the >>>>>>>>>>>>>> iterable could have some transparent caches that all free up >>>>>>>>>>>>>> together.
>>>>>>>>>>>>>> >>>>>>>>>>>>>> Kenn >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches, >>>>>>>>>>>>>>> Kenn. Now I understand Luke's proposal better. The approach >>>>>>>>>>>>>>> looks neat, but >>>>>>>>>>>>>>> the uncertainty of *when* GC is going to kick in will make >>>>>>>>>>>>>>> users' lives hard. If the user happens to configure a large JVM >>>>>>>>>>>>>>> heap size, >>>>>>>>>>>>>>> and since rocksDb uses off-heap memory, GC might start very >>>>>>>>>>>>>>> late and run less >>>>>>>>>>>>>>> frequently than we want. If we don't have a *definitive* >>>>>>>>>>>>>>> way to let users close the underlying resources, then there is >>>>>>>>>>>>>>> no good way >>>>>>>>>>>>>>> to handle such failures of a critical application in production. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Having a close method on the iterator might be a little >>>>>>>>>>>>>>> unorthodox :). To some degree, this is actually a resource we >>>>>>>>>>>>>>> are holding >>>>>>>>>>>>>>> underneath, and I think it's pretty common to have close() for >>>>>>>>>>>>>>> a resource >>>>>>>>>>>>>>> in Java, e.g. BufferedReader and BufferedWriter. So I would >>>>>>>>>>>>>>> imagine that we >>>>>>>>>>>>>>> also define a resource for the state iterator and make the >>>>>>>>>>>>>>> interface >>>>>>>>>>>>>>> implement *AutoCloseable*. Here is my sketch: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> // StateResource MUST be closed after use. >>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st = >>>>>>>>>>>>>>> bagState.iteratorResource()) { >>>>>>>>>>>>>>> Iterator<SomeType> iter = st.iterator(); >>>>>>>>>>>>>>> while (iter.hasNext()) { >>>>>>>>>>>>>>> .. do stuff ... >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> } catch (Exception e) { >>>>>>>>>>>>>>> ...
user code >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The type/method names are just for illustration here, so >>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and let >>>>>>>>>>>>>>> me know if >>>>>>>>>>>>>>> you have thoughts about the programming patterns here. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> Xinyu >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or not. >>>>>>>>>>>>>>>> No specific API has been proposed. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I think the problem statement is real: you need to be able >>>>>>>>>>>>>>>> to read and write bigger-than-memory state. It seems we have >>>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>>> runners that don't support it, perhaps because of our API. You >>>>>>>>>>>>>>>> might be >>>>>>>>>>>>>>>> able to build something good enough with phantom references, >>>>>>>>>>>>>>>> but you might >>>>>>>>>>>>>>>> not. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> If I understand the idea, it might look something like this: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> new DoFn<>() { >>>>>>>>>>>>>>>> @StateId("foo") >>>>>>>>>>>>>>>> private final StateSpec<BagState<Whatever>> >>>>>>>>>>>>>>>> myBagSpec = ... >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> @ProcessElement >>>>>>>>>>>>>>>> public void proc(@StateId("foo") BagState<Whatever> >>>>>>>>>>>>>>>> myBag, ...) { >>>>>>>>>>>>>>>> CloseableIterator<Whatever> iterator = >>>>>>>>>>>>>>>> myBag.read().iterator(); >>>>>>>>>>>>>>>> while(iterator.hasNext() && ... special condition >>>>>>>>>>>>>>>> ...) { >>>>>>>>>>>>>>>> ... do stuff ... >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> iterator.close(); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close(), >>>>>>>>>>>>>>>> since they iterate with for ( : ) as usual.
And a runner that >>>>>>>>>>>>>>>> has the 10x >>>>>>>>>>>>>>>> funding to try out a ReferenceQueue can be resilient to users >>>>>>>>>>>>>>>> that forget. >>>>>>>>>>>>>>>> On the other hand, I haven't seen this pattern much in the >>>>>>>>>>>>>>>> wild, so I think >>>>>>>>>>>>>>>> it is valuable to discuss other methods. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> While Luke's proposal is something like this if I >>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with >>>>>>>>>>>>>>>> PhantomReference seems >>>>>>>>>>>>>>>> to be what you really want): >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ... in RocksDb state implementation ... >>>>>>>>>>>>>>>> class RocksDbBagState { >>>>>>>>>>>>>>>> static ReferenceQueue rocksDbIteratorQueue = new >>>>>>>>>>>>>>>> ReferenceQueue(); >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> class Iterator { >>>>>>>>>>>>>>>> PhantomReference<RocksDbJniIterator> cIter; >>>>>>>>>>>>>>>> .next() { >>>>>>>>>>>>>>>> return cIter.next(); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> class Iterable { >>>>>>>>>>>>>>>> .iterator() { >>>>>>>>>>>>>>>> return new Iterator(new >>>>>>>>>>>>>>>> PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue)); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ... on another thread ... >>>>>>>>>>>>>>>> while(true) { >>>>>>>>>>>>>>>> RocksDbIterator deadRef = (RocksDbIterator) >>>>>>>>>>>>>>>> rocksDbIteratorQueue.remove(); >>>>>>>>>>>>>>>> deadRef.close(); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop >>>>>>>>>>>>>>>> onto the queue for being closed. This might not be too bad. >>>>>>>>>>>>>>>> You'll have >>>>>>>>>>>>>>>> delayed resource release, and potentially masked errors that >>>>>>>>>>>>>>>> are hard to >>>>>>>>>>>>>>>> debug. It is less error-prone than WeakReference, which is >>>>>>>>>>>>>>>> asking for >>>>>>>>>>>>>>>> trouble when objects are collected en masse. 
Anecdotally I >>>>>>>>>>>>>>>> have heard that >>>>>>>>>>>>>>>> performance of this kind of approach is poor, but I haven't >>>>>>>>>>>>>>>> experienced it >>>>>>>>>>>>>>>> myself and I can't find good data. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Kenn >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu < >>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> If I understand correctly, using weak references will help >>>>>>>>>>>>>>>>> clean up the Java objects once GC kicks in. In the case of a >>>>>>>>>>>>>>>>> kv-store like >>>>>>>>>>>>>>>>> rocksDb, the Java iterator is just a JNI interface to the >>>>>>>>>>>>>>>>> underlying C++ >>>>>>>>>>>>>>>>> iterator, so we need to explicitly invoke close to release >>>>>>>>>>>>>>>>> the in-memory >>>>>>>>>>>>>>>>> snapshot data, which can be large and accumulate quickly if >>>>>>>>>>>>>>>>> it's not >>>>>>>>>>>>>>>>> released when not in use. Maybe I am missing something you >>>>>>>>>>>>>>>>> suggested >>>>>>>>>>>>>>>>> here, but it looks to me like using weak references might not help in >>>>>>>>>>>>>>>>> this case. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I understand your concern, and I think you might have >>>>>>>>>>>>>>>>> misunderstood what I meant. I am totally for working hard for the >>>>>>>>>>>>>>>>> best user >>>>>>>>>>>>>>>>> experience, and I think the current API provides a good >>>>>>>>>>>>>>>>> example of that. >>>>>>>>>>>>>>>>> That's also the reason I am implementing a runner here. I am >>>>>>>>>>>>>>>>> just proposing >>>>>>>>>>>>>>>>> an extra API to expose an iterator that can be closed when >>>>>>>>>>>>>>>>> not needed; that >>>>>>>>>>>>>>>>> way users can use this feature to iterate through large >>>>>>>>>>>>>>>>> state that >>>>>>>>>>>>>>>>> doesn't fit into memory. I believe this is also a pretty >>>>>>>>>>>>>>>>> general use case >>>>>>>>>>>>>>>>> and it's better to have support for it.
I am actually arguing >>>>>>>>>>>>>>>>> that adding this extra API will be >>>>>>>>>>>>>>>>> a better user experience, since more >>>>>>>>>>>>>>>>> users can benefit >>>>>>>>>>>>>>>>> from it. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Xinyu >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik < >>>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the >>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by using weak >>>>>>>>>>>>>>>>>> references and >>>>>>>>>>>>>>>>>> reference queues (or other methods). Having a few people >>>>>>>>>>>>>>>>>> work 10x as hard >>>>>>>>>>>>>>>>>> to provide a good implementation is much better than having >>>>>>>>>>>>>>>>>> 100s or 1000s >>>>>>>>>>>>>>>>>> of users suffering through a more complicated API. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu < >>>>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Loading/evicting blocks will help reduce the cache memory >>>>>>>>>>>>>>>>>>> footprint, but we still won't be able to release the >>>>>>>>>>>>>>>>>>> underlying resources. >>>>>>>>>>>>>>>>>>> We can definitely add heuristics to help release the >>>>>>>>>>>>>>>>>>> resources as you >>>>>>>>>>>>>>>>>>> mentioned, but there is no accurate way to track all the >>>>>>>>>>>>>>>>>>> iterators/iterables created and free them once they are no longer >>>>>>>>>>>>>>>>>>> needed. I think that while >>>>>>>>>>>>>>>>>>> the API is aimed at a nice user experience, we should have >>>>>>>>>>>>>>>>>>> the option to let >>>>>>>>>>>>>>>>>>> users optimize their performance if they choose to. Do you >>>>>>>>>>>>>>>>>>> agree?
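Kenn's phantom-reference sketch above has a subtle problem. `ReferenceQueue.remove()` returns the `Reference`, not the tracked object, and `PhantomReference.get()` always returns `null`, so the cleanup thread cannot reach the iterator through the queue. The native handle must be stored outside the object being tracked. Java 9's `java.lang.ref.Cleaner` packages that pattern. Below is a minimal sketch that combines an explicit `close()` with a GC fallback, assuming a hypothetical `NativeHandle` that stands in for a RocksDB iterator; `TrackedStateResource` is also a hypothetical name.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicInteger;

final class TrackedStateResource implements AutoCloseable {
  // One shared Cleaner; it runs cleanup actions on its own daemon thread.
  private static final Cleaner CLEANER = Cleaner.create();

  /** Hypothetical native resource; counts releases so the behavior is observable. */
  static final class NativeHandle {
    final AtomicInteger releases = new AtomicInteger();
    void release() { releases.incrementAndGet(); }
  }

  /** Cleanup state: it must NOT reference the TrackedStateResource, or it would never become unreachable. */
  private static final class ReleaseAction implements Runnable {
    private final NativeHandle handle;
    ReleaseAction(NativeHandle handle) { this.handle = handle; }
    @Override public void run() { handle.release(); }
  }

  private final Cleaner.Cleanable cleanable;

  TrackedStateResource(NativeHandle handle) {
    // If this object becomes unreachable without close(), the Cleaner runs ReleaseAction after GC.
    this.cleanable = CLEANER.register(this, new ReleaseAction(handle));
  }

  /** Deterministic release; the Cleaner guarantees the action runs at most once. */
  @Override
  public void close() {
    cleanable.clean();
  }
}
```

The GC fallback still fires only when the collector happens to process the object. With a large heap and off-heap RocksDB memory that can be late, which is Xinyu's concern, so `close()` remains the only deterministic path.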
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>> Xinyu >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik < >>>>>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources, and >>>>>>>>>>>>>>>>>>>> forcing them to will make the user experience worse. >>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format >>>>>>>>>>>>>>>>>>>> that allows random access and use a cache to load/evict >>>>>>>>>>>>>>>>>>>> blocks of the >>>>>>>>>>>>>>>>>>>> state from memory. >>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable that frees >>>>>>>>>>>>>>>>>>>> the resource after a certain amount of inactivity or uses >>>>>>>>>>>>>>>>>>>> weak references. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu < >>>>>>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi, folks, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing MapState and >>>>>>>>>>>>>>>>>>>>> SetState in our Samza runner. We noticed that the state >>>>>>>>>>>>>>>>>>>>> returns a Java >>>>>>>>>>>>>>>>>>>>> Iterable for reading entries, keys, etc. For state backed >>>>>>>>>>>>>>>>>>>>> by a file-based kv >>>>>>>>>>>>>>>>>>>>> store like rocksDb, we need to be able to let users >>>>>>>>>>>>>>>>>>>>> explicitly close the >>>>>>>>>>>>>>>>>>>>> iterator/iterable to release the resources. Otherwise we >>>>>>>>>>>>>>>>>>>>> have to load the >>>>>>>>>>>>>>>>>>>>> iterable into memory so we can safely close the >>>>>>>>>>>>>>>>>>>>> underlying rocksDb >>>>>>>>>>>>>>>>>>>>> iterator, similar to Flink's implementation. But this >>>>>>>>>>>>>>>>>>>>> won't work for states that don't fit into memory. I >>>>>>>>>>>>>>>>>>>>> chatted with Kenn and >>>>>>>>>>>>>>>>>>>>> he also agrees we need this capability to avoid bulk >>>>>>>>>>>>>>>>>>>>> read/write.
This >>>>>>>>>>>>>>>>>>>>> seems to be a general use case and I'm wondering if we >>>>>>>>>>>>>>>>>>>>> can add support >>>>>>>>>>>>>>>>>>>>> for it? I am happy to contribute to this if needed. Any >>>>>>>>>>>>>>>>>>>>> feedback is highly >>>>>>>>>>>>>>>>>>>>> appreciated. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>> Xinyu >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>> >>
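Lukasz's alternative is to store state in a random-access file format and cache blocks, loading and evicting them as needed. It can be sketched with an LRU cache built on `LinkedHashMap` in access order. `BlockCache` and its loader function are hypothetical. As Xinyu notes, this bounds the cache's memory but does not by itself release resources such as open iterators or snapshots.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

/** Bounded LRU cache of decoded state blocks, keyed by block index. */
final class BlockCache<B> {
  private final LongFunction<B> loader; // hypothetical: reads one block from the backing file
  private final LinkedHashMap<Long, B> blocks;
  int loads; // number of times a block had to be read from the backing file

  BlockCache(int maxBlocks, LongFunction<B> loader) {
    this.loader = loader;
    // accessOrder = true: iteration order is least-recently-accessed first
    this.blocks = new LinkedHashMap<Long, B>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Long, B> eldest) {
        return size() > maxBlocks; // evict the least recently used block
      }
    };
  }

  B get(long blockIndex) {
    B block = blocks.get(blockIndex); // a hit also refreshes recency
    if (block == null) {
      block = loader.apply(blockIndex);
      loads++;
      blocks.put(blockIndex, block);
    }
    return block;
  }

  int cachedBlocks() { return blocks.size(); }
}
```

For example, with `maxBlocks = 2`, accessing blocks 0, 1, 0, 2 evicts block 1 (the least recently used), so a later read of block 1 goes back to the file.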
