I don't follow why allowing freeing resources would be counter to the spec.
I don't really know what you mean by consistent for a bundle. State, in the
sense of the user-facing per-key-and-window state API, is single threaded
and scoped to a single DoFn. There's no one else who can write the state.
If a BagState is read and written and read again, the user-facing logic
should be unaware of the resources and not have any logic to deal with
consistency.

Kenn

On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <[email protected]> wrote:

> Hmm, some of the problem I'm dealing with is:
> * I don't understand how you would want close to change the semantics of a
> user state specification and how it affects the lifetime of user state?
> ** Does it represent committing information within a bundle?
> ** Does it mean that user state can ignore the replayable and consistent
> semantics for a lifetime of a bundle semantics?
> * I do understand that close semantics may make it easier for Samza runner
> to support state that is greater then memory, but what does it provide to
> users and how would they benefit (like what new scenarios would it support)?
> * I don't understand in RocksDb why you need a snapshot in the first
> place, and whether RocksDb can support seeking inside or outside a snapshot
> relatively efficiently?
>
> There seems to be some context lost initially with a discussion that you
> and Kenn had, is there a copy of that which could be shared? May help me
> get up to speed on to the problem that is being solved.
>
>
> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <[email protected]> wrote:
>
>> I will take a look at the docs to understand the problem better. A minor
>> comment to 2) is that I don't intend to change the existing iterable API. I
>> plan to implement it similar to Flink, loading the data into memory and
>> closing the underlying snapshot after that. So the changes should be
>> backward compatible.
>>
>> Thanks,
>> Xinyu
>>
>>
>>
>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <[email protected]> wrote:
>>
>>> I believe adding support for a state spec to be 'closed' or 'freed' is
>>> counter to the requirement of a state spec being consistent for the
>>> lifetime of a bundle, are we willing to change this requirement for the
>>> lifetime of a bundle or say that runners can arbitrary say that a StateSpec
>>> can't be accessed anymore?
>>>
>>> If not, I'm having trouble of thinking of a good way on how to integrate
>>> the 'close/free' API with portability semantics because:
>>> 1) Runners do control bundle size but other then the trivial cases where
>>> a runner specifically chooses:
>>>   a) to only process a single element at a time (negating the need for a
>>> free/close() method).
>>>   b) optimistically process elements and if you run out of memory kill
>>> the bundle and try again with a smaller bundle.
>>> 2) Adding an API to provide Iterators doesn't mean that users can't
>>> still use iterable which can't be force closed. (Dropping Iterable in
>>> exchange for Iterator in the API is backwards incompatible so likely to be
>>> deferred till the next major version of Apache Beam).
>>> 3) SDKs are able to process as many elements as they want in parallel,
>>> nothing requires them to execute elements serially throughout the pipeline
>>> graph.
>>> 4) The runner has limited information into what an SDK is doing. The SDK
>>> provides a lower bound on how many elements it has processed but SDKs
>>> aren't required to implement this meaning that they could technically be
>>> processing everything in random order after they have seen all input for a
>>> bundle.
>>>
>>> We'll need to work through the scenarios, Xinyu it would be useful for
>>> you to take a look at these two docs for context into the problem space:
>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process a
>>> bundle)
>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to
>>> access side inputs, access remote references, and support user state)
>>>
>>>
>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>>
>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Before you go on and update the user facing API, we should discuss the
>>>>> last point I made since the change your making will have limited usability
>>>>> since the portability effort won't realistically allow you to see such low
>>>>> level things like when processElement finished and supporting user state
>>>>> will be modeled using the following three operations: read (with
>>>>> continuation tokens), append (blind write), clear. It may be moot to
>>>>> discuss how to integrate Samza into the existing framework and should work
>>>>> towards being a portability based runner. There are more details here 
>>>>> about
>>>>> supporting user state during pipeline execution:
>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top
>>>>> level docs about portability are https://s.apache.org/beam-runner-api
>>>>> and https://s.apache.org/beam-fn-api.
>>>>>
>>>>> Do we really want to make a change to the user facing API for user
>>>>> state in the Java SDK when that code path will be phased out in exchange
>>>>> for using the portability APIs?
>>>>>
>>>>
>>>> To be clear, the user API will remain the same, but the runner's
>>>> implementation path will be phased out. So should we expand this discussion
>>>> to how the portability APIs enable the SDK and runner to collaborate to
>>>> achieve this use case? It seems like the interaction you need is that the
>>>> runner can tell that the SDK can closed the connection on the read(), and
>>>> the SDK needs to do so promptly, right?
>>>>
>>>> Kenn
>>>>
>>>>
>>>>
>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> We discussed internally about the proposed approaches. Seems if the
>>>>>> State API can also expose another method to return a 
>>>>>> ReadableState<Iterator>,
>>>>>> it will cover our cases of iterating over a bigger-then-memory state, and
>>>>>> closing the underlying rocksDb snapshot immediately after the iterator is
>>>>>> fully consumed (most cases), and also safely close the rest of the
>>>>>> iterators after proessElement (minor cases). If this sounds good to you
>>>>>> guys, I am going to create a JIRA ticket for it and open a PR for it.
>>>>>> Thanks a lot for the discussions here.
>>>>>>
>>>>>> The discussions about Async processing is very interesting, and I
>>>>>> think it's definitely worth its own thread. I believe we do need the
>>>>>> support from the Beam API/model so the users can take advantage of it
>>>>>> (Besides Samza, Flink also has an async operator that helps a lot in the
>>>>>> Alibaba's use cases). Yes, it will add complexity to the underlying
>>>>>> framework, but it's going to be great for the users to do remote IO. In
>>>>>> practice we found it actually avoids thread+locks issues, as Kenn 
>>>>>> mentioned
>>>>>> above. I am not sure whether this feature can be runner-specific support
>>>>>> thing. I will probably create another email thread for this discussion in
>>>>>> the future and hopefully I can move this forward.
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Users typically want to do that async operation and then produce
>>>>>>>> output with it. Adding asynchronous execution is difficult within the
>>>>>>>> framework because a lot of code is currently not needed to be thread 
>>>>>>>> safe
>>>>>>>> and writing code to be fast and handle asynchronous execution is quite
>>>>>>>> difficult. Adding async operations typically leads to code with too 
>>>>>>>> many
>>>>>>>> locks/synchronization blocks.
>>>>>>>>
>>>>>>>
>>>>>>> Just a small point here, as it is very unusual to use locks with
>>>>>>> futures. The essential innovation of futures is that it is a new way to
>>>>>>> program that avoids threads+locks style. In part, you get this for free
>>>>>>> from functional programming style, and on the other hand you reify
>>>>>>> asynchronous side effects as data dependencies.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> Note that with the portability effort, the Runner won't have
>>>>>>>> visibility into such low level things like when an object is garbage
>>>>>>>> collected and supporting user state will be modeled using the following
>>>>>>>> three operations: read (with continuation tokens), append (blind 
>>>>>>>> write),
>>>>>>>> clear. It may be moot to discuss how to integrate Samza into the 
>>>>>>>> existing
>>>>>>>> framework and should work towards being a portability based runner. 
>>>>>>>> There
>>>>>>>> are more details here about supporting user state during pipeline
>>>>>>>> execution:
>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top
>>>>>>>> level docs about portability are
>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>
>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> At least one API that has been discussed in the past, is to use
>>>>>>>>> Java 8 CompletionStage. e.g.
>>>>>>>>>
>>>>>>>>>  new DoFn<InputT, OutputT>() {
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>> ...) {
>>>>>>>>>       element.thenApply(...)
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> The framework will automatically create the CompletionStage, and
>>>>>>>>> the process method can specify a pipeline of asynchronous operations 
>>>>>>>>> to
>>>>>>>>> perform on the element. When all of them are done, the element will be
>>>>>>>>> marked as successfully processed.
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for all the pointers. I looked though the discussion over 
>>>>>>>>>> BEAM-2975
>>>>>>>>>> and BEAM-2980 about having snapshot or live views of iterable,
>>>>>>>>>> and the current semantics makes a lot of sense to me.  For your 
>>>>>>>>>> question:
>>>>>>>>>> it does not require an explicit snapshot when we create RocksDb 
>>>>>>>>>> iterator
>>>>>>>>>> directly. The iterator will read from an implicit snapshot as of the 
>>>>>>>>>> time
>>>>>>>>>> the iterator is created [1], and the snapshot will be released after 
>>>>>>>>>> the
>>>>>>>>>> iterator is closed. If we can have another method to return
>>>>>>>>>> ReadableState<Iterator>, we might be able to apply the auto-closing
>>>>>>>>>> approaches as we discussed and solve the problem here :).
>>>>>>>>>>
>>>>>>>>>> It's very interesting that you bring up the discussion about
>>>>>>>>>> async API! Async IO has been widely adopted here among our users: 
>>>>>>>>>> they use
>>>>>>>>>> netty for async calls with library named ParSeq [2] to help manage 
>>>>>>>>>> the
>>>>>>>>>> calls. Samza provides a primitive callback style API [3], in which 
>>>>>>>>>> the user
>>>>>>>>>> will invoke the callback after the remote calls are complete. 
>>>>>>>>>> Currently in
>>>>>>>>>> a Samza job our users use this API with the ParSeq lib for remote 
>>>>>>>>>> IO. Seems
>>>>>>>>>> we might have to do blocking calls (thus the poor resource 
>>>>>>>>>> utilization you
>>>>>>>>>> mentioned) when using Beam API for now. It'll be great if you can 
>>>>>>>>>> send a
>>>>>>>>>> few more details about the discussion about async API. I would like 
>>>>>>>>>> to add
>>>>>>>>>> our use case and help move this forward.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
>>>>>>>>>>
>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>> [3]:
>>>>>>>>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I don't have any further suggestions, but want to call out how
>>>>>>>>>>> this hits a lot of interesting points.
>>>>>>>>>>>
>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1] and
>>>>>>>>>>> BEAM-2980 [2] where we debated things a bit. I think the strongest 
>>>>>>>>>>> case is
>>>>>>>>>>> for what you describe - it should be a snapshot. Perhaps they 
>>>>>>>>>>> should both
>>>>>>>>>>> be closed as fixed...
>>>>>>>>>>>
>>>>>>>>>>> And you also bring up long blocking calls - we have also
>>>>>>>>>>> deliberately decided that long synchronous blocking calls in
>>>>>>>>>>> @ProcessElement can be embraced for simple programming and 
>>>>>>>>>>> compensated with
>>>>>>>>>>> autoscaling smarts (e.g. expand the thread pool by noticing poor
>>>>>>>>>>> utilization). The alternative is a more future-istic API where the 
>>>>>>>>>>> calls
>>>>>>>>>>> can be explicitly asynchronous. We've had some interesting dev@
>>>>>>>>>>> list discussions about that, too.
>>>>>>>>>>>
>>>>>>>>>>> Is another possibility to perhaps have read() return a
>>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two 
>>>>>>>>>>> methods with
>>>>>>>>>>> different names, one for iterator one for snapshot iterable. But 
>>>>>>>>>>> wouldn't
>>>>>>>>>>> the Iterator also require a snapshot? Doesn't a native RocksDb 
>>>>>>>>>>> iterator
>>>>>>>>>>> require a snapshot to have well-defined contents? As you can tell, 
>>>>>>>>>>> I don't
>>>>>>>>>>> know enough about RocksDb details to be sure of my suggestions.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>
>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted
>>>>>>>>>>>> the question here, we discussed internally about releasing the 
>>>>>>>>>>>> underlying
>>>>>>>>>>>> resources after consuming the whole iterator. This probably covers 
>>>>>>>>>>>> quite a
>>>>>>>>>>>> lot of use cases. For some special cases that the user only 
>>>>>>>>>>>> consume part of
>>>>>>>>>>>> the iterator, Luke and Kenn's suggestion about releasing after
>>>>>>>>>>>> processElement() might work (I need to confirm about this with our 
>>>>>>>>>>>> use
>>>>>>>>>>>> cases). So based on what we discussed so far, we might have a good 
>>>>>>>>>>>> way to
>>>>>>>>>>>> automatically close an iterator for the store.
>>>>>>>>>>>>
>>>>>>>>>>>> There is another issue though: right now the state API returns
>>>>>>>>>>>> an iterable for entries(), keys() and values(), and we can create 
>>>>>>>>>>>> iterator
>>>>>>>>>>>> from it. From my understanding, the iterable holds a snapshot of 
>>>>>>>>>>>> the
>>>>>>>>>>>> underlying store. In case of rocksDb, it's going to be a 
>>>>>>>>>>>> db.snapshot().
>>>>>>>>>>>> Then when can we release the snapshot? It's not like iterator 
>>>>>>>>>>>> where we can
>>>>>>>>>>>> use some heuristics to automatically release it. The user can hold 
>>>>>>>>>>>> on to
>>>>>>>>>>>> the iterable and create iterators throughout the whole 
>>>>>>>>>>>> processElement().
>>>>>>>>>>>> But if we only close the iterable after processElement(), I am 
>>>>>>>>>>>> quite
>>>>>>>>>>>> concerned about the limitations this will bring. If the user is 
>>>>>>>>>>>> doing some
>>>>>>>>>>>> remote call during the process, then the snapshot might be held 
>>>>>>>>>>>> for a long
>>>>>>>>>>>> time before releasing, and might cause performance problems. And 
>>>>>>>>>>>> if the
>>>>>>>>>>>> user happen to create multiple iterables, then there will be 
>>>>>>>>>>>> multiple
>>>>>>>>>>>> snapshots loaded during process. Luke suggested being aggressive 
>>>>>>>>>>>> at closing
>>>>>>>>>>>> the resources and recreating when needed again. But in this case 
>>>>>>>>>>>> it might
>>>>>>>>>>>> not work since we won't be able to recreate the same snapshot 
>>>>>>>>>>>> given the
>>>>>>>>>>>> store might have been updated (and creating rocksDb snapshot is 
>>>>>>>>>>>> not cheap
>>>>>>>>>>>> too). I am running out of ideas other than exposing the iterator 
>>>>>>>>>>>> itself
>>>>>>>>>>>> somehow (and add close() if needed?). Any further suggestions?
>>>>>>>>>>>>
>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in some 
>>>>>>>>>>>> sort of
>>>>>>>>>>>> StateResource in the fear that people might reject the proposal 
>>>>>>>>>>>> immediately
>>>>>>>>>>>> after seeing the close() on the iterator. I guess our users are 
>>>>>>>>>>>> familiar
>>>>>>>>>>>> with rocksDb state, it's pretty normal to close the 
>>>>>>>>>>>> iterator/snapshot after
>>>>>>>>>>>> using it.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <[email protected]
>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that
>>>>>>>>>>>>> resources are freed for Java developers (hence the weak/phantom 
>>>>>>>>>>>>> reference
>>>>>>>>>>>>> suggestion).
>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file
>>>>>>>>>>>>> streams) lead to leaked resources.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I actually had first sketched out just what you wrote. But
>>>>>>>>>>>>>> then I realized a few things:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  - usually an Iterable does not allocate resources, only its
>>>>>>>>>>>>>> Iterators
>>>>>>>>>>>>>>  - if you consume the whole iterator, I hope the user would
>>>>>>>>>>>>>> not have to do any extra work
>>>>>>>>>>>>>>  - you can also automatically free it up at the end of the
>>>>>>>>>>>>>> call to @ProcessElement so that is easy too (but you might not 
>>>>>>>>>>>>>> want to)
>>>>>>>>>>>>>>  - so the main use is when the iterator terminates early and
>>>>>>>>>>>>>> is not fully consumed and you can't wait to finish the method
>>>>>>>>>>>>>>  - the scoping in Java will force a bunch of uninitialized
>>>>>>>>>>>>>> declarations outside the try-with-resources block, kind of a lot 
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> boilerplate LOC
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One thing that is good about your proposal is that the
>>>>>>>>>>>>>> iterable could have some transparent caches that all free up 
>>>>>>>>>>>>>> together.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches,
>>>>>>>>>>>>>>> Kenn. Now I understand Luke's proposal better. The approach 
>>>>>>>>>>>>>>> looks neat, but
>>>>>>>>>>>>>>> the uncertainty of *when* GC is going to kick in will make
>>>>>>>>>>>>>>> users' life hard. If the user happens to configure a large JVM 
>>>>>>>>>>>>>>> heap size,
>>>>>>>>>>>>>>> and since rocksDb uses off-heap memory, GC might start very 
>>>>>>>>>>>>>>> late and less
>>>>>>>>>>>>>>> frequent than what we want. If we don't have a *definitive*
>>>>>>>>>>>>>>> way to let user close the underlying resources, then there is 
>>>>>>>>>>>>>>> no good way
>>>>>>>>>>>>>>> to handle such failures of a critical application in production.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Having a close method in the iterator might be a little
>>>>>>>>>>>>>>> unorthodox :). To some degree, this is actually a resource we 
>>>>>>>>>>>>>>> are holding
>>>>>>>>>>>>>>> underneath, and I think it's pretty common to have close() for 
>>>>>>>>>>>>>>> a resource
>>>>>>>>>>>>>>> in Java, e.g. BufferedReader and BufferedWriter. So I would 
>>>>>>>>>>>>>>> imagine that we
>>>>>>>>>>>>>>> also define a resource for the state iterator and make the 
>>>>>>>>>>>>>>> interface
>>>>>>>>>>>>>>> implements *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>>> st =
>>>>>>>>>>>>>>> bagState.iteratorResource()) {
>>>>>>>>>>>>>>>     Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>     while (iter.hasNext() {
>>>>>>>>>>>>>>>        .. do stuff ...
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>     ... user code
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The type/method name are just for illustrating here, so
>>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and let 
>>>>>>>>>>>>>>> me know if
>>>>>>>>>>>>>>> you have thoughts about the programming patterns here.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or not.
>>>>>>>>>>>>>>>> There has been no specific API proposed.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be able
>>>>>>>>>>>>>>>> to read and write bigger-than-memory state. It seems we have 
>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>> runners that don't support it, perhaps because of our API. You 
>>>>>>>>>>>>>>>> might be
>>>>>>>>>>>>>>>> able to build something good enough with phantom references, 
>>>>>>>>>>>>>>>> but you might
>>>>>>>>>>>>>>>> not.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If I understand the idea, it might look something like this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>        @StateId("foo")
>>>>>>>>>>>>>>>>        private final StateSpec<BagState<Whatever>>
>>>>>>>>>>>>>>>> myBagSpec = ...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>        @ProcessElement
>>>>>>>>>>>>>>>>        public void proc(@StateId("foo") BagState<Whatever>
>>>>>>>>>>>>>>>> myBag, ...) {
>>>>>>>>>>>>>>>>          CloseableIterator<Whatever> iterator =
>>>>>>>>>>>>>>>> myBag.get().iterator();
>>>>>>>>>>>>>>>>          while(iterator.hasNext() && ... special condition
>>>>>>>>>>>>>>>> ...) {
>>>>>>>>>>>>>>>>            ... do stuff ...
>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>          iterator.close();
>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close()
>>>>>>>>>>>>>>>> since they iterate with for ( : ) as usual. And a runner that 
>>>>>>>>>>>>>>>> has the 10x
>>>>>>>>>>>>>>>> funding to try out a ReferenceQueue can be resilient to users 
>>>>>>>>>>>>>>>> that forget.
>>>>>>>>>>>>>>>> On the other hand, I haven't seen this pattern much in the 
>>>>>>>>>>>>>>>> wild, so I think
>>>>>>>>>>>>>>>> it is valuable to discuss other methods.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> While Luke's proposal is something like this if I
>>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with 
>>>>>>>>>>>>>>>> PhantomReference seems
>>>>>>>>>>>>>>>> to be what you really want):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>     class RocksDbBagState {
>>>>>>>>>>>>>>>>       static ReferenceQueue rocksDbIteratorQueue = new
>>>>>>>>>>>>>>>> ReferenceQueue();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>       class Iterator {
>>>>>>>>>>>>>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>          .next() {
>>>>>>>>>>>>>>>>            return cIter.next();
>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>      class Iterable {
>>>>>>>>>>>>>>>>         .iterator() {
>>>>>>>>>>>>>>>>           return new Iterator(new
>>>>>>>>>>>>>>>> PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     ... on another thread ...
>>>>>>>>>>>>>>>>     while(true) {
>>>>>>>>>>>>>>>>       RocksDbIterator deadRef = (RocksDbIterator)
>>>>>>>>>>>>>>>> rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>       deadRef.close();
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop
>>>>>>>>>>>>>>>> onto the queue for being closed. This might not be too bad. 
>>>>>>>>>>>>>>>> You'll have
>>>>>>>>>>>>>>>> delayed resource release, and potentially masked errors that 
>>>>>>>>>>>>>>>> are hard to
>>>>>>>>>>>>>>>> debug. It is less error-prone than WeakReference, which is 
>>>>>>>>>>>>>>>> asking for
>>>>>>>>>>>>>>>> trouble when objects are collected en masse. Anecdotally I 
>>>>>>>>>>>>>>>> have heard that
>>>>>>>>>>>>>>>> performance of this kind of approach is poor, but I haven't 
>>>>>>>>>>>>>>>> experienced it
>>>>>>>>>>>>>>>> myself and I can't find good data.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will help
>>>>>>>>>>>>>>>>> clean up the Java objects once GC kicks in. In case of 
>>>>>>>>>>>>>>>>> kv-store likes
>>>>>>>>>>>>>>>>> rocksDb, the Java iterator is just a JNI interface to the 
>>>>>>>>>>>>>>>>> underlying C
>>>>>>>>>>>>>>>>> iterator, so we need to explicitly invoke close to release 
>>>>>>>>>>>>>>>>> the in-memory
>>>>>>>>>>>>>>>>> snapshot data, which can be large and accumulated quickly if 
>>>>>>>>>>>>>>>>> it's not
>>>>>>>>>>>>>>>>> released when not in use. Maybe I am missing something as you 
>>>>>>>>>>>>>>>>> suggested
>>>>>>>>>>>>>>>>> here, but looks to me using weak references might not help in 
>>>>>>>>>>>>>>>>> this case.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I understand your concern, and I think you might
>>>>>>>>>>>>>>>>> misunderstood what I meant. I am totally for working hard for 
>>>>>>>>>>>>>>>>> best user
>>>>>>>>>>>>>>>>> experience, and I think the current API provides a good 
>>>>>>>>>>>>>>>>> example of that.
>>>>>>>>>>>>>>>>> That's also the reason I am implementing a runner here. I am 
>>>>>>>>>>>>>>>>> just proposing
>>>>>>>>>>>>>>>>> an extra API to expose an iterator that can be closed when 
>>>>>>>>>>>>>>>>> not needed, that
>>>>>>>>>>>>>>>>> way the users can use this feature to iterate through large 
>>>>>>>>>>>>>>>>> state that
>>>>>>>>>>>>>>>>> doesn't fit into memory. I believe this is also a pretty 
>>>>>>>>>>>>>>>>> general use case
>>>>>>>>>>>>>>>>> and it's better to have support for it. I am actually arguing 
>>>>>>>>>>>>>>>>> this will be
>>>>>>>>>>>>>>>>> a better user experience to add this extra API since more 
>>>>>>>>>>>>>>>>> users can benefit
>>>>>>>>>>>>>>>>> from it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the
>>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by using weak 
>>>>>>>>>>>>>>>>>> references and
>>>>>>>>>>>>>>>>>> reference queues (or other methods). Having a few people 
>>>>>>>>>>>>>>>>>> work 10x as hard
>>>>>>>>>>>>>>>>>> to provide a good implementation is much better then having 
>>>>>>>>>>>>>>>>>> 100s or 1000s
>>>>>>>>>>>>>>>>>> of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Load/evict blocks will help reduce the cache memory
>>>>>>>>>>>>>>>>>>> footprint, but we still won't be able to release the 
>>>>>>>>>>>>>>>>>>> underlying resources.
>>>>>>>>>>>>>>>>>>> We can add definitely heuristics to help release the 
>>>>>>>>>>>>>>>>>>> resources as you
>>>>>>>>>>>>>>>>>>> mentioned, but there is no accurate way to track all the
>>>>>>>>>>>>>>>>>>> iterators/iterables created and free them up once not 
>>>>>>>>>>>>>>>>>>> needed. I think while
>>>>>>>>>>>>>>>>>>> the API is aimed at nice user experience, we should have 
>>>>>>>>>>>>>>>>>>> the option to let
>>>>>>>>>>>>>>>>>>> users optimize their performance if they choose to. Do you 
>>>>>>>>>>>>>>>>>>> agree?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources and
>>>>>>>>>>>>>>>>>>>> forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format
>>>>>>>>>>>>>>>>>>>> which allows random access and use a cache to load/evict 
>>>>>>>>>>>>>>>>>>>> blocks of the
>>>>>>>>>>>>>>>>>>>> state from memory.
>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees
>>>>>>>>>>>>>>>>>>>> the resource after a certain amount of inactivity or uses 
>>>>>>>>>>>>>>>>>>>> weak references.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing the MapState and
>>>>>>>>>>>>>>>>>>>>> SetState in our Samza runner. We noticed that the state 
>>>>>>>>>>>>>>>>>>>>> returns the Java
>>>>>>>>>>>>>>>>>>>>> Iterable for reading entries, keys, etc. For state backed 
>>>>>>>>>>>>>>>>>>>>> by file-based kv
>>>>>>>>>>>>>>>>>>>>> store like rocksDb, we need to be able to let users 
>>>>>>>>>>>>>>>>>>>>> explicitly close
>>>>>>>>>>>>>>>>>>>>> iterator/iterable to release the resources.Otherwise we 
>>>>>>>>>>>>>>>>>>>>> have to load the
>>>>>>>>>>>>>>>>>>>>> iterable into memory so we can safely close the 
>>>>>>>>>>>>>>>>>>>>> underlying rocksDb
>>>>>>>>>>>>>>>>>>>>> iterator, similar to Flink's implementation. But this
>>>>>>>>>>>>>>>>>>>>> won't work for states that don't fit into memory. I 
>>>>>>>>>>>>>>>>>>>>> chatted with Kenn and
>>>>>>>>>>>>>>>>>>>>> he also agrees we need this capability to avoid bulk 
>>>>>>>>>>>>>>>>>>>>> read/write. This
>>>>>>>>>>>>>>>>>>>>> seems to be a general use case and I'm wondering if we 
>>>>>>>>>>>>>>>>>>>>> can add the support
>>>>>>>>>>>>>>>>>>>>> to it? I am happy to contribute to this if needed. Any 
>>>>>>>>>>>>>>>>>>>>> feedback is highly
>>>>>>>>>>>>>>>>>>>>> appreciated.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>

Reply via email to