Hi Caizhi,

I don’t mind the request being synchronous (i.e. not using the Async I/O connectors). Assuming I go down that route, would this be the appropriate way to handle it? Specifically, creating an HttpClient in a function on the keyed stream and storing the result in state if the state is empty?
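On the synchronous route, the blocking HttpClient.send() call is where retries would have to live, because java.net.http.HttpClient (JDK 11+) has no configurable retry policy of its own. Below is a minimal sketch, assuming that retrying on IOException (which includes HttpTimeoutException) and on 5xx responses is acceptable. retryWithBackoff and fetchWithRetry are hypothetical helper names, and the attempt count, delays and timeout are arbitrary placeholders.

```kotlin
import java.io.IOException
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration

// Hypothetical helper: runs [block] up to [maxAttempts] times, doubling the pause after
// each IOException. Any other exception type propagates immediately without a retry.
fun <T> retryWithBackoff(
    maxAttempts: Int = 3,
    initialDelay: Duration = Duration.ofMillis(200),
    block: () -> T
): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var delay = initialDelay
    for (attempt in 1..maxAttempts) {
        try {
            return block()
        } catch (e: IOException) {
            if (attempt == maxAttempts) throw e // out of attempts: surface the last failure
            Thread.sleep(delay.toMillis())
            delay = delay.multipliedBy(2)
        }
    }
    error("unreachable: the loop either returns or rethrows")
}

// Hypothetical blocking GET with a per-request timeout. 5xx responses become IOExceptions
// so they are retried as well; any other non-2xx status fails fast.
fun fetchWithRetry(client: HttpClient, uri: URI, maxAttempts: Int = 3): String =
    retryWithBackoff(maxAttempts) {
        val request = HttpRequest.newBuilder(uri).timeout(Duration.ofSeconds(5)).GET().build()
        val response = client.send(request, HttpResponse.BodyHandlers.ofString())
        when (response.statusCode()) {
            in 200..299 -> response.body()
            in 500..599 -> throw IOException("Server error ${response.statusCode()} from $uri")
            else -> throw IllegalStateException("Unexpected status ${response.statusCode()} from $uri")
        }
    }
```

Because this would run inside processElement, every attempt and backoff pause blocks the task thread and back-pressures the stream, so small attempt counts and delays are probably wise; with one lookup per tenant, that cost should be rare.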
It makes sense to me; I'm just wondering whether there are any gotchas, whether there's a recommended client that supports things like retries, and whether this is a good pattern for accomplishing this.

Thanks,

Rion

> On Aug 16, 2021, at 11:57 PM, Caizhi Weng <tsreape...@gmail.com> wrote:
>
> Hi!
>
> As you mentioned, the configuration fetching is very infrequent, so why
> don't you use a blocking approach to send HTTP requests and receive
> responses? This seems like a more reasonable solution to me.
>
> Rion Williams <rionmons...@gmail.com> wrote on Tue, Aug 17, 2021 at 4:00 AM:
>> Hi all,
>>
>> I've been exploring a few different options for storing tenant-specific
>> configurations within Flink state based on the messages flowing through my
>> job. Initially I had considered creating a source that would periodically
>> poll an HTTP API and connecting that stream to my original event stream.
>>
>> However, I realized that this configuration information would basically
>> never change, so it doesn't make much sense to poll so frequently. My next
>> approach would be a function keyed by tenant that stores the configuration
>> for that tenant in state (and issues an HTTP call when it doesn't have it).
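>> Something like this:
>>
>> import java.net.URI
>> import java.net.http.HttpClient
>> import java.net.http.HttpRequest
>> import java.net.http.HttpResponse
>> import org.apache.flink.api.common.state.ValueState
>> import org.apache.flink.api.common.state.ValueStateDescriptor
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
>> import org.apache.flink.util.Collector
>> // JsonObject comes from whichever JSON library the job already uses
>>
>> class ConfigurationLookupFunction : KeyedProcessFunction<String, JsonObject, JsonObject>() {
>>     // HTTP client, created once per parallel instance
>>     private lateinit var httpClient: HttpClient
>>     // Tenant-specific configuration, scoped to the current key (tenant)
>>     private lateinit var configuration: ValueState<String>
>>
>>     override fun open(parameters: Configuration) {
>>         super.open(parameters)
>>         httpClient = HttpClient.newHttpClient()
>>         // Register the keyed state; without this the lateinit property is never initialized
>>         configuration = runtimeContext.getState(ValueStateDescriptor("configuration", String::class.java))
>>     }
>>
>>     override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
>>         if (configuration.value() == null) {
>>             // Issue a blocking request to the appropriate API to load the configuration
>>             val request = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>>             val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
>>             // Store only successful responses, so an error body is never cached for the tenant
>>             if (response.statusCode() in 200..299) configuration.update(response.body())
>>         }
>>         // configuration.value() now holds the tenant's configuration for use downstream by the sink
>>         out.collect(message)
>>     }
>> }
>>
>> I didn't see any support for using the Async I/O functions from a keyed
>> context; otherwise I'd imagine that would be ideal. The requests themselves
>> should be very infrequent (an initial call per tenant), and I'd imagine that
>> after that the necessary configuration could be pulled from and stored
>> within the state for that key.
>>
>> Is there a good way of handling this that I might be overlooking with an
>> existing Flink construct or function? I'd love to be able to leverage the
>> Async I/O connectors, as they seem pretty well thought out.
>>
>> Thanks in advance!
>>
>> Rion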
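On the Async I/O question: AsyncDataStream.unorderedWait accepts any DataStream, so it can be applied after keyBy, but a RichAsyncFunction cannot use keyed state (its runtime context rejects getState). A per-tenant result would therefore have to be cached in memory inside the function instead. Below is a minimal, Flink-free sketch of such a cache, assuming a plain in-memory map per parallel instance is acceptable; unlike keyed state it is not checkpointed, so configurations would be refetched after a restart. AsyncTenantConfigLookup and asyncHttpFetcher are hypothetical names, and baseUrl stands in for the elided API URL.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical in-memory, per-instance cache holding one configuration future per tenant.
// Concurrent lookups for the same tenant share a single in-flight request.
class AsyncTenantConfigLookup(private val fetch: (String) -> CompletableFuture<String>) {
    private val futures = ConcurrentHashMap<String, CompletableFuture<String>>()

    fun lookup(tenant: String): CompletableFuture<String> {
        val future = futures.computeIfAbsent(tenant) { key -> fetch(key) }
        // Drop a failed future so the next lookup retries. remove(key, value) never evicts
        // a newer future, and running this outside computeIfAbsent keeps it safe even when
        // the future has already failed by the time it is returned.
        future.whenComplete { _, error -> if (error != null) futures.remove(tenant, future) }
        return future
    }
}

// Hypothetical non-blocking fetcher built on HttpClient.sendAsync; baseUrl is a placeholder.
fun asyncHttpFetcher(client: HttpClient, baseUrl: String): (String) -> CompletableFuture<String> = { tenant ->
    val request = HttpRequest.newBuilder(URI.create("$baseUrl/$tenant")).GET().build()
    client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
        .thenApply { response ->
            // A non-2xx status fails the future, which also evicts it from the cache
            check(response.statusCode() in 200..299) { "Lookup for $tenant failed: ${response.statusCode()}" }
            response.body()
        }
}
```

Inside an AsyncFunction's asyncInvoke, the returned future could then complete Flink's ResultFuture, for example with lookup(tenant).whenComplete { config, error -> if (error != null) resultFuture.completeExceptionally(error) else resultFuture.complete(listOf(...)) }, where the element passed downstream is left to the job. Sharing one future per tenant means a burst of messages for a new tenant triggers a single request.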