Just chiming in on this again. I think I have the implementation pieces in
place (both a DynamicElasticsearchSink class and an ElasticsearchSinkRouter
interface) added to the elasticsearch-base module. I noticed that HttpHost
isn't available within that module or its tests, so I suspect I'll need to
add a dependency similar to those found in the version-specific ES modules
(5/6/7).

For testing, would it be best to provide a dummy sink for the unit tests,
following the existing patterning, or would you recommend separate
integration tests that spin up a Testcontainers instance for each supported
version (5/6/7), similar to the ElasticsearchSinkITCase files under each
module?
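For reference, here's roughly the shape of the dummy-sink unit test I had in
mind (just a sketch: DummyElasticsearchSink is a hypothetical test double,
and DynamicElasticsearchSink / ElasticsearchSinkRouter are the names from my
working branch, not classes that exist in Flink today):

import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;

public class DynamicElasticsearchSinkTest {

    @Test
    public void routesElementsToTheMatchingChildSink() throws Exception {
        // Route elements by a simple prefix; real routing would parse host info.
        ElasticsearchSinkRouter<String, String, DummyElasticsearchSink<String>> router =
                new ElasticsearchSinkRouter<String, String, DummyElasticsearchSink<String>>() {
                    @Override
                    public String getRoute(String element) {
                        return element.startsWith("a") ? "cluster-a" : "cluster-b";
                    }

                    @Override
                    public DummyElasticsearchSink<String> createSink(String cacheKey, String element) {
                        return new DummyElasticsearchSink<>();
                    }
                };

        DynamicElasticsearchSink<String, String, DummyElasticsearchSink<String>> sink =
                new DynamicElasticsearchSink<>(router);

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        harness.open();
        harness.processElement("a-tenant-event", 0L);
        harness.processElement("b-tenant-event", 1L);
        // Each dummy sink would capture the requests it saw, so the test can
        // assert that "a-tenant-event" only reached the "cluster-a" child.
        harness.close();
    }
}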
Any advice / recommendations on this front would be helpful. I want to write
some tests surrounding this that demonstrate the most common use-cases, but
I also don't want to go overboard.

Thanks again for all of your help,

Rion

On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <rionmons...@gmail.com> wrote:

> Thanks again David,
>
> I've spun up a JIRA issue for the ticket
> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
> getting things into the proper state. If someone with the appropriate
> privileges could assign it to me, I'd be appreciative. I'll likely need
> some assistance at a few points to ensure things look as expected, but
> I'm happy to help with this contribution.
>
> Rion
>
> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:
>
>> AFAIK there are currently no other connectors in Flink that can treat
>> "other sources" / "destinations" as data. The most complete generic work
>> on this topic that I'm aware of are the Splittable DoFn based IOs in
>> Apache Beam.
>>
>> I think the best module for the contribution would be
>> "elasticsearch-base", because this could be easily reused for all ES
>> versions that we currently support.
>>
>> Best,
>> D.
>>
>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> That was perfect, and it looks like this is working as I'd expected. I
>>> put together some larger integration tests for my specific use-case
>>> (multiple Elasticsearch clusters running in Testcontainers) and
>>> verified that messages were being routed dynamically to the appropriate
>>> sinks. I forked the Flink repo last night and was trying to figure out
>>> the best place to start adding these classes (I noticed that there were
>>> three separate ES packages targeting 5/6/7 respectively). I was going
>>> to start fleshing out the initial implementation, but wanted to make
>>> sure that I was starting in the right place.
>>>
>>> Additionally, do you know of anything similar to this within other
>>> sinks? I'm just trying to think of something to model this after. Once
>>> I get things started, I'll spin up a JIRA issue for it and go from
>>> there.
>>>
>>> Thanks so much for your help!
>>>
>>> Rion
>>>
>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> You just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>> (which ElasticsearchSink extends) for more details.
>>>>
>>>> One more note: instead of starting with an integration test, I'd
>>>> recommend writing a unit test using the *operator test harness* [2]
>>>> first. This should help you to discover / debug many issues upfront.
>>>> You can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>> [3]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>
>>>> Best,
>>>> D.
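>>>>
>>>> P.S. In the wrapper, that wiring would look roughly like this (an
>>>> untested Java sketch; getOrCreateSink is a hypothetical helper, and
>>>> sinkRoutes / buildSinkFromRoute / configuration are the fields from
>>>> your POC):
>>>>
>>>> private ElasticsearchSink<JsonObject> getOrCreateSink(String route, JsonObject element) throws Exception {
>>>>     ElasticsearchSink<JsonObject> sink = sinkRoutes.get(route);
>>>>     if (sink == null) {
>>>>         sink = buildSinkFromRoute(element);
>>>>         // The child sink never goes through the regular operator lifecycle,
>>>>         // so it has to borrow the wrapper's runtime context before open().
>>>>         sink.setRuntimeContext(getRuntimeContext());
>>>>         sink.open(configuration);
>>>>         sinkRoutes.put(route, sink);
>>>>     }
>>>>     return sink;
>>>> }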
>>>>
>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks again for the response. I believe I'm getting pretty close to
>>>>> at least a POC-level implementation of this. I'm currently working
>>>>> with JsonObject instances throughout the pipeline, so for
>>>>> simplicity's sake I tried storing the routing information within the
>>>>> element itself, giving it a shape that looks something like this:
>>>>>
>>>>> {
>>>>>     "route": {
>>>>>         "hosts": "...",
>>>>>         "index": "...",
>>>>>         ...
>>>>>     },
>>>>>     "all-other-fields-here"
>>>>> }
>>>>>
>>>>> And I've stripped back several of the layers of the routers (since I
>>>>> already have all of the information in the element at that point). I
>>>>> tried using something like this:
>>>>>
>>>>> class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>     private lateinit var configuration: Configuration
>>>>>
>>>>>     override fun open(parameters: Configuration) {
>>>>>         configuration = parameters
>>>>>     }
>>>>>
>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>         val route = getHost(element)
>>>>>         // Check if we already have a sink for this cluster
>>>>>         var sink = sinkRoutes[route]
>>>>>         if (sink == null) {
>>>>>             // If not, create one
>>>>>             sink = buildSinkFromRoute(element)
>>>>>             sink.open(configuration)
>>>>>             sinkRoutes[route] = sink
>>>>>         }
>>>>>
>>>>>         sink.invoke(element, context)
>>>>>     }
>>>>>
>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>         // No-op.
>>>>>     }
>>>>>
>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>         // This is used only to flush pending writes.
>>>>>         for (sink in sinkRoutes.values) {
>>>>>             sink.snapshotState(context)
>>>>>         }
>>>>>     }
>>>>>
>>>>>     override fun close() {
>>>>>         for (sink in sinkRoutes.values) {
>>>>>             sink.close()
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>             buildHostsFromElement(element),
>>>>>             ElasticsearchRoutingFunction()
>>>>>         )
>>>>>
>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>
>>>>>         // TODO: Configure authorization if available
>>>>>         // builder.setRestClientFactory { restClient ->
>>>>>         //     restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>>         //         override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>>         //             // Configure authorization here
>>>>>         //             val credentialsProvider = BasicCredentialsProvider().apply {
>>>>>         //                 setCredentials(
>>>>>         //                     AuthScope.ANY,
>>>>>         //                     UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>>         //                 )
>>>>>         //             }
>>>>>         //
>>>>>         //             return builder.setDefaultCredentialsProvider(credentialsProvider)
>>>>>         //         }
>>>>>         //     })
>>>>>         // }
>>>>>
>>>>>         return builder.build()
>>>>>     }
>>>>>
>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
>>>>>         val transportAddresses = element
>>>>>             .get("route").asJsonObject
>>>>>             .get("hosts").asString
>>>>>
>>>>>         // If there are multiple, they should be comma-delimited
>>>>>         val addresses = transportAddresses.split(",")
>>>>>         return addresses
>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>             .map { address -> HttpHost.create(address) }
>>>>>     }
>>>>>
>>>>>     private fun getHost(element: JsonObject): String {
>>>>>         return element
>>>>>             .get("route").asJsonObject
>>>>>             .get("hosts").asString
>>>>>     }
>>>>>
>>>>>     private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>             indexer.add(request(element))
>>>>>         }
>>>>>
>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>             // Access routing information
>>>>>             val index = element
>>>>>                 .get("route").asJsonObject
>>>>>                 .get("index").asString
>>>>>
>>>>>             // Strip off routing information
>>>>>             element.remove("route")
>>>>>
>>>>>             // Build the request
>>>>>             return Requests.indexRequest()
>>>>>                 .index(index)
>>>>>                 .type("_doc")
>>>>>                 .source(mapOf(
>>>>>                     "data" to "$element"
>>>>>                 ))
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> After running an integration test, I keep running into the following
>>>>> error during the invocation of the child sink:
>>>>>
>>>>> // The runtime context has not been initialized.
>>>>> sink.invoke(element, context)
>>>>>
>>>>> I can see the underlying sink getting initialized, the open call
>>>>> being made, etc.; however, for some reason there seems to be an issue
>>>>> related to the context during the invoke call, namely "The runtime
>>>>> context has not been initialized". I had assumed this would be
>>>>> alright since the context for the "wrapper" should have already been
>>>>> initialized, but maybe there's something that I'm missing.
>>>>>
>>>>> Also, please forgive any hastily written or nasty code, as this is
>>>>> purely a POC to see if I could get this to work using a single
>>>>> object. I'm hoping to clean it up and genericize it once I'm
>>>>> confident that it actually works.
>>>>>
>>>>> Thanks so much again,
>>>>>
>>>>> Rion
>>>>>
>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Rion,
>>>>>>
>>>>>> Sorry for the late reply, I had missed your previous message. Thanks
>>>>>> Arvid for the reminder <3.
>>>>>>
>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>> those elements to the sink, which would create the tenant-specific
>>>>>>> Elastic connection from the ConfigurationT element and handle
>>>>>>> caching it and then just grab the element and send it on its way?
>>>>>>
>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>> overhead, as the sink can be easily chained with your join
>>>>>> (KeyedCoProcessFunction) function.
>>>>>>
>>>>>>> 1. The shape of the elements being evicted from the process
>>>>>>> function (Is a simple wrapper with the configuration for the sink
>>>>>>> enough here? Do I need to explicitly initialize the sink within
>>>>>>> this function? Etc.)
>>>>>>
>>>>>> To write an element you need a configuration for the destination and
>>>>>> the element itself, so a tuple of *(ElasticConfiguration, Element)*
>>>>>> should be enough (that's basically your MessageWrapper<ElementT,
>>>>>> ConfigurationT> class).
>>>>>>
>>>>>>> 2. The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>>>> just be something like an *.addSink(DynamicElasticSearchSink<String,
>>>>>>> Configuration>())* or perhaps something else entirely?)
>>>>>>
>>>>>> I guess it could look something like the snippet below. It would
>>>>>> definitely be good to play around with the *DynamicElasticSearchSink*
>>>>>> API and make it more meaningful / user friendly (the gist I've
>>>>>> shared was just a very rough prototype to showcase the idea).
>>>>>>
>>>>>> static class Destination {
>>>>>>
>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>
>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>         this.httpHosts = httpHosts;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>> toWrite.addSink(
>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>                 new SinkRouter<
>>>>>>                         Tuple2<Destination, String>,
>>>>>>                         String,
>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>                                         element.f0.httpHosts,
>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>                                                     // Construct index request.
>>>>>>                                                     final IndexRequest request = ...;
>>>>>>                                                     indexer.add(request);
>>>>>>                                                 })
>>>>>>                                 .build();
>>>>>>                     }
>>>>>>                 }));
>>>>>>
>>>>>> I hope this helps ;)
>>>>>>
>>>>>> Best,
>>>>>> D.
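>>>>>>
>>>>>> P.S. To make the snippet above easier to read: the router contract
>>>>>> from the prototype boils down to roughly this (a simplified sketch;
>>>>>> method names may differ in the final API):
>>>>>>
>>>>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>>>>>>
>>>>>> /**
>>>>>>  * Decides which child sink an element belongs to, and knows how to
>>>>>>  * lazily create that sink the first time a route is seen.
>>>>>>  */
>>>>>> public interface SinkRouter<IN, RouteT, SinkT extends SinkFunction<IN>> {
>>>>>>
>>>>>>     // A deterministic caching key for the element's destination.
>>>>>>     RouteT getRoute(IN element);
>>>>>>
>>>>>>     // Creates the child sink for a route that hasn't been seen yet.
>>>>>>     SinkT createSink(RouteT cacheKey, IN element);
>>>>>> }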
>>>>>>
>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>
>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>> roughly the avenue to continue down:
>>>>>>>
>>>>>>> fun main(args: Array<String>) {
>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>
>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>     val connectionStream = stream
>>>>>>>         .fromSource(
>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>             "elastic-configs"
>>>>>>>         )
>>>>>>>
>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>     stream
>>>>>>>         .fromSource(
>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>             "messages"
>>>>>>>         )
>>>>>>>         .keyBy { message ->
>>>>>>>             // Key by the tenant in the message
>>>>>>>             message.getTenant()
>>>>>>>         }
>>>>>>>         .connect(
>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>             connectionStream
>>>>>>>         )
>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>
>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>                 super.open(parameters)
>>>>>>>                 // Initialize the states
>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>             }
>>>>>>>
>>>>>>>             // When an element is received
>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>                 // Check if we have a mapping
>>>>>>>                 if (configState.value() == null) {
>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>                 } else {
>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>                     out.collect(message)
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>                 // If this mapping is for this specific tenant, store it and
>>>>>>>                 // flush the pending records in state
>>>>>>>                 if (config.getTenant() == context.currentKey) {
>>>>>>>                     configState.update(config)
>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>                         out.collect(message)
>>>>>>>                     }
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>             // State descriptors
>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>                 "messages-awaiting-config",
>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>             )
>>>>>>>
>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>                 "elastic-config",
>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>             )
>>>>>>>         })
>>>>>>>
>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>> }
>>>>>>>
>>>>>>> Basically, I'd connect my tenant-specific configuration stream with
>>>>>>> my incoming messages (keyed by tenant) and buffer them until I have
>>>>>>> a corresponding configuration (to avoid race conditions). However,
>>>>>>> I'm guessing that rather than directly outputting the messages from
>>>>>>> this process function, I'd construct some type of wrapper here with
>>>>>>> the necessary routing/configuration for the message (obtained via
>>>>>>> the configuration stream) along with the element, which might be
>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT>, and pass
>>>>>>> those elements to the sink, which would create the tenant-specific
>>>>>>> Elastic connection from the ConfigurationT element, handle caching
>>>>>>> it, and then just grab the element and send it on its way?
>>>>>>>
>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>
>>>>>>> 1. The shape of the elements being evicted from the process
>>>>>>> function (Is a simple wrapper with the configuration for the sink
>>>>>>> enough here? Do I need to explicitly initialize the sink within
>>>>>>> this function? Etc.)
>>>>>>> 2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>>> just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>> Configuration>()) or perhaps something else entirely?)
>>>>>>>
>>>>>>> Thanks again so much for the advice thus far, David; it's greatly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Rion
>>>>>>>
>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>> something like this
>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>> [1].
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>
>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>>>> unknowns on my end in terms of what production loads will look
>>>>>>>>> like, but I think the number of clusters shouldn't be too large
>>>>>>>>> (and will either rarely change or have new entries come in at
>>>>>>>>> runtime, but it needs to support that).
>>>>>>>>>
>>>>>>>>> I think the dynamic approach might be a good route to explore,
>>>>>>>>> with actual changes to the Elasticsearch sink as a longer-term
>>>>>>>>> option.
>>>>>>>>> I'm not sure what the dynamic approach would look like at the
>>>>>>>>> moment, though; perhaps that's something you'd be able to advise
>>>>>>>>> on?
>>>>>>>>>
>>>>>>>>> Given that all the records are keyed for a given tenant, and I'd
>>>>>>>>> have the mappings stored in state, would it be possible within
>>>>>>>>> the open() function of this dynamic sink to access that state and
>>>>>>>>> initialize the client there? Or maybe there's some other approach
>>>>>>>>> (such as grouping by clusters and dynamically handling indices)?
>>>>>>>>>
>>>>>>>>> I'd be happy to take a shot at making the appropriate changes to
>>>>>>>>> the sink as well, although I'm far from an Elastic expert. If you
>>>>>>>>> point me in the right direction, I may be able to help out.
>>>>>>>>>
>>>>>>>>> Thanks much!
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Rion,
>>>>>>>>>
>>>>>>>>> As you probably already know, for dynamic indices you can simply
>>>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>> [1], where you can create any request that the elastic client
>>>>>>>>> supports.
>>>>>>>>>
>>>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>>>> clusters.
>>>>>>>>>
>>>>>>>>> - If the elastic clusters are known upfront (before submitting
>>>>>>>>> the job), you can easily create multiple elastic sinks and
>>>>>>>>> prepend each with a simple filter (this is basically what the
>>>>>>>>> split operator does).
>>>>>>>>> - If you discover elastic clusters at runtime, this would require
>>>>>>>>> some changes to the current ElasticsearchSink implementation. I
>>>>>>>>> think this may actually be as simple as introducing something
>>>>>>>>> like a DynamicElasticsearchSink that could dynamically create and
>>>>>>>>> manage "child" sinks. This would probably require some thought
>>>>>>>>> about how to manage consumed resources (memory), because the
>>>>>>>>> number of child sinks could potentially be unbounded. It could of
>>>>>>>>> course be simplified if the underlying elastic client already
>>>>>>>>> supported this, which I'm not aware that it does. If you'd like
>>>>>>>>> to take this path, it would definitely be a great contribution to
>>>>>>>>> Flink (I'm able to provide some guidance).
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I have a use-case that I wanted to pose to the mailing list, as
>>>>>>>>>> I'm not terribly familiar with the Elasticsearch connector, to
>>>>>>>>>> ensure I'm not going down the wrong path trying to accomplish
>>>>>>>>>> this in Flink (or if something downstream might be a better
>>>>>>>>>> option).
>>>>>>>>>>
>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>
>>>>>>>>>> - A stream of tenant-specific events
>>>>>>>>>> - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>> Elastic cluster information (as each tenant has its own specific
>>>>>>>>>> Elastic cluster/index)
>>>>>>>>>>
>>>>>>>>>> What I'm hoping to accomplish is the following:
>>>>>>>>>>
>>>>>>>>>> 1. One stream will periodically poll the HTTP endpoint and store
>>>>>>>>>> these cluster mappings in state (keyed by tenant, with cluster
>>>>>>>>>> info as the value).
>>>>>>>>>> 2. The event stream will be keyed by tenant and connected to the
>>>>>>>>>> cluster mappings stream.
>>>>>>>>>> 3. I'll need an Elasticsearch sink that can route the
>>>>>>>>>> tenant-specific event data to its corresponding cluster/index
>>>>>>>>>> from the mapping source.
>>>>>>>>>>
>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>> indices; however, I didn't know whether it's possible to adjust
>>>>>>>>>> the cluster as I'd need to on a per-tenant basis, or if there's
>>>>>>>>>> a better approach here.
>>>>>>>>>>
>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Rion