Hi Rion,
Your solution is good.

It seems that you need to enrich a stream with data queried via external HTTP
requests. There is another solution for reference, similar to the mechanism
of the lookup join in Flink SQL.
Lookup Join in Flink SQL supports two modes: Async mode and Sync mode.
For each input record from the original source, it looks up the matching
row in the dimension table by key.
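To make the two modes concrete, here is a minimal sketch in plain Kotlin (no Flink APIs involved). The sync variant blocks on one lookup per record; the async variant starts every lookup first and then joins the results in input order, roughly like an ordered async lookup. `lookupSync`, `lookupAsync`, and the `lookup` functions are hypothetical stand-ins for a dimension-table client.

```kotlin
import java.util.concurrent.CompletableFuture

// Sync mode: each record blocks until its dimension row has been looked up.
fun <T, K, V> lookupSync(
    records: List<T>,
    keyOf: (T) -> K,
    lookup: (K) -> V?
): List<Pair<T, V?>> = records.map { record -> record to lookup(keyOf(record)) }

// Async mode: all lookups are started first, then joined in input order,
// so slow lookups overlap instead of running one after another.
fun <T, K, V> lookupAsync(
    records: List<T>,
    keyOf: (T) -> K,
    lookup: (K) -> CompletableFuture<V?>
): List<Pair<T, V?>> {
    val pending = records.map { record -> record to lookup(keyOf(record)) }
    return pending.map { (record, future) -> record to future.join() }
}
```

Both return the same pairs for the same table; the async one only changes how long the caller waits when each lookup is slow.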
To avoid frequent external I/O, some dimension table sources use an in-memory
cache. E.g. the HBase dimension table source uses an in-memory LRU cache that
keeps the values of recently used keys: if an input record hits the cache,
external I/O is avoided; otherwise an external call is triggered and the
result is put into the LRU cache.
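An LRU lookup cache of that kind can be sketched in plain Kotlin on top of `LinkedHashMap` in access order. `LruLookupCache` and its `loader` parameter are hypothetical names for illustration, not the HBase connector's code. It is not thread-safe, which is fine if a single task calls it from `processElement`, but it would need synchronization otherwise.

```kotlin
// LRU cache in front of an expensive lookup: hits are served from memory,
// misses call `loader` (e.g. an HTTP or HBase request) and cache the result.
class LruLookupCache<K, V : Any>(
    private val maxSize: Int,
    private val loader: (K) -> V
) {
    // accessOrder = true keeps the most recently used entries at the tail
    private val entries = object : LinkedHashMap<K, V>(16, 0.75f, true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>?): Boolean =
            size > maxSize
    }

    fun get(key: K): V {
        entries[key]?.let { return it }  // cache hit: no external I/O
        val value = loader(key)          // cache miss: external call
        entries[key] = value             // may evict the least recently used entry
        return value
    }
}
```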
E.g. the Hive dimension table source loads all data into an in-memory cache,
and the cache is reloaded regularly according to the configured interval.
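The full-cache variant (load everything, refresh on an interval) can be sketched the same way. This version reloads lazily: the first lookup after the interval has elapsed reloads the whole table. `FullReloadCache`, `loadAll`, and the injectable `clock` are hypothetical; a background timer would be an equally valid choice.

```kotlin
// Holds a full snapshot of a (small) dimension table in memory and
// reloads the whole snapshot once `reloadIntervalMillis` has elapsed.
class FullReloadCache<K, V>(
    private val reloadIntervalMillis: Long,
    private val loadAll: () -> Map<K, V>,
    private val clock: () -> Long = System::currentTimeMillis
) {
    private var snapshot: Map<K, V> = emptyMap()
    private var nextReloadAt = Long.MIN_VALUE  // forces a load on first access

    fun get(key: K): V? {
        val now = clock()
        if (now >= nextReloadAt) {
            snapshot = loadAll()               // replace the whole table at once
            nextReloadAt = now + reloadIntervalMillis
        }
        return snapshot[key]
    }
}
```

For tenant configuration that "basically never changes", a long interval keeps the number of external calls close to one per reload period.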

Hope the information is helpful.

Best,
JING ZHANG


Rion Williams <rionmons...@gmail.com> wrote on Tue, Aug 17, 2021 at 9:23 PM:

> Hi Caizhi,
>
> I don’t mind the request being synchronous (or not using the Async I/O
> connectors). Assuming I go down that route, would this be the appropriate
> way to handle it? Specifically, creating an HttpClient on a keyed stream and
> issuing the request (and storing the result in state) only if the state was empty?
>
> It makes sense to me; I'm just wondering if there are any gotchas or
> recommendations in terms of a client that might support things like retries,
> and whether this is a good pattern to accomplish this.
>
> Thanks,
>
> Rion
>
> On Aug 16, 2021, at 11:57 PM, Caizhi Weng <tsreape...@gmail.com> wrote:
>
> Hi!
>
> Since you mentioned that the configuration fetching is very infrequent, why
> not use a blocking approach to send HTTP requests and receive
> responses? That seems like a more reasonable solution to me.
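For the blocking approach with retries, a small bounded retry with exponential backoff around the JDK 11 `HttpClient.send` call (the client the snippet below already uses) is one option. `withRetries`, `fetchConfiguration`, `baseUrl`, and the attempt/backoff/timeout values are hypothetical choices for illustration, not a Flink or JDK API.

```kotlin
import java.io.IOException
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration

// Runs `block` up to `maxAttempts` times, doubling the sleep between attempts.
// Only IOExceptions are retried; any other exception propagates immediately.
fun <T> withRetries(maxAttempts: Int, initialBackoffMillis: Long, block: () -> T): T {
    require(maxAttempts >= 1) { "maxAttempts must be >= 1" }
    var backoff = initialBackoffMillis
    repeat(maxAttempts - 1) {
        try {
            return block()
        } catch (e: IOException) {
            Thread.sleep(backoff)  // transient failure: wait, then try again
            backoff *= 2
        }
    }
    return block()                 // last attempt: let any exception propagate
}

// Blocking configuration fetch for one tenant; `baseUrl` is a placeholder.
fun fetchConfiguration(client: HttpClient, baseUrl: String, tenant: String): String =
    withRetries(maxAttempts = 3, initialBackoffMillis = 200) {
        val request = HttpRequest.newBuilder(URI.create("$baseUrl/$tenant"))
            .timeout(Duration.ofSeconds(5))
            .GET()
            .build()
        val response = client.send(request, HttpResponse.BodyHandlers.ofString())
        // Treat server errors as retryable by turning them into IOExceptions
        if (response.statusCode() >= 500) throw IOException("HTTP ${response.statusCode()}")
        response.body()
    }
```

Inside `processElement` the sleeps block the task thread, so it is worth keeping attempts and backoff small; if all attempts fail, the exception fails the task and Flink's restart strategy takes over.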
>
> Rion Williams <rionmons...@gmail.com> wrote on Tue, Aug 17, 2021 at 4:00 AM:
>
>> Hi all,
>>
>> I've been exploring a few different options for storing tenant-specific
>> configurations within Flink state based on the messages I have flowing
>> through my job. Initially I had considered creating a source that would
>> periodically poll an HTTP API and connect that stream to my original event
>> stream.
>>
>> However, I realized that this configuration information would basically
>> never change, so it doesn't quite make sense to poll so frequently. My
>> next approach would be a function keyed by tenant that stores the
>> configuration for that tenant in state (and issues an HTTP call when it
>> isn't there yet). Something like this:
>>
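>> import java.net.URI
>> import java.net.http.HttpClient
>> import java.net.http.HttpRequest
>> import java.net.http.HttpResponse
>> import org.apache.flink.api.common.state.ValueState
>> import org.apache.flink.api.common.state.ValueStateDescriptor
>> import org.apache.flink.api.java.tuple.Tuple2
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
>> import org.apache.flink.util.Collector
>> // (plus the JsonObject import from whichever JSON library the job uses)
>>
>> // Emits each message paired with its tenant's configuration
>> class ConfigurationLookupFunction :
>>     KeyedProcessFunction<String, JsonObject, Tuple2<JsonObject, String>>() {
>>     // Created per task in open(); not serialized with the function
>>     @Transient private lateinit var httpClient: HttpClient
>>     // Tenant-specific configuration, scoped to the current key (tenant)
>>     private lateinit var configuration: ValueState<String>
>>
>>     override fun open(parameters: Configuration) {
>>         super.open(parameters)
>>         httpClient = HttpClient.newHttpClient()
>>         configuration = runtimeContext.getState(
>>             ValueStateDescriptor("tenant-configuration", String::class.java))
>>     }
>>
>>     override fun processElement(
>>         message: JsonObject,
>>         context: Context,
>>         out: Collector<Tuple2<JsonObject, String>>
>>     ) {
>>         var config = configuration.value()
>>         if (config == null) {
>>             // Blocking call: happens once per tenant, then served from state
>>             val request = HttpRequest.newBuilder(
>>                 URI.create(".../${context.currentKey}")).GET().build()
>>             val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
>>             check(response.statusCode() == 200) { "Lookup failed: HTTP ${response.statusCode()}" }
>>             config = response.body()
>>             configuration.update(config)
>>         }
>>         // Pass the configuration downstream to be used by the sink
>>         out.collect(Tuple2.of(message, config))
>>     }
>> }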
>>
>> I didn't see any support for using the Async I/O functions from a keyed
>> context; otherwise I'd imagine that would be ideal. The requests themselves
>> should be very infrequent (initial call per tenant) and I'd imagine after
>> that the necessary configuration could be pulled/stored within the state
>> for that key.
>>
>> Is there a good way of handling this that I might be overlooking with an
>> existing Flink construct or function? I'd love to be able to leverage the
>> Async I/O connectors as they seem pretty well thought out.
>>
>> Thanks in advance!
>>
>> Rion
>>
