On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <[email protected]> wrote:

> Users typically want to do that async operation and then produce output
> with it. Adding asynchronous execution is difficult within the framework
> because much of the code currently does not need to be thread safe, and
> writing code that is both fast and handles asynchronous execution is quite
> difficult. Adding async operations typically leads to code with too many
> locks/synchronization blocks.
>

Just a small point here: it is very unusual to use locks with futures.
The essential innovation of futures is that they are a new way to program
that avoids the threads+locks style. In part, you get this for free from a
functional programming style; beyond that, you reify asynchronous side
effects as data dependencies.
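
A minimal sketch of that point, using only CompletableFuture from the JDK.
The names AsyncEnrich, fetchUser and fetchScore are hypothetical stand-ins
for remote calls. The two results are joined as a data dependency with
thenCombine, and no lock or synchronized block is needed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: two independent async lookups combined without locks.
final class AsyncEnrich {
  // Stand-in for a remote call; a real one would come from an async client.
  static CompletableFuture<String> fetchUser(int id) {
    return CompletableFuture.supplyAsync(() -> "user-" + id);
  }

  // Stand-in for a second, independent remote call.
  static CompletableFuture<Integer> fetchScore(int id) {
    return CompletableFuture.supplyAsync(() -> id * 10);
  }

  // The output depends on both results. That dependency is expressed as data
  // (thenCombine) rather than as shared mutable state guarded by a lock.
  static CompletableFuture<String> enrich(int id) {
    return fetchUser(id)
        .thenCombine(fetchScore(id), (user, score) -> user + ":" + score);
  }

  public static void main(String[] args) {
    System.out.println(enrich(7).join()); // prints "user-7:70"
  }
}
```

The only blocking call is the final join() in main; everything before it just
describes what should happen once the values are available.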

Kenn



> Note that with the portability effort, the Runner won't have visibility
> into such low-level things as when an object is garbage collected, and
> supporting user state will be modeled using the following three operations:
> read (with continuation tokens), append (blind write), and clear. It may be
> moot to discuss how to integrate Samza into the existing framework; the
> Samza runner should instead work towards being a portability-based runner.
> There are more details here about supporting user state during pipeline
> execution:
> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level
> docs about portability are https://s.apache.org/beam-runner-api and
> https://s.apache.org/beam-fn-api.
>
> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <[email protected]> wrote:
>
>> At least one API that has been discussed in the past is to use the Java 8
>> CompletionStage, e.g.:
>>
>>   new DoFn<InputT, OutputT>() {
>>     @ProcessElement
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>       element.thenApply(...);
>>     }
>>   }
>>
>> The framework will automatically create the CompletionStage, and the
>> process method can specify a pipeline of asynchronous operations to perform
>> on the element. When all of them are done, the element will be marked as
>> successfully processed.
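
A rough sketch of that idea in plain Java, assuming nothing about the real
Beam API: AsyncElementRunner and processAll are hypothetical names. The
"framework" part creates a CompletionStage per element, the user function
attaches asynchronous steps to it, and an element counts as processed once
the returned stage completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical mini "framework": NOT Beam API, only an illustration of the idea.
final class AsyncElementRunner {
  // Wraps each element in a CompletionStage, lets user code attach a pipeline
  // of async steps, and treats the element as processed when it completes.
  static <InputT, OutputT> List<OutputT> processAll(
      List<InputT> elements,
      Function<CompletionStage<InputT>, CompletionStage<OutputT>> userPipeline) {
    List<CompletableFuture<OutputT>> pending = new ArrayList<>();
    for (InputT element : elements) {
      CompletionStage<InputT> stage = CompletableFuture.supplyAsync(() -> element);
      pending.add(userPipeline.apply(stage).toCompletableFuture());
    }
    // Wait for every element's pipeline; results are kept in input order.
    List<OutputT> outputs = new ArrayList<>();
    for (CompletableFuture<OutputT> future : pending) {
      outputs.add(future.join());
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<Integer> lengths =
        processAll(Arrays.asList("a", "bb", "ccc"), s -> s.thenApply(String::length));
    System.out.println(lengths); // prints [1, 2, 3]
  }
}
```

All pipelines are started before any of them is awaited, so slow elements
overlap instead of blocking one another.
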
>>
>> Reuven
>>
>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <[email protected]> wrote:
>>
>>> Thanks for all the pointers. I looked through the discussion on BEAM-2975
>>> and BEAM-2980 about having snapshot or live views of an iterable, and the
>>> current semantics make a lot of sense to me. To answer your question: no
>>> explicit snapshot is required when we create a RocksDb iterator directly.
>>> The iterator will read from an implicit snapshot as of the time the
>>> iterator is created [1], and the snapshot will be released after the
>>> iterator is closed. If we can have another method that returns
>>> ReadableState<Iterator>, we might be able to apply the auto-closing
>>> approaches we discussed and solve the problem here :).
>>>
>>> It's very interesting that you bring up the discussion about an async API!
>>> Async IO has been widely adopted here among our users: they use netty for
>>> async calls, with a library named ParSeq [2] to help manage the calls.
>>> Samza provides a primitive callback-style API [3], in which the user
>>> invokes the callback after the remote calls are complete. Currently, in a
>>> Samza job, our users use this API with the ParSeq lib for remote IO. It
>>> seems we might have to do blocking calls (thus the poor resource
>>> utilization you mentioned) when using the Beam API for now. It would be
>>> great if you could send a few more details about the async API
>>> discussion. I would like to add our use case and help move this forward.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>> [2]: https://github.com/linkedin/parseq
>>> [3]:
>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>
>>>
>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <[email protected]> wrote:
>>>
>>>> I don't have any further suggestions, but want to call out how this
>>>> hits a lot of interesting points.
>>>>
>>>> The point about snapshotting is great. We have BEAM-2975 [1] and
>>>> BEAM-2980 [2] where we debated things a bit. I think the strongest case is
>>>> for what you describe - it should be a snapshot. Perhaps they should both
>>>> be closed as fixed...
>>>>
>>>> And you also bring up long blocking calls - we have also deliberately
>>>> decided that long synchronous blocking calls in @ProcessElement can be
>>>> embraced for simple programming and compensated with autoscaling smarts
>>>> (e.g. expand the thread pool by noticing poor utilization). The alternative
>>>> is a more future-istic API where the calls can be explicitly asynchronous.
>>>> We've had some interesting dev@ list discussions about that, too.
>>>>
>>>> Is another possibility to have read() return a ReadableState<Iterator>
>>>> instead? We could, of course, have two methods with different names, one
>>>> for an iterator and one for a snapshot iterable. But wouldn't the Iterator
>>>> also require a snapshot? Doesn't a native RocksDb iterator require a
>>>> snapshot to have well-defined contents? As you can tell, I don't know
>>>> enough about RocksDb details to be sure of my suggestions.
>>>>
>>>> Kenn
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>
>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the
>>>>> question here, we discussed internally releasing the underlying
>>>>> resources after consuming the whole iterator. This probably covers quite a
>>>>> lot of use cases. For the special cases where the user only consumes part
>>>>> of the iterator, Luke and Kenn's suggestion about releasing after
>>>>> processElement() might work (I need to confirm this against our use
>>>>> cases). So based on what we have discussed so far, we might have a good
>>>>> way to automatically close an iterator for the store.
>>>>>
>>>>> There is another issue though: right now the state API returns an
>>>>> iterable for entries(), keys() and values(), and we can create iterators
>>>>> from it. From my understanding, the iterable holds a snapshot of the
>>>>> underlying store. In the case of rocksDb, it's going to be a
>>>>> db.snapshot(). Then when can we release the snapshot? It's not like an
>>>>> iterator, where we can use some heuristics to release it automatically.
>>>>> The user can hold on to the iterable and create iterators throughout the
>>>>> whole processElement(). But if we only close the iterable after
>>>>> processElement(), I am quite concerned about the limitations this will
>>>>> bring. If the user is doing some remote call during the process, then the
>>>>> snapshot might be held for a long time before being released, and might
>>>>> cause performance problems. And if the user happens to create multiple
>>>>> iterables, then there will be multiple snapshots loaded during
>>>>> processing. Luke suggested being aggressive about closing the resources
>>>>> and recreating them when needed again. But in this case it might not work,
>>>>> since we won't be able to recreate the same snapshot given that the store
>>>>> might have been updated (and creating a rocksDb snapshot is not cheap
>>>>> either). I am running out of ideas other than exposing the iterator
>>>>> itself somehow (and adding close() if needed?). Any further suggestions?
>>>>>
>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>> (CloseableIterator) in an internal interface. I wrapped it in some sort of
>>>>> StateResource for fear that people might reject the proposal immediately
>>>>> after seeing the close() on the iterator. I guess our users are familiar
>>>>> with rocksDb state, so for them it's pretty normal to close the
>>>>> iterator/snapshot after using it.
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> The iterator going out of scope is the idiomatic way that resources
>>>>>> are freed for Java developers (hence the weak/phantom reference
>>>>>> suggestion). Explicitly requiring users to deal with 'handles' (like
>>>>>> file streams) leads to leaked resources.
>>>>>>
>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Xinyu,
>>>>>>>
>>>>>>> I actually had first sketched out just what you wrote. But then I
>>>>>>> realized a few things:
>>>>>>>
>>>>>>>  - usually an Iterable does not allocate resources, only its
>>>>>>> Iterators
>>>>>>>  - if you consume the whole iterator, I hope the user would not have
>>>>>>> to do any extra work
>>>>>>>  - you can also automatically free it up at the end of the call to
>>>>>>> @ProcessElement so that is easy too (but you might not want to)
>>>>>>>  - so the main use is when the iterator terminates early and is not
>>>>>>> fully consumed and you can't wait to finish the method
>>>>>>>  - the scoping in Java will force a bunch of uninitialized
>>>>>>> declarations outside the try-with-resources block, kind of a lot of
>>>>>>> boilerplate LOC
>>>>>>>
>>>>>>> One thing that is good about your proposal is that the iterable
>>>>>>> could have some transparent caches that all free up together.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for drafting the details about the two approaches, Kenn. Now
>>>>>>>> I understand Luke's proposal better. The approach looks neat, but the
>>>>>>>> uncertainty of *when* GC is going to kick in will make users' lives
>>>>>>>> hard. If the user happens to configure a large JVM heap size, then
>>>>>>>> since rocksDb uses off-heap memory, GC might start much later and run
>>>>>>>> less frequently than we want. If we don't have a *definitive* way to
>>>>>>>> let users close the underlying resources, then there is no good way to
>>>>>>>> handle such failures of a critical application in production.
>>>>>>>>
>>>>>>>> Having a close method on the iterator might be a little unorthodox
>>>>>>>> :). To some degree, this is actually a resource we are holding
>>>>>>>> underneath, and I think it's pretty common to have close() for a
>>>>>>>> resource in Java, e.g. BufferedReader and BufferedWriter. So I would
>>>>>>>> imagine that we also define a resource for the state iterator and make
>>>>>>>> the interface implement *AutoCloseable*. Here is my sketch:
>>>>>>>>
>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>> try (StateResource<Iterator<SomeType>> st =
>>>>>>>>     bagState.iteratorResource()) {
>>>>>>>>     Iterator<SomeType> iter = st.iterator();
>>>>>>>>     while (iter.hasNext()) {
>>>>>>>>        ... do stuff ...
>>>>>>>>     }
>>>>>>>> } catch (Exception e) {
>>>>>>>>     ... user code
>>>>>>>> }
>>>>>>>>
>>>>>>>> The type/method names here are just for illustration, so please
>>>>>>>> don't laugh at them. Please feel free to comment and let me know if you
>>>>>>>> have thoughts about the programming patterns here.
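
A self-contained sketch of the try-with-resources pattern discussed above,
assuming an in-memory stand-in for the store. FakeBagState, openIterators and
firstN are hypothetical names, and CloseableIterator here is only an
illustrative interface, not an existing Beam type. It shows that an iterator
abandoned early is still released deterministically.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names throughout; this is not the Beam state API.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close(); // narrowed: no checked exception, so callers need no catch block
}

final class FakeBagState {
  private final List<String> values = Arrays.asList("a", "b", "c", "d");
  // Counts iterators that currently hold a (pretend) native snapshot.
  final AtomicInteger openIterators = new AtomicInteger();

  CloseableIterator<String> closeableIterator() {
    openIterators.incrementAndGet();
    Iterator<String> delegate = values.iterator();
    return new CloseableIterator<String>() {
      private boolean closed;

      @Override public boolean hasNext() { return !closed && delegate.hasNext(); }
      @Override public String next() { return delegate.next(); }

      @Override public void close() {
        if (!closed) { // idempotent: closing twice releases only once
          closed = true;
          openIterators.decrementAndGet();
        }
      }
    };
  }

  // Reads at most `limit` values and then stops early. try-with-resources
  // still releases the iterator as soon as the block exits.
  String firstN(int limit) {
    StringBuilder out = new StringBuilder();
    try (CloseableIterator<String> iter = closeableIterator()) {
      while (iter.hasNext() && out.length() < limit) {
        out.append(iter.next());
      }
    }
    return out.toString();
  }
}
```

Because close() is declared without a checked exception, the caller does not
need the catch block from the sketch above unless its own code can throw.
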
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> It is too soon to argue whether an API is complex or not. There
>>>>>>>>> has been no specific API proposed.
>>>>>>>>>
>>>>>>>>> I think the problem statement is real - you need to be able to
>>>>>>>>> read and write bigger-than-memory state. It seems we have multiple
>>>>>>>>> runners that don't support it, perhaps because of our API. You might
>>>>>>>>> be able to build something good enough with phantom references, but
>>>>>>>>> you might not.
>>>>>>>>>
>>>>>>>>> If I understand the idea, it might look something like this:
>>>>>>>>>
>>>>>>>>>     new DoFn<>() {
>>>>>>>>>       @StateId("foo")
>>>>>>>>>       private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>>>>>
>>>>>>>>>       @ProcessElement
>>>>>>>>>       public void proc(
>>>>>>>>>           @StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>>>>>         CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>>>>>>         while (iterator.hasNext() && ... special condition ...) {
>>>>>>>>>           ... do stuff ...
>>>>>>>>>         }
>>>>>>>>>         iterator.close();
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> So it has no impact on users who don't choose to close(), since
>>>>>>>>> they iterate with for ( : ) as usual. And a runner that has the 10x
>>>>>>>>> funding to try out a ReferenceQueue can be resilient to users that
>>>>>>>>> forget. On the other hand, I haven't seen this pattern much in the
>>>>>>>>> wild, so I think it is valuable to discuss other methods.
>>>>>>>>>
>>>>>>>>> Luke's proposal, if I understand his sketch, is something like this
>>>>>>>>> (replacing WeakReference with PhantomReference seems to be what you
>>>>>>>>> really want):
>>>>>>>>>
>>>>>>>>>     ... in RocksDb state implementation ...
>>>>>>>>>     class RocksDbBagState {
>>>>>>>>>       static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>>>>>
>>>>>>>>>       class Iterator {
>>>>>>>>>         PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>         .next() {
>>>>>>>>>           return cIter.next();
>>>>>>>>>         }
>>>>>>>>>       }
>>>>>>>>>
>>>>>>>>>       class Iterable {
>>>>>>>>>         .iterator() {
>>>>>>>>>           return new Iterator(
>>>>>>>>>               new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>         }
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     ... on another thread ...
>>>>>>>>>     while (true) {
>>>>>>>>>       RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>>>>>       deadRef.close();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> When the iterator is GC'd, the phantom reference will pop onto the
>>>>>>>>> queue for being closed. This might not be too bad. You'll have delayed
>>>>>>>>> resource release, and potentially masked errors that are hard to
>>>>>>>>> debug. It is less error-prone than WeakReference, which is asking for
>>>>>>>>> trouble when objects are collected en masse. Anecdotally I have heard
>>>>>>>>> that performance of this kind of approach is poor, but I haven't
>>>>>>>>> experienced it myself and I can't find good data.
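
A runnable variant of the reference-queue idea, under some assumptions:
NativeHandleCleaner, NativeHandle, HandleRef, track and drain are all
hypothetical names, and NativeHandle is only a counter standing in for a
native iterator. One detail differs from the pseudocode above:
PhantomReference.get() always returns null, so the reference object itself
has to carry the handle that must be closed.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names; a sketch of GC-driven cleanup, not a RocksDb integration.
final class NativeHandleCleaner {
  // Stand-in for an off-heap resource such as a native iterator.
  static final class NativeHandle {
    static final AtomicInteger open = new AtomicInteger();
    private final AtomicBoolean closed = new AtomicBoolean();

    NativeHandle() { open.incrementAndGet(); }

    void close() {
      if (closed.compareAndSet(false, true)) { // release at most once
        open.decrementAndGet();
      }
    }
  }

  // The reference carries the handle, because the wrapper object is no longer
  // accessible by the time the reference shows up on the queue.
  static final class HandleRef extends PhantomReference<Object> {
    final NativeHandle handle;

    HandleRef(Object wrapper, NativeHandle handle, ReferenceQueue<Object> queue) {
      super(wrapper, queue);
      this.handle = handle;
    }
  }

  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // Keeps the references themselves strongly reachable until processed.
  private final Set<HandleRef> live = ConcurrentHashMap.newKeySet();

  // Registers `wrapper` so that `handle` is closed after the wrapper is collected.
  HandleRef track(Object wrapper, NativeHandle handle) {
    HandleRef ref = new HandleRef(wrapper, handle, queue);
    live.add(ref);
    return ref;
  }

  // Closes handles whose references were enqueued; returns how many were seen.
  // A runner could call this from a background thread or between bundles.
  int drain() {
    int drained = 0;
    Reference<?> ref;
    while ((ref = queue.poll()) != null) {
      HandleRef handleRef = (HandleRef) ref;
      handleRef.handle.close();
      live.remove(handleRef);
      drained++;
    }
    return drained;
  }
}
```

The handle must not hold a strong reference back to the wrapper, otherwise
the wrapper never becomes unreachable. Since Java 9, java.lang.ref.Cleaner
packages this same pattern, and the timing caveats described above still
apply to it.
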
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> If I understand correctly, using weak references will help clean
>>>>>>>>>> up the Java objects once GC kicks in. In the case of a kv-store like
>>>>>>>>>> rocksDb, the Java iterator is just a JNI interface to the underlying
>>>>>>>>>> C iterator, so we need to explicitly invoke close to release the
>>>>>>>>>> in-memory snapshot data, which can be large and can accumulate quickly
>>>>>>>>>> if it's not released when not in use. Maybe I am missing something
>>>>>>>>>> in what you suggested here, but it looks to me like using weak
>>>>>>>>>> references might not help in this case.
>>>>>>>>>>
>>>>>>>>>> I understand your concern, and I think you might have misunderstood
>>>>>>>>>> what I meant. I am totally for working hard for the best user
>>>>>>>>>> experience, and I think the current API provides a good example of
>>>>>>>>>> that. That's also the reason I am implementing a runner here. I am
>>>>>>>>>> just proposing an extra API to expose an iterator that can be closed
>>>>>>>>>> when not needed, so that users can use this feature to iterate
>>>>>>>>>> through large state that doesn't fit into memory. I believe this is
>>>>>>>>>> also a pretty general use case and it's better to have support for
>>>>>>>>>> it. I am actually arguing that adding this extra API will be a better
>>>>>>>>>> user experience, since more users can benefit from it.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
>>>>>>>>>>
>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I don't agree. I believe you can track the iterators/iterables
>>>>>>>>>>> that are created and freed by using weak references and reference
>>>>>>>>>>> queues (or other methods). Having a few people work 10x as hard to
>>>>>>>>>>> provide a good implementation is much better than having 100s or
>>>>>>>>>>> 1000s of users suffering through a more complicated API.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Load/evict blocks will help reduce the cache memory footprint,
>>>>>>>>>>>> but we still won't be able to release the underlying resources. We
>>>>>>>>>>>> can definitely add heuristics to help release the resources as you
>>>>>>>>>>>> mentioned, but there is no accurate way to track all the
>>>>>>>>>>>> iterators/iterables created and free them up once not needed. I
>>>>>>>>>>>> think while the API is aimed at a nice user experience, we should
>>>>>>>>>>>> have the option to let users optimize their performance if they
>>>>>>>>>>>> choose to. Do you agree?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Users won't reliably close/release the resources and forcing
>>>>>>>>>>>>> them to will make the user experience worse.
>>>>>>>>>>>>> It will make a lot more sense to use a file format which
>>>>>>>>>>>>> allows random access and use a cache to load/evict blocks of the
>>>>>>>>>>>>> state from memory.
>>>>>>>>>>>>> If that is not possible, use an iterable which frees the
>>>>>>>>>>>>> resource after a certain amount of inactivity or uses weak
>>>>>>>>>>>>> references.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm in the middle of implementing the MapState and SetState
>>>>>>>>>>>>>> in our Samza runner. We noticed that the state returns the Java
>>>>>>>>>>>>>> Iterable for reading entries, keys, etc. For state backed by a
>>>>>>>>>>>>>> file-based kv store like rocksDb, we need to be able to let users
>>>>>>>>>>>>>> explicitly close the iterator/iterable to release the resources.
>>>>>>>>>>>>>> Otherwise we have to load the iterable into memory so we can safely
>>>>>>>>>>>>>> close the underlying rocksDb iterator, similar to Flink's
>>>>>>>>>>>>>> implementation. But this won't work for states that don't fit into
>>>>>>>>>>>>>> memory. I chatted with Kenn and he also agrees we need this
>>>>>>>>>>>>>> capability to avoid bulk read/write. This seems to be a general use
>>>>>>>>>>>>>> case and I'm wondering if we can add support for it? I am happy to
>>>>>>>>>>>>>> contribute to this if needed. Any feedback is highly appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>
>>>
