On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <[email protected]> wrote:

> Users typically want to do that async operation and then produce output
> with it. Adding asynchronous execution is difficult within the framework
> because a lot of code is currently not needed to be thread safe and writing
> code to be fast and handle asynchronous execution is quite difficult.
> Adding async operations typically leads to code with too many
> locks/synchronization blocks.
>

Just a small point here: it is very unusual to use locks with futures. The
essential innovation of futures is that they are a new way to program that
avoids the threads+locks style. In part, you get this for free from
functional programming style, and on the other hand you reify asynchronous
side effects as data dependencies.

Kenn

> Note that with the portability effort, the Runner won't have visibility
> into such low level things like when an object is garbage collected and
> supporting user state will be modeled using the following three operations:
> read (with continuation tokens), append (blind write), clear. It may be
> moot to discuss how to integrate Samza into the existing framework and
> should work towards being a portability based runner. There are more
> details here about supporting user state during pipeline execution:
> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level
> docs about portability are https://s.apache.org/beam-runner-api and
> https://s.apache.org/beam-fn-api.
>
> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <[email protected]> wrote:
>
>> At least one API that has been discussed in the past, is to use Java 8
>> CompletionStage. e.g.
>>
>> new DoFn<InputT, OutputT>() {
>>   @ProcessElement
>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>     element.thenApply(...)
>>   }
>> }
>>
>> The framework will automatically create the CompletionStage, and the
>> process method can specify a pipeline of asynchronous operations to perform
>> on the element.
>> When all of them are done, the element will be marked as
>> successfully processed.
>>
>> Reuven
>>
>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <[email protected]> wrote:
>>
>>> Thanks for all the pointers. I looked through the discussion over BEAM-2975
>>> and BEAM-2980 about having snapshot or live views of iterable, and the
>>> current semantics makes a lot of sense to me. For your question: it does
>>> not require an explicit snapshot when we create RocksDb iterator directly.
>>> The iterator will read from an implicit snapshot as of the time the
>>> iterator is created [1], and the snapshot will be released after the
>>> iterator is closed. If we can have another method to return
>>> ReadableState<Iterator>, we might be able to apply the auto-closing
>>> approaches as we discussed and solve the problem here :).
>>>
>>> It's very interesting that you bring up the discussion about async API!
>>> Async IO has been widely adopted here among our users: they use netty for
>>> async calls with library named ParSeq [2] to help manage the calls. Samza
>>> provides a primitive callback style API [3], in which the user will invoke
>>> the callback after the remote calls are complete. Currently in a Samza job
>>> our users use this API with the ParSeq lib for remote IO. Seems we might
>>> have to do blocking calls (thus the poor resource utilization you
>>> mentioned) when using Beam API for now. It'll be great if you can send a
>>> few more details about the discussion about async API. I would like to add
>>> our use case and help move this forward.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>> [2]: https://github.com/linkedin/parseq
>>> [3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>
>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <[email protected]> wrote:
>>>
>>>> I don't have any further suggestions, but want to call out how this
>>>> hits a lot of interesting points.
>>>>
>>>> The point about snapshotting is great. We have BEAM-2975 [1] and
>>>> BEAM-2980 [2] where we debated things a bit. I think the strongest case is
>>>> for what you describe - it should be a snapshot. Perhaps they should both
>>>> be closed as fixed...
>>>>
>>>> And you also bring up long blocking calls - we have also deliberately
>>>> decided that long synchronous blocking calls in @ProcessElement can be
>>>> embraced for simple programming and compensated with autoscaling smarts
>>>> (e.g. expand the thread pool by noticing poor utilization). The alternative
>>>> is a more future-istic API where the calls can be explicitly asynchronous.
>>>> We've had some interesting dev@ list discussions about that, too.
>>>>
>>>> Is another possibility to perhaps have read() return a
>>>> ReadableState<Iterator> instead? We could, of course, have two methods with
>>>> different names, one for iterator one for snapshot iterable. But wouldn't
>>>> the Iterator also require a snapshot? Doesn't a native RocksDb iterator
>>>> require a snapshot to have well-defined contents? As you can tell, I don't
>>>> know enough about RocksDb details to be sure of my suggestions.
>>>>
>>>> Kenn
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>
>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <[email protected]> wrote:
>>>>
>>>>> Thanks for the ideas, Kenn, Luke and Eugene.
>>>>> Before I posted the
>>>>> question here, we discussed internally about releasing the underlying
>>>>> resources after consuming the whole iterator. This probably covers quite a
>>>>> lot of use cases. For some special cases that the user only consume part of
>>>>> the iterator, Luke and Kenn's suggestion about releasing after
>>>>> processElement() might work (I need to confirm about this with our use
>>>>> cases). So based on what we discussed so far, we might have a good way to
>>>>> automatically close an iterator for the store.
>>>>>
>>>>> There is another issue though: right now the state API returns an
>>>>> iterable for entries(), keys() and values(), and we can create iterator
>>>>> from it. From my understanding, the iterable holds a snapshot of the
>>>>> underlying store. In case of rocksDb, it's going to be a db.snapshot().
>>>>> Then when can we release the snapshot? It's not like iterator where we can
>>>>> use some heuristics to automatically release it. The user can hold on to
>>>>> the iterable and create iterators throughout the whole processElement().
>>>>> But if we only close the iterable after processElement(), I am quite
>>>>> concerned about the limitations this will bring. If the user is doing some
>>>>> remote call during the process, then the snapshot might be held for a long
>>>>> time before releasing, and might cause performance problems. And if the
>>>>> user happen to create multiple iterables, then there will be multiple
>>>>> snapshots loaded during process. Luke suggested being aggressive at closing
>>>>> the resources and recreating when needed again. But in this case it might
>>>>> not work since we won't be able to recreate the same snapshot given the
>>>>> store might have been updated (and creating rocksDb snapshot is not cheap
>>>>> too). I am running out of ideas other than exposing the iterator itself
>>>>> somehow (and add close() if needed?). Any further suggestions?
>>>>>
>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>> (CloseableIterator) in an internal interface. I wrapped it in some sort of
>>>>> StateResource in the fear that people might reject the proposal immediately
>>>>> after seeing the close() on the iterator. I guess our users are familiar
>>>>> with rocksDb state, it's pretty normal to close the iterator/snapshot after
>>>>> using it.
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> The iterator going out of scope is the idiomatic way that resources
>>>>>> are freed for Java developers (hence the weak/phantom reference
>>>>>> suggestion).
>>>>>> Explicitly requiring users to deal with 'handles' (like file streams)
>>>>>> lead to leaked resources.
>>>>>>
>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks Xinyu,
>>>>>>>
>>>>>>> I actually had first sketched out just what you wrote. But then I
>>>>>>> realized a few things:
>>>>>>>
>>>>>>> - usually an Iterable does not allocate resources, only its Iterators
>>>>>>> - if you consume the whole iterator, I hope the user would not have
>>>>>>>   to do any extra work
>>>>>>> - you can also automatically free it up at the end of the call to
>>>>>>>   @ProcessElement so that is easy too (but you might not want to)
>>>>>>> - so the main use is when the iterator terminates early and is not
>>>>>>>   fully consumed and you can't wait to finish the method
>>>>>>> - the scoping in Java will force a bunch of uninitialized
>>>>>>>   declarations outside the try-with-resources block, kind of a lot of
>>>>>>>   boilerplate LOC
>>>>>>>
>>>>>>> One thing that is good about your proposal is that the iterable
>>>>>>> could have some transparent caches that all free up together.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks for drafting the details about the two approaches, Kenn. Now
>>>>>>>> I understand Luke's proposal better. The approach looks neat, but the
>>>>>>>> uncertainty of *when* GC is going to kick in will make users' life
>>>>>>>> hard. If the user happens to configure a large JVM heap size, and since
>>>>>>>> rocksDb uses off-heap memory, GC might start very late and less frequent
>>>>>>>> than what we want. If we don't have a *definitive* way to let user
>>>>>>>> close the underlying resources, then there is no good way to handle such
>>>>>>>> failures of a critical application in production.
>>>>>>>>
>>>>>>>> Having a close method in the iterator might be a little unorthodox
>>>>>>>> :). To some degree, this is actually a resource we are holding underneath,
>>>>>>>> and I think it's pretty common to have close() for a resource in Java, e.g.
>>>>>>>> BufferedReader and BufferedWriter. So I would imagine that we also define a
>>>>>>>> resource for the state iterator and make the interface implements
>>>>>>>> *AutoCloseable*. Here is my sketch:
>>>>>>>>
>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>> try (StateResource<Iterator<SomeType>> st =
>>>>>>>>     bagState.iteratorResource()) {
>>>>>>>>   Iterator<SomeType> iter = st.iterator();
>>>>>>>>   while (iter.hasNext()) {
>>>>>>>>     .. do stuff ...
>>>>>>>>   }
>>>>>>>> } catch (Exception e) {
>>>>>>>>   ... user code
>>>>>>>> }
>>>>>>>>
>>>>>>>> The type/method name are just for illustrating here, so please
>>>>>>>> don't laugh at them. Please feel free to comment and let me know if you
>>>>>>>> have thoughts about the programming patterns here.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> It is too soon to argue whether an API is complex or not. There
>>>>>>>>> has been no specific API proposed.
>>>>>>>>>
>>>>>>>>> I think the problem statement is real - you need to be able to
>>>>>>>>> read and write bigger-than-memory state. It seems we have multiple runners
>>>>>>>>> that don't support it, perhaps because of our API. You might be able to
>>>>>>>>> build something good enough with phantom references, but you might not.
>>>>>>>>>
>>>>>>>>> If I understand the idea, it might look something like this:
>>>>>>>>>
>>>>>>>>> new DoFn<>() {
>>>>>>>>>   @StateId("foo")
>>>>>>>>>   private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>>>>>
>>>>>>>>>   @ProcessElement
>>>>>>>>>   public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>>>>>     CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>>>>>>     while (iterator.hasNext() && ... special condition ...) {
>>>>>>>>>       ... do stuff ...
>>>>>>>>>     }
>>>>>>>>>     iterator.close();
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> So it has no impact on users who don't choose to close() since
>>>>>>>>> they iterate with for ( : ) as usual. And a runner that has the 10x funding
>>>>>>>>> to try out a ReferenceQueue can be resilient to users that forget. On the
>>>>>>>>> other hand, I haven't seen this pattern much in the wild, so I think it is
>>>>>>>>> valuable to discuss other methods.
>>>>>>>>>
>>>>>>>>> While Luke's proposal is something like this if I understand his
>>>>>>>>> sketch (replacing WeakReference with PhantomReference seems to be what you
>>>>>>>>> really want):
>>>>>>>>>
>>>>>>>>> ... in RocksDb state implementation ...
>>>>>>>>> class RocksDbBagState {
>>>>>>>>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>>>>>
>>>>>>>>>   class Iterator {
>>>>>>>>>     PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>     .next() {
>>>>>>>>>       return cIter.next();
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   class Iterable {
>>>>>>>>>     .iterator() {
>>>>>>>>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> ... on another thread ...
>>>>>>>>> while (true) {
>>>>>>>>>   RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>>>>>   deadRef.close();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> When the iterator is GC'd, the phantom reference will pop onto the
>>>>>>>>> queue for being closed. This might not be too bad. You'll have delayed
>>>>>>>>> resource release, and potentially masked errors that are hard to debug. It
>>>>>>>>> is less error-prone than WeakReference, which is asking for trouble when
>>>>>>>>> objects are collected en masse. Anecdotally I have heard that performance
>>>>>>>>> of this kind of approach is poor, but I haven't experienced it myself and I
>>>>>>>>> can't find good data.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> If I understand correctly, using weak references will help clean
>>>>>>>>>> up the Java objects once GC kicks in. In case of kv-store likes rocksDb,
>>>>>>>>>> the Java iterator is just a JNI interface to the underlying C iterator, so
>>>>>>>>>> we need to explicitly invoke close to release the in-memory snapshot data,
>>>>>>>>>> which can be large and accumulated quickly if it's not released when not in
>>>>>>>>>> use.
>>>>>>>>>> Maybe I am missing something as you suggested here, but looks to me
>>>>>>>>>> using weak references might not help in this case.
>>>>>>>>>>
>>>>>>>>>> I understand your concern, and I think you might have misunderstood
>>>>>>>>>> what I meant. I am totally for working hard for best user experience, and I
>>>>>>>>>> think the current API provides a good example of that. That's also the
>>>>>>>>>> reason I am implementing a runner here. I am just proposing an extra API to
>>>>>>>>>> expose an iterator that can be closed when not needed, that way the users
>>>>>>>>>> can use this feature to iterate through large state that doesn't fit into
>>>>>>>>>> memory. I believe this is also a pretty general use case and it's better to
>>>>>>>>>> have support for it. I am actually arguing this will be a better user
>>>>>>>>>> experience to add this extra API since more users can benefit from it.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
>>>>>>>>>>
>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I don't agree. I believe you can track the iterators/iterables
>>>>>>>>>>> that are created and freed by using weak references and reference queues
>>>>>>>>>>> (or other methods). Having a few people work 10x as hard to provide a good
>>>>>>>>>>> implementation is much better than having 100s or 1000s of users suffering
>>>>>>>>>>> through a more complicated API.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Load/evict blocks will help reduce the cache memory footprint,
>>>>>>>>>>>> but we still won't be able to release the underlying resources. We can
>>>>>>>>>>>> definitely add heuristics to help release the resources as you mentioned, but
>>>>>>>>>>>> there is no accurate way to track all the iterators/iterables created and
>>>>>>>>>>>> free them up once not needed. I think while the API is aimed at nice user
>>>>>>>>>>>> experience, we should have the option to let users optimize their
>>>>>>>>>>>> performance if they choose to. Do you agree?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Users won't reliably close/release the resources and forcing
>>>>>>>>>>>>> them to will make the user experience worse.
>>>>>>>>>>>>> It will make a lot more sense to use a file format which
>>>>>>>>>>>>> allows random access and use a cache to load/evict blocks of the state from
>>>>>>>>>>>>> memory.
>>>>>>>>>>>>> If that is not possible, use an iterable which frees the
>>>>>>>>>>>>> resource after a certain amount of inactivity or uses weak references.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm in the middle of implementing the MapState and SetState
>>>>>>>>>>>>>> in our Samza runner. We noticed that the state returns the Java Iterable
>>>>>>>>>>>>>> for reading entries, keys, etc. For state backed by file-based kv store
>>>>>>>>>>>>>> like rocksDb, we need to be able to let users explicitly close
>>>>>>>>>>>>>> iterator/iterable to release the resources. Otherwise we have to load the
>>>>>>>>>>>>>> iterable into memory so we can safely close the underlying rocksDb
>>>>>>>>>>>>>> iterator, similar to Flink's implementation. But this won't
>>>>>>>>>>>>>> work for states that don't fit into memory.
>>>>>>>>>>>>>> I chatted with Kenn and he also
>>>>>>>>>>>>>> agrees we need this capability to avoid bulk read/write. This
>>>>>>>>>>>>>> seems to be a general use case and I'm wondering if we can add the support
>>>>>>>>>>>>>> to it? I am happy to contribute to this if needed. Any feedback is highly
>>>>>>>>>>>>>> appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Xinyu
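
P.S. To make the point about futures and locks at the top of this message
concrete, here is a minimal sketch in plain java.util.concurrent. It is not
a proposed Beam API. The names AsyncEnrich, fetchProfile and fetchScore are
hypothetical, and supplyAsync merely stands in for a real non-blocking
remote client. The combined result depends on both lookups, and that
dependency is written as data flow with thenCombine, so user code needs no
shared mutable state and no synchronized block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncEnrich {
    // Hypothetical remote lookup; supplyAsync stands in for a non-blocking client.
    static CompletionStage<String> fetchProfile(String key) {
        return CompletableFuture.supplyAsync(() -> "profile(" + key + ")");
    }

    // Second hypothetical lookup, started independently of the first.
    static CompletionStage<Integer> fetchScore(String key) {
        return CompletableFuture.supplyAsync(() -> key.length());
    }

    // The output is a function of both results. thenCombine runs the lambda
    // only after both stages complete; no locks appear in user code.
    static CompletionStage<String> enrich(String key) {
        return fetchProfile(key).thenCombine(
            fetchScore(key),
            (profile, score) -> profile + " score=" + score);
    }

    public static void main(String[] args) {
        // join() blocks here only so the demo can print the final value.
        System.out.println(enrich("abc").toCompletableFuture().join());
        // prints "profile(abc) score=3"
    }
}
```

The only blocking call is the join() in main, which exists for the demo; in
a CompletionStage-based @ProcessElement as Reuven sketched, the framework
would presumably observe completion instead.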
