Hi Caizhi,

I don’t mind the request being synchronous (that is, not using the Async I/O 
connectors). Assuming I go down that route, would this be the appropriate way to 
handle it? Specifically, creating an HttpClient and, on a keyed stream, issuing 
the request and storing the result in state whenever that state is empty?

It makes sense to me. I'm just wondering whether there are any gotchas, any 
recommendations for a client that supports things like retries, and whether 
this is a good pattern for accomplishing this.
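On the retries question: java.net.http.HttpClient does not expose a retry or backoff policy in its builder, so one option is to wrap the blocking call yourself. The following is a minimal sketch under stated assumptions: retryWithBackoff and fetchConfiguration are hypothetical helpers (not part of Flink or the JDK), and IOExceptions plus 5xx responses are treated as retryable.

```kotlin
import java.io.IOException
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration

// Hypothetical helper: runs `block`, retrying on IOException with exponential
// backoff, for at most `maxAttempts` attempts in total.
fun <T> retryWithBackoff(
    maxAttempts: Int = 3,
    initialDelayMs: Long = 200,
    block: () -> T
): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var delayMs = initialDelayMs
    var lastError: IOException? = null
    for (attempt in 1..maxAttempts) {
        try {
            return block()
        } catch (e: IOException) {
            lastError = e
            // Back off before the next attempt, but not after the final one
            if (attempt < maxAttempts) {
                Thread.sleep(delayMs)
                delayMs *= 2
            }
        }
    }
    // Every attempt failed: surface the last transport/server error
    throw lastError!!
}

// Hypothetical wrapper: blocking GET with a per-request timeout. HttpClient only
// throws for transport failures, so 5xx status codes are turned into IOExceptions
// here to make them retryable as well.
fun fetchConfiguration(client: HttpClient, uri: URI): String =
    retryWithBackoff {
        val request = HttpRequest.newBuilder(uri)
            .timeout(Duration.ofSeconds(5))
            .GET()
            .build()
        val response = client.send(request, HttpResponse.BodyHandlers.ofString())
        if (response.statusCode() >= 500) {
            throw IOException("Server error ${response.statusCode()} for $uri")
        }
        response.body()
    }
```

Inside processElement this would stand in for the direct send call, e.g. configuration.update(fetchConfiguration(httpClient, uri)). Because the call blocks the operator's task thread, records for other tenants on that subtask wait until it returns, so a small timeout and attempt count keep that stall bounded.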

Thanks,

Rion

> On Aug 16, 2021, at 11:57 PM, Caizhi Weng <tsreape...@gmail.com> wrote:
> 
> 
> Hi!
> 
> Since you mentioned that configuration fetching is very infrequent, why 
> don't you use a blocking approach to send HTTP requests and receive 
> responses? That seems like a more reasonable solution to me.
> 
> Rion Williams <rionmons...@gmail.com> wrote on Tue, Aug 17, 2021 at 4:00 AM:
>> Hi all,
>> 
>> I've been exploring a few different options for storing tenant-specific 
>> configurations within Flink state based on the messages I have flowing 
>> through my job. Initially I had considered creating a source that would 
>> periodically poll an HTTP API and connect that stream to my original event 
>> stream.
>> 
>> However, I realized that this configuration information would basically 
>> never change, so it doesn't really make sense to poll for it so frequently. 
>> My next approach is a function keyed by tenant that stores the 
>> configuration for that tenant in state (and issues an HTTP call when it 
>> isn't there yet). Something like this:
>> 
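>> import java.net.URI
>> import java.net.http.HttpClient
>> import java.net.http.HttpRequest
>> import java.net.http.HttpResponse
>> import org.apache.flink.api.common.state.ValueState
>> import org.apache.flink.api.common.state.ValueStateDescriptor
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
>> import org.apache.flink.util.Collector
>> // (JsonObject comes from whichever JSON library the job already uses)
>> 
>> class ConfigurationLookupFunction :
>>     KeyedProcessFunction<String, JsonObject, JsonObject>() {
>>     private lateinit var httpClient: HttpClient
>>     // Tenant-specific configuration, scoped to the current key
>>     private lateinit var configuration: ValueState<String>
>> 
>>     override fun open(parameters: Configuration) {
>>         super.open(parameters)
>>         httpClient = HttpClient.newHttpClient()
>>         // Keyed state must be registered before it can be read or updated
>>         configuration = runtimeContext.getState(
>>             ValueStateDescriptor("configuration", String::class.java))
>>     }
>> 
>>     override fun processElement(message: JsonObject, context: Context,
>>                                 out: Collector<JsonObject>) {
>>         if (configuration.value() == null) {
>>             // Issue a blocking request to the appropriate API to load the
>>             // configuration for this tenant
>>             val request = HttpRequest.newBuilder(
>>                 URI.create(".../${context.currentKey}")).build()
>>             val response = httpClient.send(request,
>>                 HttpResponse.BodyHandlers.ofString())
>>             // Store the configuration info within state here
>>             configuration.update(response.body())
>>         }
>>         // configuration.value() is now populated: attach it to the message
>>         // as needed, then pass it downstream to be used by the sink
>>         out.collect(message)
>>     }
>> }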
>> 
>> I didn't see any support for using the Async I/O functions from a keyed 
>> context; otherwise, I imagine that would be ideal. The requests themselves 
>> should be very infrequent (one initial call per tenant), and after that I'd 
>> expect the necessary configuration to be stored in, and read from, the 
>> state for that key.
>> 
>> Is there a good way of handling this that I might be overlooking with an 
>> existing Flink construct or function? I'd love to be able to leverage the 
>> Async I/O connectors as they seem pretty well thought out.
>> 
>> Thanks in advance!
>> 
>> Rion
>> 
>> 
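The "one request per tenant, then serve from state" behaviour described in the thread can be modelled without Flink. In this minimal sketch a HashMap stands in for keyed ValueState and a loader lambda stands in for the HTTP call; TenantConfigCache and configurationFor are hypothetical names. Unlike Flink keyed state, the map is neither partitioned across subtasks nor checkpointed.

```kotlin
// Hypothetical stand-in for Flink keyed ValueState: one slot per tenant key.
class TenantConfigCache(private val loader: (String) -> String) {
    private val state = HashMap<String, String>()

    // Number of times the (simulated) HTTP lookup actually ran
    var loads = 0
        private set

    // Mirrors processElement: only call the loader when this key's slot is empty.
    fun configurationFor(tenant: String): String {
        val cached = state[tenant]
        if (cached != null) return cached
        val loaded = loader(tenant)
        loads++
        state[tenant] = loaded
        return loaded
    }
}
```

Repeated messages for the same tenant hit the cached value, so the number of lookups grows with the number of distinct tenants rather than with message volume.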
