Hi all,

I've been exploring a few different options for storing tenant-specific configurations within Flink state, based on the messages flowing through my job. Initially I considered creating a source that would periodically poll an HTTP API and connecting that stream to my original event stream.
However, I realized that this configuration would basically never change, so polling for it frequently doesn't make much sense. My next approach would be a function keyed by tenant that stores the configuration for that tenant in state and issues an HTTP call only when the state is empty. Something like this:

    class ConfigurationLookupFunction : KeyedProcessFunction<String, JsonObject, JsonObject>() {
        private lateinit var httpClient: HttpClient

        // Tenant-specific configuration
        private lateinit var configuration: ValueState<String>

        override fun open(parameters: Configuration) {
            super.open(parameters)
            httpClient = HttpClient.newHttpClient()
            // Register the keyed state; without this, `configuration` is never initialized
            configuration = runtimeContext.getState(
                ValueStateDescriptor("configuration", String::class.java)
            )
        }

        override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
            if (configuration.value() == null) {
                // Issue a request to the appropriate API to load the configuration
                val url = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
                httpClient.send(..., {
                    // Store the configuration info within state here
                    configuration.update(...)
                })

                out.collect(message)
            } else {
                // Get the configuration information and pass it downstream to be used by the sink
                out.collect(message)
            }
        }
    }

I didn't see any support for using the Async I/O functions from a keyed context; otherwise I'd imagine that would be ideal. The requests themselves should be very infrequent (one initial call per tenant), and after that the necessary configuration could be read from the state for that key.

Is there a good way of handling this with an existing Flink construct or function that I might be overlooking? I'd love to be able to leverage the Async I/O connectors, as they seem pretty well thought out.

Thanks in advance!

Rion
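
P.S. To make the behaviour I'm after concrete, here is a minimal sketch of the "load once per tenant, then reuse" logic in plain Kotlin, outside of Flink. The class name `TenantConfigurationCache` and the `loader` parameter are hypothetical stand-ins (the loader would wrap the real HTTP call); this only illustrates the lookup semantics, not how it should be wired into a job.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the per-tenant state: loads a tenant's
// configuration at most once and serves later lookups from memory.
class TenantConfigurationCache(
    // Hypothetical loader; in the real job this would wrap the HTTP request.
    private val loader: (String) -> CompletableFuture<String>
) {
    private val cache = ConcurrentHashMap<String, CompletableFuture<String>>()

    // The first lookup for a tenant triggers the load; every later lookup
    // for the same tenant reuses the same future.
    fun get(tenant: String): CompletableFuture<String> =
        cache.computeIfAbsent(tenant) { loader(it) }
}

fun main() {
    val loaderCalls = AtomicInteger()
    val cache = TenantConfigurationCache { tenant ->
        loaderCalls.incrementAndGet()
        // Simulated asynchronous fetch instead of a real HTTP call.
        CompletableFuture.supplyAsync { "config-for-$tenant" }
    }

    val first = cache.get("tenant-a").join()
    val second = cache.get("tenant-a").join()
    val other = cache.get("tenant-b").join()

    // prints "config-for-tenant-a config-for-tenant-a config-for-tenant-b loaderCalls=2"
    println("$first $second $other loaderCalls=${loaderCalls.get()}")
}
```

A map like this is not Flink state: it would live in each parallel subtask and be lost on restart, and in this sketch a failed load stays cached. That is why I was hoping to keep the configuration in keyed state instead.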