Hi! Since, as you mentioned, configuration fetching is very infrequent, why not use a blocking approach to send the HTTP requests and receive the responses? That seems like a more reasonable solution to me.
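A minimal sketch of the blocking variant is below. It makes some assumptions: Flink 1.x, where `open` takes a `Configuration`. It assumes Gson's `com.google.gson.JsonObject` for the messages. It also uses a hypothetical `configBaseUrl` constructor parameter as a stand-in for the tenant API endpoint, which is elided in your message. The `ValueState` is registered in `open()`, and `HttpClient.send` blocks until the response arrives. This only happens the first time a tenant's key is seen on a subtask. After that, the configuration comes straight from keyed state.

```kotlin
import com.google.gson.JsonObject
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical: configBaseUrl stands in for the (elided) tenant configuration API.
class ConfigurationLookupFunction(
    private val configBaseUrl: String
) : KeyedProcessFunction<String, JsonObject, JsonObject>() {

    // HttpClient is not serializable, so it is created per task in open().
    @Transient
    private lateinit var httpClient: HttpClient

    // Tenant-specific configuration; Flink scopes this state to the current key.
    private lateinit var configuration: ValueState<String>

    override fun open(parameters: Configuration) {
        super.open(parameters)
        httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build()
        // Register the keyed state (the original snippet never initialized it).
        configuration = runtimeContext.getState(
            ValueStateDescriptor("tenant-configuration", String::class.java)
        )
    }

    override fun processElement(
        message: JsonObject,
        context: Context,
        out: Collector<JsonObject>
    ) {
        var config: String? = configuration.value()
        if (config == null) {
            // Blocking lookup: only runs the first time this tenant is seen.
            val request = HttpRequest.newBuilder(URI.create("$configBaseUrl/${context.currentKey}"))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build()
            val response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())
            // Failing the task lets Flink's restart strategy retry the lookup.
            check(response.statusCode() == 200) {
                "Configuration lookup for ${context.currentKey} failed: HTTP ${response.statusCode()}"
            }
            config = response.body()
            configuration.update(config)
        }
        // Attach the configuration so the sink can use it (hypothetical field name).
        message.addProperty("tenantConfiguration", config)
        out.collect(message)
    }
}
```

You would apply it after keying the stream by tenant ID. The trade-off is that the first record for each tenant stalls that parallel subtask until the call returns or times out. Given how rarely the lookup happens, that is probably acceptable, and it avoids the complexity of combining Async I/O with keyed state.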
On Tue, Aug 17, 2021 at 4:00 AM Rion Williams <rionmons...@gmail.com> wrote:
> Hi all,
>
> I've been exploring a few different options for storing tenant-specific
> configurations within Flink state based on the messages I have flowing
> through my job. Initially I had considered creating a source that would
> periodically poll an HTTP API and connect that stream to my original event
> stream.
>
> However, I realized that this configuration information would basically
> never change and thus it doesn't quite make sense to poll so frequently. My
> next approach would be to have a function that would be keyed (by tenant)
> and storing the configuration for that tenant in state (and issue an HTTP
> call when I did not have it). Something like this:
>
> class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
>     // Tenant specific configuration
>     private lateinit var httpClient: HttpClient
>     private lateinit var configuration: ValueState<String>
>
>     override fun open(parameters: Configuration) {
>         super.open(parameters)
>         httpClient = HttpClient.newHttpClient()
>     }
>
>     override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
>         if (configuration.value() == null){
>             // Issue a request to the appropriate API to load the configuration
>             val url = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>             httpClient.send(..., {
>                 // Store the configuration info within state here
>                 configuration.update(...)
>             })
>
>             out.collect(message)
>         }
>         else {
>             // Get the configuration information and pass it downstream to be used by the sink
>             out.collect(message)
>         }
>     }
> }
>
> I didn't see any support for using the Async I/O functions from a keyed
> context, otherwise I'd imagine that would be ideal. The requests themselves
> should be very infrequent (initial call per tenant) and I'd imagine after
> that the necessary configuration could be pulled/stored within the state
> for that key.
>
> Is there a good way of handling this that I might be overlooking with an
> existing Flink construct or function? I'd love to be able to leverage the
> Async I/O connectors as they seem pretty well thought out.
>
> Thanks in advance!
>
> Rion