Hi, if you need range queries for the lookups, you can only use Option A (async calls to an external store). Queryable State supports only key lookups, not range queries.
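To make the difference concrete, here is a minimal sketch of a prefix range scan issued asynchronously. It is plain Java with no Flink dependency. The class name `PrefixLookupSketch`, the sample keys and values, and the in-memory `TreeMap` that stands in for the external store are all assumptions made for illustration; a real store would bring its own client and scan API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

public class PrefixLookupSketch {

    // Hypothetical stand-in for the external store: a sorted in-memory map.
    static final NavigableMap<String, String> STORE = new TreeMap<>();
    static {
        STORE.put("10.0.0.1", "host-a");
        STORE.put("10.0.0.2", "host-b");
        STORE.put("10.0.1.1", "host-c");
        STORE.put("192.168.0.1", "host-d");
    }

    // Range scan: collect the values of all keys that start with the prefix.
    // Keys sharing a prefix are contiguous in sorted order, so we seek to the
    // first key >= prefix and stop at the first key that no longer matches.
    static List<String> prefixScan(String prefix) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<String, String> e : STORE.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            hits.add(e.getValue());
        }
        return hits;
    }

    // Run the scan off the caller's thread and hand back a future.
    static CompletableFuture<List<String>> asyncPrefixScan(String prefix) {
        return CompletableFuture.supplyAsync(() -> prefixScan(prefix));
    }

    public static void main(String[] args) {
        // join() blocks here only to print the result of the demo.
        System.out.println(asyncPrefixScan("10.0.0.").join());
    }
}
```

Inside a streaming job the future would be completed through a callback rather than blocked on with join(), so that the operator can keep processing other records while the scan is in flight.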
Since version 1.2.0, Flink has a dedicated function type for async calls [1]. This might be helpful for implementing your use case.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

2017-05-19 19:39 GMT+02:00 Sand Stone <sand.m.st...@gmail.com>:

> Also, took a quick read on side input. it's unclear to me how side
> input could solve this issue better.
>
> At a high level, this is what I have in mind:
> flatmap(byte[] value, Collector<> output) {
>     var iter = someStoreStateObject.seek(akeyprefix);
>     //or seek(akeyprefix, akeysuffix);
>     for(byte[] key : iter) {}
> }
>
> Thanks for your time!
>
>
> On Fri, May 19, 2017 at 10:03 AM, Sand Stone <sand.m.st...@gmail.com> wrote:
> > Thanks Gordon and Fabian.
> >
> > The enriching data is really reference data, e.g. the reverseIP
> > database. It's hard to be keyed as the main data stream as the "ip
> > address" in the event is not a primary key in the main data stream.
> >
> > QueryableState is close, but it does not support range scan as far as
> > I could tell. The remote datastore has a clean semantics: a logical
> > single copy plus supports range scan, but the RPC to another cluster
> > is not optimal.
> >
> > I assume this is a quite common streaming processing pattern for Flink
> > based services.
> >
> >
> > On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> >> +1 to what Gordon said.
> >>
> >> Queryable state is rather meant as an external interface to streaming jobs
> >> than for lookups within jobs.
> >> Accessing co-located state should give you better performance and is
> >> probably easier to implement and maintain.
> >>
> >> Cheers,
> >> Fabian
> >>
> >> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
> >>>
> >>> Hi,
> >>>
> >>> Can the enriching data be keyed? Or is it something that has to be
> >>> broadcasted to each operator?
> >>> Either way, I think Side Inputs (an upcoming feature in the future) is the
> >>> best fit for this. You can take a look at
> >>> https://issues.apache.org/jira/browse/FLINK-6131.
> >>>
> >>> Regarding the 3 options you listed:
> >>>
> >>> By using QueryableState in option B, what you mean is that you want to
> >>> feed the enriching data stream to a separate job, let that job allow
> >>> queryable state, and query that state from the actual application job
> >>> operators, correct? If so, I think options A and B would mean the same
> >>> thing; i.e., they require accessing data external to the job.
> >>>
> >>> If the enriching data can somehow be keyed with the stream that requires
> >>> it, I would go for option C using connected streams, with the enriching data
> >>> as one input and the actual data as the other. Instead of just “caching the
> >>> enriching data in memory”, you should register it as a managed Flink state
> >>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
> >>> can just access that registered state locally.
> >>>
> >>> Cheers,
> >>> Gordon
> >>>
> >>>
> >>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
> >>>
> >>> Hi. Say I have a few reference data sets need to be used for a
> >>> streaming job. The sizes range between 10M-10GB. The data is not
> >>> static, will be refreshed at minutes and/or day intervals.
> >>>
> >>> With the new advancements in Flink, it seems there are quite a few
> >>> options.
> >>> A. Store all the data in an external (kv) database cluster. And use
> >>> async io calls
> >>> * data refresh can be done in a few different ways
> >>> B. Use the new Queryable State feature
> >>> * it seems there is no "easy" API to discover the
> >>> queryable state at the moment. Need to use the restful API to figure
> >>> out the job id.
> >>> C. Ingest the reference data into the job and cache them in memory
> >>> Any other option?
> >>>
> >>> On paper, it seems option B with the Queryable State is the cleanest
> >>> solution.
> >>>
> >>> Any comment/suggestion is greatly appreciated in particular in terms
> >>> of robustness and consistent recovery.
> >>>
> >>> Thanks much!
> >>
> >>
>