Also, I took a quick look at side inputs. It's unclear to me how side
inputs could solve this issue better.

At a high level, this is what I have in mind:
    flatMap(byte[] value, Collector<> output) {
        var iter = someStoreStateObject.seek(akeyprefix);
        // or seek(akeyprefix, akeysuffix);
        for (byte[] key : iter) {}
    }
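
To make the seek-by-prefix idea a bit more concrete, here is a minimal
sketch in plain Java. It is not a Flink API: PrefixScanStore is a
hypothetical stand-in for someStoreStateObject, and it assumes String keys
held in an in-memory sorted map instead of byte[] keys in a state backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for "someStoreStateObject"; not part of Flink.
class PrefixScanStore {
    // Sorted map, so keys that share a prefix sit next to each other.
    private final NavigableMap<String, String> entries = new TreeMap<>();

    void put(String key, String value) {
        entries.put(key, value);
    }

    // Returns every entry whose key starts with the prefix, in key order.
    List<Map.Entry<String, String>> seek(String prefix) {
        List<Map.Entry<String, String>> matches = new ArrayList<>();
        // tailMap positions the scan at the first key >= prefix.
        for (Map.Entry<String, String> e : entries.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // past the contiguous run of matching keys
            }
            matches.add(e);
        }
        return matches;
    }
}
```

Inside flatMap, the loop over seek(akeyprefix) would then visit only the
entries under that prefix rather than the whole reference data set.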

Thanks for your time!


On Fri, May 19, 2017 at 10:03 AM, Sand Stone <sand.m.st...@gmail.com> wrote:
> Thanks Gordon and Fabian.
>
> The enriching data is really reference data, e.g. the reverseIP
> database. It's hard to key it the same way as the main data stream,
> because the "ip address" in the event is not a primary key in the main
> data stream.
>
> QueryableState is close, but it does not support range scans as far as
> I can tell. The remote datastore has clean semantics: a single logical
> copy plus support for range scans, but the RPC to another cluster
> is not optimal.
>
> I assume this is quite a common stream processing pattern for Flink
> based services.
>
>
> On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> +1 to what Gordon said.
>>
>> Queryable state is rather meant as an external interface to streaming jobs
>> than for lookups within jobs.
>> Accessing co-located state should give you better performance and is
>> probably easier to implement and maintain.
>>
>> Cheers,
>> Fabian
>>
>> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>>
>>> Hi,
>>>
>>> Can the enriching data be keyed? Or is it something that has to be
>>> broadcast to each operator?
>>> Either way, I think Side Inputs (an upcoming feature) is the
>>> best fit for this. You can take a look at
>>> https://issues.apache.org/jira/browse/FLINK-6131.
>>>
>>> Regarding the 3 options you listed:
>>>
>>> By using QueryableState in option B, what you mean is that you want to
>>> feed the enriching data stream to a separate job, let that job allow
>>> queryable state, and query that state from the actual application job
>>> operators, correct? If so, I think options A and B would mean the same
>>> thing; i.e., they require accessing data external to the job.
>>>
>>> If the enriching data can somehow be keyed with the stream that requires
>>> it, I would go for option C using connected streams, with the enriching data
>>> as one input and the actual data as the other. Instead of just “caching the
>>> enriching data in memory”, you should register it as a managed Flink state
>>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
>>> can just access that registered state locally.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
>>>
>>> Hi. Say I have a few reference data sets that need to be used for a
>>> streaming job. The sizes range between 10M-10GB. The data is not
>>> static; it will be refreshed at minute and/or day intervals.
>>>
>>> With the new advancements in Flink, it seems there are quite a few
>>> options.
>>> A. Store all the data in an external (kv) database cluster, and use
>>>    async I/O calls.
>>>    * Data refresh can be done in a few different ways.
>>> B. Use the new Queryable State feature.
>>>    * It seems there is no "easy" API to discover the queryable state
>>>      at the moment. One needs to use the RESTful API to figure out
>>>      the job id.
>>> C. Ingest the reference data into the job and cache it in memory.
>>> Any other options?
>>>
>>> On paper, it seems option B with the Queryable State is the cleanest
>>> solution.
>>>
>>> Any comment/suggestion is greatly appreciated in particular in terms
>>> of robustness and consistent recovery.
>>>
>>> Thanks much!
>>
>>
