Also, I took a quick read on side inputs. It's unclear to me how side inputs could solve this issue better.
At a high level, this is what I have in mind:

    flatMap(byte[] value, Collector<> output) {
        // range scan over local state by key prefix
        var iter = someStoreStateObject.seek(akeyprefix);  // or seek(akeyprefix, akeysuffix);
        for (byte[] key : iter) {
            // process each matching key
        }
    }

Thanks for your time!

On Fri, May 19, 2017 at 10:03 AM, Sand Stone <sand.m.st...@gmail.com> wrote:
> Thanks Gordon and Fabian.
>
> The enriching data is really reference data, e.g. the reverseIP
> database. It's hard to key it the same way as the main data stream,
> because the "ip address" in the event is not a primary key in the
> main data stream.
>
> QueryableState is close, but it does not support range scans as far
> as I can tell. The remote datastore has clean semantics (a single
> logical copy that supports range scans), but the RPC to another
> cluster is not optimal.
>
> I assume this is quite a common stream processing pattern for
> Flink-based services.
>
>
> On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> +1 to what Gordon said.
>>
>> Queryable state is meant as an external interface to streaming jobs
>> rather than for lookups within jobs.
>> Accessing co-located state should give you better performance and is
>> probably easier to implement and maintain.
>>
>> Cheers,
>> Fabian
>>
>> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>>
>>> Hi,
>>>
>>> Can the enriching data be keyed? Or is it something that has to be
>>> broadcast to each operator?
>>> Either way, I think Side Inputs (an upcoming feature) is the best
>>> fit for this. You can take a look at
>>> https://issues.apache.org/jira/browse/FLINK-6131.
>>>
>>> Regarding the 3 options you listed:
>>>
>>> By using QueryableState in option B, what you mean is that you want to
>>> feed the enriching data stream to a separate job, let that job allow
>>> queryable state, and query that state from the actual application job
>>> operators, correct?
>>> If so, I think options A and B would mean the same
>>> thing; i.e., they both require accessing data external to the job.
>>>
>>> If the enriching data can somehow be keyed with the stream that requires
>>> it, I would go for option C using connected streams, with the enriching data
>>> as one input and the actual data as the other. Instead of just “caching the
>>> enriching data in memory”, you should register it as managed Flink state
>>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
>>> can then access that registered state locally.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
>>>
>>> Hi. Say I have a few reference data sets that need to be used in a
>>> streaming job. The sizes range from 10M to 10GB. The data is not
>>> static; it will be refreshed at minute and/or day intervals.
>>>
>>> With the new advancements in Flink, it seems there are quite a few
>>> options:
>>> A. Store all the data in an external (KV) database cluster and use
>>>    async I/O calls.
>>>    * Data refresh can be done in a few different ways.
>>> B. Use the new Queryable State feature.
>>>    * It seems there is no "easy" API to discover the queryable state
>>>      at the moment. You need to use the REST API to figure out the
>>>      job ID.
>>> C. Ingest the reference data into the job and cache it in memory.
>>>
>>> Any other options?
>>>
>>> On paper, option B with Queryable State seems like the cleanest
>>> solution.
>>>
>>> Any comments/suggestions are greatly appreciated, in particular in
>>> terms of robustness and consistent recovery.
>>>
>>> Thanks much!
>>
>>
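For illustration, here is a minimal sketch of the prefix and range semantics that the seek() pseudocode above asks for. It assumes an in-memory NavigableMap as a stand-in for a sorted local store. SortedByteStore and both seek methods are hypothetical names, not a Flink API. Keys are compared in unsigned byte order (Arrays.compareUnsigned, Java 9+), which matches the default bytewise ordering of sorted stores such as RocksDB. Under that ordering, all keys sharing a prefix are contiguous, so a single forward scan finds them.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for the "seek" calls in the pseudocode.
 * Not a Flink API: a sorted byte[] -> byte[] map used only to show the
 * prefix-scan and range-scan semantics.
 */
public class SortedByteStore {
    // Unsigned lexicographic byte order, so keys sharing a prefix are contiguous.
    private static final Comparator<byte[]> BYTE_ORDER = Arrays::compareUnsigned;

    private final NavigableMap<byte[], byte[]> entries = new TreeMap<>(BYTE_ORDER);

    public void put(byte[] key, byte[] value) {
        entries.put(key, value);
    }

    /** seek(prefix): all entries whose key starts with prefix, in key order. */
    public List<Map.Entry<byte[], byte[]>> seek(byte[] prefix) {
        List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
        // Jump to the first key >= prefix, then stop at the first key outside the prefix.
        for (Map.Entry<byte[], byte[]> e : entries.tailMap(prefix, true).entrySet()) {
            if (!startsWith(e.getKey(), prefix)) {
                break;
            }
            result.add(e);
        }
        return result;
    }

    /** seek(from, to): all entries with from <= key < to, in key order. */
    public List<Map.Entry<byte[], byte[]>> seek(byte[] from, byte[] to) {
        return new ArrayList<>(entries.subMap(from, true, to, false).entrySet());
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SortedByteStore store = new SortedByteStore();
        store.put(utf8("user:1:a"), utf8("x"));
        store.put(utf8("user:1:b"), utf8("y"));
        store.put(utf8("user:2:a"), utf8("z"));
        // Prefix scan: only the two "user:1:" keys are visited.
        for (Map.Entry<byte[], byte[]> e : store.seek(utf8("user:1:"))) {
            System.out.println(new String(e.getKey(), StandardCharsets.UTF_8));
        }
    }
}
```

Running main prints user:1:a and then user:1:b. The scan starts at the first key at or after the prefix and stops at the first key that no longer matches, so it touches only the matching entries plus one. This sketch does not claim that Flink's managed state exposes such a scan. Whether that is possible depends on the state backend and the Flink version.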