I misspoke when I said portability semantics and should have said
portability design/implementation. This is why I had a follow-up e-mail and
clarified that I'm confused on:
* I don't understand how you would want close to change the semantics of a
user state specification and how it affects the lifetime of user state?
** Does it represent committing information within a bundle?
** Does it mean that user state can ignore the replayable and consistent
semantics for a lifetime of a bundle semantics?

I'm trying to tie back what does a `ReadableState<Iterator> readIterator()`
means for Runner authors and how it solves the memory/close() problem for
Samza. Based upon https://s.apache.org/beam-state, reading and writing of
state must be consistent. Does this mean that Samza must use snapshots for
the lifetime of a bundle? If so, I don't see how adding a
`ReadableState<Iterator> readIterator()` allows Samza to ignore the
consistency requirement and be allowed to free snapshots.

It might be worthwhile to setup a three way hangouts call to help me as I
don't have the same level of context which can be shared back to this
thread. Xinyu / Kenn, how about we setup a time using Slack?

On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles <[email protected]> wrote:

> I feel like this discussion is kind of far from the primary intention. The
> point of ParDo(stateful DoFn) is to enable naive single-threaded code in a
> style intuitive to a beginning imperative programmer. So:
>
>  - the return value of read() should act like an immutable value
>  - if there is a read after a write, that read should reflect the changes
> written, because there's a trivial happens-before relationship induced by
> program order
>  - there are no non-trivial happens-before relationships
>  - a write after a read should not affect the value read before
>
> From that starting point, the limitations on expressiveness (single
> threaded per-key-and-window-and-step) give rise to embarrassingly parallel
> computation and GC.
>
> So when you say "portability semantics" it makes me concerned. The word
> "semantics" refers to the mapping between what a user writes and what it
> means. Primarily, that means the conceptual translation between a primitive
> PTransform and the corresponding PCollection-to-PCollection operation. It
> is true that portability complicates things because a user's code can only
> be given meaning relative to an SDK harness. But the end-user intention is
> unchanged. If we had time and resources to give the Fn API a solid spec, it
> should be such that the SDK harness has little choice but to implement the
> primitives as intended.
>
> In other words, it is the job of
> https://s.apache.org/beam-fn-state-api-and-bundle-processing* to
> implement https://s.apache.org/beam-state. The discussion of adding
> `ReadableState<Iterator> readIterator()` to BagState seems consistent with
> the latter.
>
> Kenn
>
> *IIUC you've chosen to use the same underlying proto service for side
> inputs and by-reference values. That's an implementation detail that I have
> no particular opinion about except that if it complicates implementing the
> primary motivator for state, it may not be a great fit.
>
>
>
> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <[email protected]> wrote:
>
>> I don't follow why allowing freeing resources would be counter to the
>> spec. I don't really know what you mean by consistent for a bundle. State,
>> in the sense of the user-facing per-key-and-window state API, is single
>> threaded and scoped to a single DoFn. There's no one else who can write the
>> state. If a BagState is read and written and read again, the user-facing
>> logic should be unaware of the resources and not have any logic to deal
>> with consistency.
>>
>> Kenn
>>
>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Hmm, some of the problem I'm dealing with is:
>>> * I don't understand how you would want close to change the semantics of
>>> a user state specification and how it affects the lifetime of user state?
>>> ** Does it represent committing information within a bundle?
>>> ** Does it mean that user state can ignore the replayable and consistent
>>> semantics for a lifetime of a bundle semantics?
>>> * I do understand that close semantics may make it easier for Samza
>>> runner to support state that is greater then memory, but what does it
>>> provide to users and how would they benefit (like what new scenarios would
>>> it support)?
>>> * I don't understand in RocksDb why you need a snapshot in the first
>>> place, and whether RocksDb can support seeking inside or outside a snapshot
>>> relatively efficiently?
>>>
>>> There seems to be some context lost initially with a discussion that you
>>> and Kenn had, is there a copy of that which could be shared? May help me
>>> get up to speed on to the problem that is being solved.
>>>
>>>
>>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <[email protected]> wrote:
>>>
>>>> I will take a look at the docs to understand the problem better. A
>>>> minor comment to 2) is that I don't intend to change the existing iterable
>>>> API. I plan to implement it similar to Flink, loading the data into memory
>>>> and closing the underlying snapshot after that. So the changes should be
>>>> backward compatible.
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>>
>>>>
>>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> I believe adding support for a state spec to be 'closed' or 'freed' is
>>>>> counter to the requirement of a state spec being consistent for the
>>>>> lifetime of a bundle, are we willing to change this requirement for the
>>>>> lifetime of a bundle or say that runners can arbitrary say that a 
>>>>> StateSpec
>>>>> can't be accessed anymore?
>>>>>
>>>>> If not, I'm having trouble of thinking of a good way on how to
>>>>> integrate the 'close/free' API with portability semantics because:
>>>>> 1) Runners do control bundle size but other then the trivial cases
>>>>> where a runner specifically chooses:
>>>>>   a) to only process a single element at a time (negating the need for
>>>>> a free/close() method).
>>>>>   b) optimistically process elements and if you run out of memory kill
>>>>> the bundle and try again with a smaller bundle.
>>>>> 2) Adding an API to provide Iterators doesn't mean that users can't
>>>>> still use iterable which can't be force closed. (Dropping Iterable in
>>>>> exchange for Iterator in the API is backwards incompatible so likely to be
>>>>> deferred till the next major version of Apache Beam).
>>>>> 3) SDKs are able to process as many elements as they want in parallel,
>>>>> nothing requires them to execute elements serially throughout the pipeline
>>>>> graph.
>>>>> 4) The runner has limited information into what an SDK is doing. The
>>>>> SDK provides a lower bound on how many elements it has processed but SDKs
>>>>> aren't required to implement this meaning that they could technically be
>>>>> processing everything in random order after they have seen all input for a
>>>>> bundle.
>>>>>
>>>>> We'll need to work through the scenarios, Xinyu it would be useful for
>>>>> you to take a look at these two docs for context into the problem space:
>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process
>>>>> a bundle)
>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to
>>>>> access side inputs, access remote references, and support user state)
>>>>>
>>>>>
>>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> Before you go on and update the user facing API, we should discuss
>>>>>>> the last point I made since the change your making will have limited
>>>>>>> usability since the portability effort won't realistically allow you to 
>>>>>>> see
>>>>>>> such low level things like when processElement finished and supporting 
>>>>>>> user
>>>>>>> state will be modeled using the following three operations: read (with
>>>>>>> continuation tokens), append (blind write), clear. It may be moot to
>>>>>>> discuss how to integrate Samza into the existing framework and should 
>>>>>>> work
>>>>>>> towards being a portability based runner. There are more details here 
>>>>>>> about
>>>>>>> supporting user state during pipeline execution:
>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top
>>>>>>> level docs about portability are
>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>
>>>>>>> Do we really want to make a change to the user facing API for user
>>>>>>> state in the Java SDK when that code path will be phased out in exchange
>>>>>>> for using the portability APIs?
>>>>>>>
>>>>>>
>>>>>> To be clear, the user API will remain the same, but the runner's
>>>>>> implementation path will be phased out. So should we expand this 
>>>>>> discussion
>>>>>> to how the portability APIs enable the SDK and runner to collaborate to
>>>>>> achieve this use case? It seems like the interaction you need is that the
>>>>>> runner can tell that the SDK can closed the connection on the read(), and
>>>>>> the SDK needs to do so promptly, right?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>
>>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We discussed internally about the proposed approaches. Seems if the
>>>>>>>> State API can also expose another method to return a 
>>>>>>>> ReadableState<Iterator>,
>>>>>>>> it will cover our cases of iterating over a bigger-then-memory state, 
>>>>>>>> and
>>>>>>>> closing the underlying rocksDb snapshot immediately after the iterator 
>>>>>>>> is
>>>>>>>> fully consumed (most cases), and also safely close the rest of the
>>>>>>>> iterators after proessElement (minor cases). If this sounds good to you
>>>>>>>> guys, I am going to create a JIRA ticket for it and open a PR for it.
>>>>>>>> Thanks a lot for the discussions here.
>>>>>>>>
>>>>>>>> The discussions about Async processing is very interesting, and I
>>>>>>>> think it's definitely worth its own thread. I believe we do need the
>>>>>>>> support from the Beam API/model so the users can take advantage of it
>>>>>>>> (Besides Samza, Flink also has an async operator that helps a lot in 
>>>>>>>> the
>>>>>>>> Alibaba's use cases). Yes, it will add complexity to the underlying
>>>>>>>> framework, but it's going to be great for the users to do remote IO. In
>>>>>>>> practice we found it actually avoids thread+locks issues, as Kenn 
>>>>>>>> mentioned
>>>>>>>> above. I am not sure whether this feature can be runner-specific 
>>>>>>>> support
>>>>>>>> thing. I will probably create another email thread for this discussion 
>>>>>>>> in
>>>>>>>> the future and hopefully I can move this forward.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Users typically want to do that async operation and then produce
>>>>>>>>>> output with it. Adding asynchronous execution is difficult within the
>>>>>>>>>> framework because a lot of code is currently not needed to be thread 
>>>>>>>>>> safe
>>>>>>>>>> and writing code to be fast and handle asynchronous execution is 
>>>>>>>>>> quite
>>>>>>>>>> difficult. Adding async operations typically leads to code with too 
>>>>>>>>>> many
>>>>>>>>>> locks/synchronization blocks.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Just a small point here, as it is very unusual to use locks with
>>>>>>>>> futures. The essential innovation of futures is that it is a new way 
>>>>>>>>> to
>>>>>>>>> program that avoids threads+locks style. In part, you get this for 
>>>>>>>>> free
>>>>>>>>> from functional programming style, and on the other hand you reify
>>>>>>>>> asynchronous side effects as data dependencies.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Note that with the portability effort, the Runner won't have
>>>>>>>>>> visibility into such low level things like when an object is garbage
>>>>>>>>>> collected and supporting user state will be modeled using the 
>>>>>>>>>> following
>>>>>>>>>> three operations: read (with continuation tokens), append (blind 
>>>>>>>>>> write),
>>>>>>>>>> clear. It may be moot to discuss how to integrate Samza into the 
>>>>>>>>>> existing
>>>>>>>>>> framework and should work towards being a portability based runner. 
>>>>>>>>>> There
>>>>>>>>>> are more details here about supporting user state during pipeline
>>>>>>>>>> execution:
>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing,
>>>>>>>>>> top level docs about portability are
>>>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>>>
>>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> At least one API that has been discussed in the past, is to use
>>>>>>>>>>> Java 8 CompletionStage. e.g.
>>>>>>>>>>>
>>>>>>>>>>>  new DoFn<InputT, OutputT>() {
>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>>>>>>>>>> element, ...) {
>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> The framework will automatically create the CompletionStage, and
>>>>>>>>>>> the process method can specify a pipeline of asynchronous 
>>>>>>>>>>> operations to
>>>>>>>>>>> perform on the element. When all of them are done, the element will 
>>>>>>>>>>> be
>>>>>>>>>>> marked as successfully processed.
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for all the pointers. I looked though the discussion
>>>>>>>>>>>> over BEAM-2975 and BEAM-2980 about having snapshot or live
>>>>>>>>>>>> views of iterable, and the current semantics makes a lot of sense 
>>>>>>>>>>>> to me.
>>>>>>>>>>>> For your question: it does not require an explicit snapshot when 
>>>>>>>>>>>> we create
>>>>>>>>>>>> RocksDb iterator directly. The iterator will read from an implicit 
>>>>>>>>>>>> snapshot
>>>>>>>>>>>> as of the time the iterator is created [1], and the snapshot will 
>>>>>>>>>>>> be
>>>>>>>>>>>> released after the iterator is closed. If we can have another 
>>>>>>>>>>>> method to
>>>>>>>>>>>> return ReadableState<Iterator>, we might be able to apply the 
>>>>>>>>>>>> auto-closing
>>>>>>>>>>>> approaches as we discussed and solve the problem here :).
>>>>>>>>>>>>
>>>>>>>>>>>> It's very interesting that you bring up the discussion about
>>>>>>>>>>>> async API! Async IO has been widely adopted here among our users: 
>>>>>>>>>>>> they use
>>>>>>>>>>>> netty for async calls with library named ParSeq [2] to help manage 
>>>>>>>>>>>> the
>>>>>>>>>>>> calls. Samza provides a primitive callback style API [3], in which 
>>>>>>>>>>>> the user
>>>>>>>>>>>> will invoke the callback after the remote calls are complete. 
>>>>>>>>>>>> Currently in
>>>>>>>>>>>> a Samza job our users use this API with the ParSeq lib for remote 
>>>>>>>>>>>> IO. Seems
>>>>>>>>>>>> we might have to do blocking calls (thus the poor resource 
>>>>>>>>>>>> utilization you
>>>>>>>>>>>> mentioned) when using Beam API for now. It'll be great if you can 
>>>>>>>>>>>> send a
>>>>>>>>>>>> few more details about the discussion about async API. I would 
>>>>>>>>>>>> like to add
>>>>>>>>>>>> our use case and help move this forward.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>
>>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>>> [3]:
>>>>>>>>>>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I don't have any further suggestions, but want to call out how
>>>>>>>>>>>>> this hits a lot of interesting points.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1]
>>>>>>>>>>>>> and BEAM-2980 [2] where we debated things a bit. I think the 
>>>>>>>>>>>>> strongest case
>>>>>>>>>>>>> is for what you describe - it should be a snapshot. Perhaps they 
>>>>>>>>>>>>> should
>>>>>>>>>>>>> both be closed as fixed...
>>>>>>>>>>>>>
>>>>>>>>>>>>> And you also bring up long blocking calls - we have also
>>>>>>>>>>>>> deliberately decided that long synchronous blocking calls in
>>>>>>>>>>>>> @ProcessElement can be embraced for simple programming and 
>>>>>>>>>>>>> compensated with
>>>>>>>>>>>>> autoscaling smarts (e.g. expand the thread pool by noticing poor
>>>>>>>>>>>>> utilization). The alternative is a more future-istic API where 
>>>>>>>>>>>>> the calls
>>>>>>>>>>>>> can be explicitly asynchronous. We've had some interesting dev@
>>>>>>>>>>>>> list discussions about that, too.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is another possibility to perhaps have read() return a
>>>>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two 
>>>>>>>>>>>>> methods with
>>>>>>>>>>>>> different names, one for iterator one for snapshot iterable. But 
>>>>>>>>>>>>> wouldn't
>>>>>>>>>>>>> the Iterator also require a snapshot? Doesn't a native RocksDb 
>>>>>>>>>>>>> iterator
>>>>>>>>>>>>> require a snapshot to have well-defined contents? As you can 
>>>>>>>>>>>>> tell, I don't
>>>>>>>>>>>>> know enough about RocksDb details to be sure of my suggestions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted
>>>>>>>>>>>>>> the question here, we discussed internally about releasing the 
>>>>>>>>>>>>>> underlying
>>>>>>>>>>>>>> resources after consuming the whole iterator. This probably 
>>>>>>>>>>>>>> covers quite a
>>>>>>>>>>>>>> lot of use cases. For some special cases that the user only 
>>>>>>>>>>>>>> consume part of
>>>>>>>>>>>>>> the iterator, Luke and Kenn's suggestion about releasing after
>>>>>>>>>>>>>> processElement() might work (I need to confirm about this with 
>>>>>>>>>>>>>> our use
>>>>>>>>>>>>>> cases). So based on what we discussed so far, we might have a 
>>>>>>>>>>>>>> good way to
>>>>>>>>>>>>>> automatically close an iterator for the store.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There is another issue though: right now the state API
>>>>>>>>>>>>>> returns an iterable for entries(), keys() and values(), and we 
>>>>>>>>>>>>>> can create
>>>>>>>>>>>>>> iterator from it. From my understanding, the iterable holds a 
>>>>>>>>>>>>>> snapshot of
>>>>>>>>>>>>>> the underlying store. In case of rocksDb, it's going to be a 
>>>>>>>>>>>>>> db.snapshot().
>>>>>>>>>>>>>> Then when can we release the snapshot? It's not like iterator 
>>>>>>>>>>>>>> where we can
>>>>>>>>>>>>>> use some heuristics to automatically release it. The user can 
>>>>>>>>>>>>>> hold on to
>>>>>>>>>>>>>> the iterable and create iterators throughout the whole 
>>>>>>>>>>>>>> processElement().
>>>>>>>>>>>>>> But if we only close the iterable after processElement(), I am 
>>>>>>>>>>>>>> quite
>>>>>>>>>>>>>> concerned about the limitations this will bring. If the user is 
>>>>>>>>>>>>>> doing some
>>>>>>>>>>>>>> remote call during the process, then the snapshot might be held 
>>>>>>>>>>>>>> for a long
>>>>>>>>>>>>>> time before releasing, and might cause performance problems. And 
>>>>>>>>>>>>>> if the
>>>>>>>>>>>>>> user happen to create multiple iterables, then there will be 
>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>> snapshots loaded during process. Luke suggested being aggressive 
>>>>>>>>>>>>>> at closing
>>>>>>>>>>>>>> the resources and recreating when needed again. But in this case 
>>>>>>>>>>>>>> it might
>>>>>>>>>>>>>> not work since we won't be able to recreate the same snapshot 
>>>>>>>>>>>>>> given the
>>>>>>>>>>>>>> store might have been updated (and creating rocksDb snapshot is 
>>>>>>>>>>>>>> not cheap
>>>>>>>>>>>>>> too). I am running out of ideas other than exposing the iterator 
>>>>>>>>>>>>>> itself
>>>>>>>>>>>>>> somehow (and add close() if needed?). Any further suggestions?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in 
>>>>>>>>>>>>>> some sort of
>>>>>>>>>>>>>> StateResource in the fear that people might reject the proposal 
>>>>>>>>>>>>>> immediately
>>>>>>>>>>>>>> after seeing the close() on the iterator. I guess our users are 
>>>>>>>>>>>>>> familiar
>>>>>>>>>>>>>> with rocksDb state, it's pretty normal to close the 
>>>>>>>>>>>>>> iterator/snapshot after
>>>>>>>>>>>>>> using it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that
>>>>>>>>>>>>>>> resources are freed for Java developers (hence the weak/phantom 
>>>>>>>>>>>>>>> reference
>>>>>>>>>>>>>>> suggestion).
>>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file
>>>>>>>>>>>>>>> streams) lead to leaked resources.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote. But
>>>>>>>>>>>>>>>> then I realized a few things:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  - usually an Iterable does not allocate resources, only
>>>>>>>>>>>>>>>> its Iterators
>>>>>>>>>>>>>>>>  - if you consume the whole iterator, I hope the user would
>>>>>>>>>>>>>>>> not have to do any extra work
>>>>>>>>>>>>>>>>  - you can also automatically free it up at the end of the
>>>>>>>>>>>>>>>> call to @ProcessElement so that is easy too (but you might not 
>>>>>>>>>>>>>>>> want to)
>>>>>>>>>>>>>>>>  - so the main use is when the iterator terminates early
>>>>>>>>>>>>>>>> and is not fully consumed and you can't wait to finish the 
>>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>  - the scoping in Java will force a bunch of uninitialized
>>>>>>>>>>>>>>>> declarations outside the try-with-resources block, kind of a 
>>>>>>>>>>>>>>>> lot of
>>>>>>>>>>>>>>>> boilerplate LOC
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One thing that is good about your proposal is that the
>>>>>>>>>>>>>>>> iterable could have some transparent caches that all free up 
>>>>>>>>>>>>>>>> together.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches,
>>>>>>>>>>>>>>>>> Kenn. Now I understand Luke's proposal better. The approach 
>>>>>>>>>>>>>>>>> looks neat, but
>>>>>>>>>>>>>>>>> the uncertainty of *when* GC is going to kick in will
>>>>>>>>>>>>>>>>> make users' life hard. If the user happens to configure a 
>>>>>>>>>>>>>>>>> large JVM heap
>>>>>>>>>>>>>>>>> size, and since rocksDb uses off-heap memory, GC might start 
>>>>>>>>>>>>>>>>> very late and
>>>>>>>>>>>>>>>>> less frequent than what we want. If we don't have a
>>>>>>>>>>>>>>>>> *definitive* way to let user close the underlying
>>>>>>>>>>>>>>>>> resources, then there is no good way to handle such failures 
>>>>>>>>>>>>>>>>> of a critical
>>>>>>>>>>>>>>>>> application in production.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Having a close method in the iterator might be a little
>>>>>>>>>>>>>>>>> unorthodox :). To some degree, this is actually a resource we 
>>>>>>>>>>>>>>>>> are holding
>>>>>>>>>>>>>>>>> underneath, and I think it's pretty common to have close() 
>>>>>>>>>>>>>>>>> for a resource
>>>>>>>>>>>>>>>>> in Java, e.g. BufferedReader and BufferedWriter. So I would 
>>>>>>>>>>>>>>>>> imagine that we
>>>>>>>>>>>>>>>>> also define a resource for the state iterator and make the 
>>>>>>>>>>>>>>>>> interface
>>>>>>>>>>>>>>>>> implements *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>>> st =
>>>>>>>>>>>>>>>>> bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>>     Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>>     while (iter.hasNext() {
>>>>>>>>>>>>>>>>>        .. do stuff ...
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>>     ... user code
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The type/method name are just for illustrating here, so
>>>>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and 
>>>>>>>>>>>>>>>>> let me know if
>>>>>>>>>>>>>>>>> you have thoughts about the programming patterns here.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or not.
>>>>>>>>>>>>>>>>>> There has been no specific API proposed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be
>>>>>>>>>>>>>>>>>> able to read and write bigger-than-memory state. It seems we 
>>>>>>>>>>>>>>>>>> have multiple
>>>>>>>>>>>>>>>>>> runners that don't support it, perhaps because of our API. 
>>>>>>>>>>>>>>>>>> You might be
>>>>>>>>>>>>>>>>>> able to build something good enough with phantom references, 
>>>>>>>>>>>>>>>>>> but you might
>>>>>>>>>>>>>>>>>> not.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like
>>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>        @StateId("foo")
>>>>>>>>>>>>>>>>>>        private final StateSpec<BagState<Whatever>>
>>>>>>>>>>>>>>>>>> myBagSpec = ...
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>        @ProcessElement
>>>>>>>>>>>>>>>>>>        public void proc(@StateId("foo")
>>>>>>>>>>>>>>>>>> BagState<Whatever> myBag, ...) {
>>>>>>>>>>>>>>>>>>          CloseableIterator<Whatever> iterator =
>>>>>>>>>>>>>>>>>> myBag.get().iterator();
>>>>>>>>>>>>>>>>>>          while(iterator.hasNext() && ... special
>>>>>>>>>>>>>>>>>> condition ...) {
>>>>>>>>>>>>>>>>>>            ... do stuff ...
>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>          iterator.close();
>>>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close()
>>>>>>>>>>>>>>>>>> since they iterate with for ( : ) as usual. And a runner 
>>>>>>>>>>>>>>>>>> that has the 10x
>>>>>>>>>>>>>>>>>> funding to try out a ReferenceQueue can be resilient to 
>>>>>>>>>>>>>>>>>> users that forget.
>>>>>>>>>>>>>>>>>> On the other hand, I haven't seen this pattern much in the 
>>>>>>>>>>>>>>>>>> wild, so I think
>>>>>>>>>>>>>>>>>> it is valuable to discuss other methods.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> While Luke's proposal is something like this if I
>>>>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with 
>>>>>>>>>>>>>>>>>> PhantomReference seems
>>>>>>>>>>>>>>>>>> to be what you really want):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>>>     class RocksDbBagState {
>>>>>>>>>>>>>>>>>>       static ReferenceQueue rocksDbIteratorQueue = new
>>>>>>>>>>>>>>>>>> ReferenceQueue();
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>       class Iterator {
>>>>>>>>>>>>>>>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>>          .next() {
>>>>>>>>>>>>>>>>>>            return cIter.next();
>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>      class Iterable {
>>>>>>>>>>>>>>>>>>         .iterator() {
>>>>>>>>>>>>>>>>>>           return new Iterator(new
>>>>>>>>>>>>>>>>>> PhantomReference<>(rocksDbJniIterator, 
>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     ... on another thread ...
>>>>>>>>>>>>>>>>>>     while(true) {
>>>>>>>>>>>>>>>>>>       RocksDbIterator deadRef = (RocksDbIterator)
>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>>       deadRef.close();
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop
>>>>>>>>>>>>>>>>>> onto the queue for being closed. This might not be too bad. 
>>>>>>>>>>>>>>>>>> You'll have
>>>>>>>>>>>>>>>>>> delayed resource release, and potentially masked errors that 
>>>>>>>>>>>>>>>>>> are hard to
>>>>>>>>>>>>>>>>>> debug. It is less error-prone than WeakReference, which is 
>>>>>>>>>>>>>>>>>> asking for
>>>>>>>>>>>>>>>>>> trouble when objects are collected en masse. Anecdotally I 
>>>>>>>>>>>>>>>>>> have heard that
>>>>>>>>>>>>>>>>>> performance of this kind of approach is poor, but I haven't 
>>>>>>>>>>>>>>>>>> experienced it
>>>>>>>>>>>>>>>>>> myself and I can't find good data.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will
>>>>>>>>>>>>>>>>>>> help clean up the Java objects once GC kicks in. In case of 
>>>>>>>>>>>>>>>>>>> kv-store likes
>>>>>>>>>>>>>>>>>>> rocksDb, the Java iterator is just a JNI interface to the 
>>>>>>>>>>>>>>>>>>> underlying C
>>>>>>>>>>>>>>>>>>> iterator, so we need to explicitly invoke close to release 
>>>>>>>>>>>>>>>>>>> the in-memory
>>>>>>>>>>>>>>>>>>> snapshot data, which can be large and accumulated quickly 
>>>>>>>>>>>>>>>>>>> if it's not
>>>>>>>>>>>>>>>>>>> released when not in use. Maybe I am missing something as 
>>>>>>>>>>>>>>>>>>> you suggested
>>>>>>>>>>>>>>>>>>> here, but looks to me using weak references might not help 
>>>>>>>>>>>>>>>>>>> in this case.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might
>>>>>>>>>>>>>>>>>>> misunderstood what I meant. I am totally for working hard 
>>>>>>>>>>>>>>>>>>> for best user
>>>>>>>>>>>>>>>>>>> experience, and I think the current API provides a good 
>>>>>>>>>>>>>>>>>>> example of that.
>>>>>>>>>>>>>>>>>>> That's also the reason I am implementing a runner here. I 
>>>>>>>>>>>>>>>>>>> am just proposing
>>>>>>>>>>>>>>>>>>> an extra API to expose an iterator that can be closed when 
>>>>>>>>>>>>>>>>>>> not needed, that
>>>>>>>>>>>>>>>>>>> way the users can use this feature to iterate through large 
>>>>>>>>>>>>>>>>>>> state that
>>>>>>>>>>>>>>>>>>> doesn't fit into memory. I believe this is also a pretty 
>>>>>>>>>>>>>>>>>>> general use case
>>>>>>>>>>>>>>>>>>> and it's better to have support for it. I am actually 
>>>>>>>>>>>>>>>>>>> arguing this will be
>>>>>>>>>>>>>>>>>>> a better user experience to add this extra API since more 
>>>>>>>>>>>>>>>>>>> users can benefit
>>>>>>>>>>>>>>>>>>> from it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the
>>>>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by using 
>>>>>>>>>>>>>>>>>>>> weak references and
>>>>>>>>>>>>>>>>>>>> reference queues (or other methods). Having a few people 
>>>>>>>>>>>>>>>>>>>> work 10x as hard
>>>>>>>>>>>>>>>>>>>> to provide a good implementation is much better then 
>>>>>>>>>>>>>>>>>>>> having 100s or 1000s
>>>>>>>>>>>>>>>>>>>> of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Load/evict blocks will help reduce the cache memory
>>>>>>>>>>>>>>>>>>>>> footprint, but we still won't be able to release the 
>>>>>>>>>>>>>>>>>>>>> underlying resources.
>>>>>>>>>>>>>>>>>>>>> We can add definitely heuristics to help release the 
>>>>>>>>>>>>>>>>>>>>> resources as you
>>>>>>>>>>>>>>>>>>>>> mentioned, but there is no accurate way to track all the
>>>>>>>>>>>>>>>>>>>>> iterators/iterables created and free them up once not 
>>>>>>>>>>>>>>>>>>>>> needed. I think while
>>>>>>>>>>>>>>>>>>>>> the API is aimed at nice user experience, we should have 
>>>>>>>>>>>>>>>>>>>>> the option to let
>>>>>>>>>>>>>>>>>>>>> users optimize their performance if they choose to. Do 
>>>>>>>>>>>>>>>>>>>>> you agree?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources and
>>>>>>>>>>>>>>>>>>>>>> forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format
>>>>>>>>>>>>>>>>>>>>>> which allows random access and use a cache to load/evict 
>>>>>>>>>>>>>>>>>>>>>> blocks of the
>>>>>>>>>>>>>>>>>>>>>> state from memory.
>>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees
>>>>>>>>>>>>>>>>>>>>>> the resource after a certain amount of inactivity or 
>>>>>>>>>>>>>>>>>>>>>> uses weak references.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing the MapState and
>>>>>>>>>>>>>>>>>>>>>>> SetState in our Samza runner. We noticed that the state 
>>>>>>>>>>>>>>>>>>>>>>> returns the Java
>>>>>>>>>>>>>>>>>>>>>>> Iterable for reading entries, keys, etc. For state 
>>>>>>>>>>>>>>>>>>>>>>> backed by file-based kv
>>>>>>>>>>>>>>>>>>>>>>> store like rocksDb, we need to be able to let users 
>>>>>>>>>>>>>>>>>>>>>>> explicitly close
>>>>>>>>>>>>>>>>>>>>>>> iterator/iterable to release the resources.Otherwise we 
>>>>>>>>>>>>>>>>>>>>>>> have to load the
>>>>>>>>>>>>>>>>>>>>>>> iterable into memory so we can safely close the 
>>>>>>>>>>>>>>>>>>>>>>> underlying rocksDb
>>>>>>>>>>>>>>>>>>>>>>> iterator, similar to Flink's implementation. But
>>>>>>>>>>>>>>>>>>>>>>> this won't work for states that don't fit into memory. 
>>>>>>>>>>>>>>>>>>>>>>> I chatted with Kenn
>>>>>>>>>>>>>>>>>>>>>>> and he also agrees we need this capability to avoid 
>>>>>>>>>>>>>>>>>>>>>>> bulk read/write. This
>>>>>>>>>>>>>>>>>>>>>>> seems to be a general use case and I'm wondering if we 
>>>>>>>>>>>>>>>>>>>>>>> can add the support
>>>>>>>>>>>>>>>>>>>>>>> to it? I am happy to contribute to this if needed. Any 
>>>>>>>>>>>>>>>>>>>>>>> feedback is highly
>>>>>>>>>>>>>>>>>>>>>>> appreciated.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>
>>>>

Reply via email to