You're correct in saying that a StatefulDoFn is pointless if you only see every key+window once. The user's description wasn't exactly clear, but it seemed to me they were reading from a stream and wanted to store all old values they had seen, implying they see keys more than once. The user would need to ensure that the windowing strategy triggers more than once for my suggestion to be useful (e.g. a global window with an after-element-count trigger), but without further details my suggestion is a guess.
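To make the "keys seen more than once" point concrete, here is a minimal sketch of the lazy-seeding pattern from my earlier suggestion (quoted below). It assumes a single global window and uses plain Java instead of the Beam SDK. The two maps stand in for a boolean ValueState and a BagState, and `loadFromExternalStore` is a hypothetical placeholder for the HBase read. None of these names are Beam APIs. Counting external lookups shows why the pattern only pays off when keys repeat: each key causes at most one lookup, however many times it is seen.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java simulation (no Beam SDK, single global window) of per-key state
// that is seeded lazily from an external store the first time a key is seen.
final class LazySeededState {
  // Stand-in for a boolean ValueState: which keys have already been seeded.
  private final Map<String, Boolean> hasSeenKey = new HashMap<>();
  // Stand-in for a BagState: all values accumulated so far, per key.
  private final Map<String, List<String>> valuesBag = new HashMap<>();
  // Hypothetical external lookup (e.g. an HBase read); supplied by the caller.
  private final Function<String, List<String>> loadFromExternalStore;
  private int externalLookups = 0;

  LazySeededState(Function<String, List<String>> loadFromExternalStore) {
    this.loadFromExternalStore = loadFromExternalStore;
  }

  /** Processes one element and returns the values known for its key before it. */
  List<String> processElement(String key, String newValue) {
    List<String> bag = valuesBag.computeIfAbsent(key, k -> new ArrayList<>());
    if (!hasSeenKey.getOrDefault(key, false)) {
      // First time this key is seen: seed the bag once from the external store.
      externalLookups++;
      bag.addAll(loadFromExternalStore.apply(key));
      hasSeenKey.put(key, true);
    }
    // Snapshot of the prior values, used for whatever processing is needed.
    List<String> priorValues = new ArrayList<>(bag);
    // Remember the new value for later elements with the same key.
    bag.add(newValue);
    return priorValues;
  }

  int externalLookups() {
    return externalLookups;
  }

  public static void main(String[] args) {
    LazySeededState state =
        new LazySeededState(key -> Collections.singletonList(key + "-old"));
    System.out.println(state.processElement("a", "a1")); // [a-old]
    System.out.println(state.processElement("a", "a2")); // [a-old, a1]
    System.out.println(state.processElement("b", "b1")); // [b-old]
    System.out.println(state.externalLookups());         // 2
  }
}
```

If every key arrives exactly once, the lookup count equals the element count and the state only adds storage cost, which matches Steve's point below about a cache not helping in his case.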
Also, the implementation of state storage is runner-dependent, but I am aware of users storing very large amounts of state (much more than 1 TiB) on Dataflow, and in general it scales very well with the number of keys and windows.

On Tue, Dec 4, 2018 at 8:01 AM Steve Niemitz wrote:

> We have a similar use case, except with BigtableIO instead of HBase.
>
> We ended up building a custom transform that was basically
> PCollection[ByteString] -> PCollection[BigtableRow] and would fetch rows
> from Bigtable based on the input; however, it's tricky to get right because
> of batching, etc.
>
> I'm curious how a StatefulDoFn would help here. It seems like it's more of
> just a cache than an actual join (and in my use case we're never reading a
> key more than once, so a cache wouldn't help here anyway). Also, I'd be
> interested to see how the state storage performs with "large" amounts of
> state. We're reading ~1 TB of data from Bigtable in a run, and it doesn't
> seem reasonable to store all of that in a DoFn's state.
>
> On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik wrote:
>
>> What about a StatefulDoFn where you append the value(s) to a bag state as
>> you see them?
>>
>> If you need to seed the state, you could do a one-time lookup to HBase in
>> processElement for each key that hasn't been seen yet (storing the fact
>> that you loaded the data in a boolean), but afterwards you would rely on
>> reading the value(s) from the bag state.
>>
>> processElement(...) {
>>   Value newValue = ...
>>   Iterable<Value> values;
>>   Boolean hasSeenKey = hasSeenKeyBooleanValueState.read(); // null on first read
>>   if (hasSeenKey == null || !hasSeenKey) {
>>     values = ... load from HBase ...
>>     for (Value value : values) {
>>       valuesBagState.add(value);
>>     }
>>     hasSeenKeyBooleanValueState.write(true);
>>   } else {
>>     values = valuesBagState.read();
>>   }
>>   ... perform processing using values ...
>>
>>   valuesBagState.add(newValue);
>> }
>>
>> This blog post[1] has a good example.
>>
>> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>
>> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas wrote:
>>
>>> Hello All,
>>> I have a use case where I have PCollection<KV<Key,Value>> data coming
>>> from a Kafka source. When processing each record (KV<Key,Value>), I need
>>> all old values for that Key, which are stored in an HBase table. The naive
>>> approach is to do an HBase lookup in DoFn.processElement. I considered a
>>> side input, but it's not going to work because of the large dataset. Any
>>> suggestions?
>>>
>>> Thanks,
>>> Chandan
