Hi,

if you need range queries for the lookups, you can only use Option A (async
calls to an external store).
Queryable State only supports key lookups but no range queries.

Since version 1.2.0, Flink has a dedicated function type for async calls
[1].
This might be helpful to implement your usecase.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

2017-05-19 19:39 GMT+02:00 Sand Stone <sand.m.st...@gmail.com>:

> Also, took a quick read on side input. it's unclear to me how side
> input could solve this issue better.
>
> At a high level, this is what I have in mind:
>                 flatmap(byte[] value, Collector<> output) {
>                    var iter = someStoreStateObject.seek(akeyprefix);
> //or seek(akeyprefix, akeysuffix);
>                     for(byte[] key : iter) {}
>                 }
>
> Thanks for your time!
>
>
> On Fri, May 19, 2017 at 10:03 AM, Sand Stone <sand.m.st...@gmail.com>
> wrote:
> > Thanks Gordon and Fabian.
> >
> > The enriching data is really reference data, e.g. the reverseIP
> > database. It's hard to be keyed as the main data stream as the "ip
> > address" in the event is not a primary key in the main data stream.
> >
> > QueryableState is close, but it does not support range scan as far as
> > I could tell. The remote datastore has a clean semantics: a logical
> > single copy plus supports range scan, but the RPC to another cluster
> > is not optimal.
> >
> > I assume this is a quite common streaming processing pattern for Flink
> > based services.
> >
> >
> > On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >> +1 to what Gordon said.
> >>
> >> Queryable state is rather meant as an external interface to streaming
> jobs
> >> than for lookups within jobs.
> >> Accessing co-located state should give you better performance and is
> >> probably easier to implement and maintain.
> >>
> >> Cheers,
> >> Fabian
> >>
> >> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
> >>>
> >>> Hi,
> >>>
> >>> Can the enriching data be keyed? Or is it something that has to be
> >>> broadcasted to each operator?
> >>> Either way, I think Side Inputs (an upcoming feature in the future) is
> the
> >>> best fit for this. You can take a look at
> >>> https://issues.apache.org/jira/browse/FLINK-6131.
> >>>
> >>> Regarding the 3 options you listed:
> >>>
> >>> By using QueryableState in option B, what you mean is that you want to
> >>> feed the enriching data stream to a separate job, let that job allow
> >>> queryable state, and query that state from the actual application job
> >>> operators, correct? If so, I think options A and B would mean the same
> >>> thing; i.e., they require accessing data external to the job.
> >>>
> >>> If the enriching data can somehow be keyed with the stream that
> requires
> >>> it, I would go for option C using connected streams, with the
> enriching data
> >>> as one input and the actual data as the other. Instead of just
> “caching the
> >>> enriching data in memory”, you should register it as a managed Link
> state
> >>> for the CoMapFunction / CoFlatMapFunction. The actual input stream
> records
> >>> can just access that registered state locally.
> >>>
> >>> Cheers,
> >>> Gordon
> >>>
> >>>
> >>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com)
> wrote:
> >>>
> >>> Hi. Say I have a few reference data sets need to be used for a
> >>> streaming job. The sizes range between 10M-10GB. The data is not
> >>> static, will be refreshed at minutes and/or day intervals.
> >>>
> >>> With the new advancements in Flink, it seems there are quite a few
> >>> options.
> >>> A. Store all the data in an external (kv) database cluster. And use
> >>> async io calls
> >>> * data refresh can be done in a few different ways
> >>> B. Use the new Querytable State feature
> >>> * it seems there is no "easy" API to discover the
> >>> queryable state at the moment. Need to use the restful API to figure
> >>> out the job id.
> >>> C. Ingest the reference data into the job and cache them in memory
> >>> Any other option?
> >>>
> >>> On paper, it seems option B with the Queryable State is the cleanest
> >>> solution.
> >>>
> >>> Any comment/suggestion is greatly appreciated in particular in terms
> >>> of robustness and consistent recovery.
> >>>
> >>> Thanks much!
> >>
> >>
>

Reply via email to