Thanks again David,

I've spun up a JIRA issue for the ticket
<https://issues.apache.org/jira/browse/FLINK-23977> while I work on getting
things into the proper state. If someone with the
appropriate privileges could assign it to me, I'd be appreciative. I'll
likely need some assistance at a few points to ensure things look as
expected, but I'm happy to help with this contribution.

Rion

On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:

> AFAIK there are currently no other sources in Flink that can treat "other
> sources" / "destination" as data. Most complete generic work on this topic
> that I'm aware of are Splittable DoFn based IOs in Apache Beam.
>
> I think the best module for the contribution would be
> "elasticsearch-base", because this could be easily reused for all ES
> versions that we currently support.
>
> Best,
> D.
>
> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com>
> wrote:
>
>> Hi David,
>>
>> That was perfect and it looks like this is working as I'd expected. I put
>> together some larger integration tests for my specific use-case (multiple
>> Elasticsearch clusters running in TestContainers) and verified that
>> messages were being routed dynamically to the appropriate sinks. I forked
>> the Flink repo last night and was trying to figure out the best place to
>> start adding these classes in (I noticed that there were three separate ES
>> packages targeting 5/6/7 respectively). I was going to try to start
>> fleshing the initial implementation for this, but wanted to make sure that
>> I was starting in the right place.
>>
>> Additionally, do you know of anything that might be similar to this even
>> within other sinks? Just trying to think of something to model this after.
>> Once I get things started, I'll spin up a JIRA issue for it and go from
>> there.
>>
>> Thanks so much for your help!
>>
>> Rion
>>
>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>>
>>> Hi Rion,
>>>
>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>> (that EleasticsearchSink extends) for more details.
>>>
>>> One more note, instead of starting with integration test, I'd recommend
>>> writing a unit test using *operator test harness* [2] first. This
>>> should help you to discover / debug many issues upfront. You can use
>>> *ElasticsearchSinkBaseTest* [3] as an example.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>> [3]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thanks again for the response, I believe that I'm getting pretty close
>>>> for at least a POC-level implementation of this. Currently, I'm working
>>>> with JsonObject instances throughout the pipeline, so I wanted to try this
>>>> out and simply stored the routing information within the element itself for
>>>> simplicity's sake right now, so it has a shape that looks something like
>>>> this:
>>>>
>>>> {
>>>>     "route": {
>>>>         "hosts": "...",
>>>>         "index": "...",
>>>>         ...
>>>>     },
>>>>     "all-other-fields-here"
>>>> }
>>>>
>>>> And I've stripped back several of the layers of the routers (since I
>>>> already have all of the information in the element at that point). I tried
>>>> using something like this:
>>>>
>>>> class DynamicElasticsearchSink: RichSinkFunction<JsonObject>(), 
>>>> CheckpointedFunction {
>>>>     private val sinkRoutes: MutableMap<String, 
>>>> ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>     private lateinit var configuration: Configuration
>>>>
>>>>     override fun open(parameters: Configuration) {
>>>>         configuration = parameters
>>>>     }
>>>>
>>>>     override fun invoke(element: JsonObject, context: 
>>>> SinkFunction.Context) {
>>>>         val route = getHost(element)
>>>>         // Check if we already have a router for this cluster
>>>>         var sink = sinkRoutes[route]
>>>>         if (sink == null) {
>>>>             // If not, create one
>>>>             sink = buildSinkFromRoute(element)
>>>>             sink.open(configuration)
>>>>             sinkRoutes[route] = sink
>>>>         }
>>>>
>>>>         sink.invoke(element, context)
>>>>     }
>>>>
>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>         // No-op.
>>>>     }
>>>>
>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>         // This is used only to flush pending writes.
>>>>         for (sink in sinkRoutes.values) {
>>>>             sink.snapshotState(context)
>>>>         }
>>>>     }
>>>>
>>>>     override fun close() {
>>>>         for (sink in sinkRoutes.values) {
>>>>             sink.close()
>>>>         }
>>>>     }
>>>>
>>>>     private fun buildSinkFromRoute(element: JsonObject, ho): 
>>>> ElasticsearchSink<JsonObject> {
>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>             buildHostsFromElement(element),
>>>>             ElasticsearchRoutingFunction()
>>>>         )
>>>>
>>>>         builder.setBulkFlushMaxActions(1)
>>>>
>>>>         // TODO: Configure authorization if available
>>>> //        builder.setRestClientFactory { restClient ->
>>>> //            restClient.setHttpClientConfigCallback(object : 
>>>> RestClientBuilder.HttpClientConfigCallback {
>>>> //                override fun customizeHttpClient(builder: 
>>>> HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>> //                    // Configure authorization here
>>>> //                    val credentialsProvider = 
>>>> BasicCredentialsProvider().apply {
>>>> //                        setCredentials(
>>>> //                            AuthScope.ANY,
>>>> //                            UsernamePasswordCredentials("$USERNAME", 
>>>> "$PASSWORD")
>>>> //                        )
>>>> //                    }
>>>> //
>>>> //                    return 
>>>> builder.setDefaultCredentialsProvider(credentialsProvider);
>>>> //                }
>>>> //            })
>>>> //        }
>>>>
>>>>         return builder.build()
>>>>     }
>>>>
>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost>{
>>>>         val transportAddresses = element
>>>>             .get("route").asJsonObject
>>>>             .get("hosts").asString
>>>>
>>>>         // If there are multiple, they should be comma-delimited
>>>>         val addresses = transportAddresses.split(",")
>>>>         return addresses
>>>>             .filter { address -> address.isNotEmpty() }
>>>>             .map { address ->
>>>>                 HttpHost.create(address)
>>>>             }
>>>>     }
>>>>
>>>>     private fun getHost(element: JsonObject): String {
>>>>         return element
>>>>             .get("route").asJsonObject
>>>>             .get("hosts").asString
>>>>     }
>>>>
>>>>     private class ElasticsearchRoutingFunction: 
>>>> ElasticsearchSinkFunction<JsonObject> {
>>>>         override fun process(element: JsonObject, context: RuntimeContext, 
>>>> indexer: RequestIndexer) {
>>>>             indexer.add(request(element))
>>>>         }
>>>>
>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>             // Access routing information
>>>>             val index = element
>>>>                 .get("route").asJsonObject
>>>>                 .get("index").asString
>>>>
>>>>             // Strip off routing information
>>>>             element.remove("route")
>>>>
>>>>             // Send the request
>>>>             return Requests.indexRequest()
>>>>                 .index(index)
>>>>                 .type("_doc")
>>>>                 .source(mapOf(
>>>>                     "data" to "$element"
>>>>                 ))
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> After running an integration test, I keep encountering running into the
>>>> following error during the invocation of the child sink:
>>>>
>>>> // The runtime context has not been initialized.
>>>> sink.invoke(element, context)
>>>>
>>>> I can see the underlying sink getting initialized, the open call being
>>>> made, etc. however for some reason it looks like there's an issue related
>>>> to the context during the invoke call namely* "The runtime context has
>>>> not been initialized". *I had assumed this would be alright since the
>>>> context for the "wrapper" should have already been initialized, but maybe
>>>> there's something that I'm missing.
>>>>
>>>> Also, please forgive any hastily written or nasty code as this is
>>>> purely a POC to see if I could get this to work using a single object. I
>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>> that it actually works.
>>>>
>>>> Thanks so much again,
>>>>
>>>> Rion
>>>>
>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org> wrote:
>>>>
>>>>> Hi Rion,
>>>>>
>>>>> Sorry for late reply, I've missed your previous message. Thanks Arvid
>>>>> for the reminder <3.
>>>>>
>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>> those elements to the sink, which would create the tenant-specific 
>>>>>> Elastic
>>>>>> connection from the ConfigurationT element and handle caching it and
>>>>>> then just grab the element and send it on it's way?
>>>>>
>>>>>
>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>> overhead as sink can be easily chained with your join
>>>>> (KeyedCoProcessFunction) function.
>>>>>
>>>>>    -
>>>>>    -
>>>>>>
>>>>>>    The shape of the elements being evicted from the process function
>>>>>>    (Is a simple wrapper with the configuration for the sink enough here? 
>>>>>> Do I
>>>>>>    need to explicitly initialize the sink within this function? Etc.)
>>>>>
>>>>>    -
>>>>>    - To write an element you need a configuration for the destination
>>>>>    and the element itself, so a tuple of *(ElasticConfiguration,
>>>>>    Element)* should be enough (that's basically your 
>>>>> MessageWrapper<ElementT,
>>>>>    ConfigurationT> class).
>>>>>
>>>>>
>>>>>    -
>>>>>    -
>>>>>>
>>>>>>    The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>>>    just be something like an 
>>>>>> *.addSink(**DynamicElasticSearchSink<**String,
>>>>>>    Configuration>())* or perhaps something else entirely?)
>>>>>
>>>>>    -
>>>>>
>>>>> I guess it could look something like the snippet below. It would be
>>>>> definitely good to play around with the *DynamicElasticSearchSink*
>>>>> API and make it more meaningful / user friendly (the gist I've shared was
>>>>> just a very rough prototype to showcase the idea).
>>>>>
>>>>>
>>>>>    - static class Destination {
>>>>>
>>>>>        private final List<HttpHost> httpHosts;
>>>>>
>>>>>        Destination(List<HttpHost> httpHosts) {
>>>>>            this.httpHosts = httpHosts;
>>>>>        }
>>>>>    }
>>>>>    -
>>>>>    - final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>    toWrite.addSink(
>>>>>            new DynamicElasticsearchSink<>(
>>>>>                    new SinkRouter<
>>>>>                            Tuple2<Destination, String>,
>>>>>                            String,
>>>>>                            ElasticsearchSink<Tuple2<Destination,
>>>>>    String>>>() {
>>>>>
>>>>>                        @Override
>>>>>                        public String getRoute(Tuple2<Destination,
>>>>>    String> element) {
>>>>>    -                         // Construct a deterministic unique
>>>>>    caching key for the destination... (this could be cheaper if you know 
>>>>> the
>>>>>    data)
>>>>>                            return element.f0.httpHosts.stream()
>>>>>                                    .map(HttpHost::toHostString)
>>>>>                                    .collect(Collectors.joining(","));
>>>>>                        }
>>>>>
>>>>>                        @Override
>>>>>                        public ElasticsearchSink<Tuple2<Destination,
>>>>>    String>> createSink(
>>>>>                                String cacheKey, Tuple2<Destination,
>>>>>    String> element) {
>>>>>                            return new ElasticsearchSink.Builder<>(
>>>>>                                            element.f0.httpHosts,
>>>>>                                            (ElasticsearchSinkFunction<
>>>>>
>>>>>    Tuple2<Destination, String>>)
>>>>>                                                    (el, ctx, indexer)
>>>>>    -> {
>>>>>                                                        // Construct
>>>>>    index request.
>>>>>                                                        final
>>>>>    IndexRequest request = ...;
>>>>>
>>>>>    indexer.add(request);
>>>>>                                                    })
>>>>>                                    .build();
>>>>>                        }
>>>>>                    }));
>>>>>
>>>>>
>>>>> I hope this helps ;)
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>>
>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>
>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>> separate stream, I'm guessing something like the following would be
>>>>>> roughly the avenue to continue down:
>>>>>>
>>>>>> fun main(args: Array<String>) {
>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>
>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>     val connectionStream = stream
>>>>>>         .fromSource(
>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>             "elastic-configs"
>>>>>>         )
>>>>>>
>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>     stream
>>>>>>         .fromSource(
>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>             "messages"
>>>>>>         )
>>>>>>         .keyBy { message ->
>>>>>>             // Key by the tenant in the message
>>>>>>             message.getTenant()
>>>>>>         }
>>>>>>         .connect(
>>>>>>             // Connect the messages stream with the configurations
>>>>>>             connectionStream
>>>>>>         )
>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, 
>>>>>> String>() {
>>>>>>             // For this key, we need to store all of the previous 
>>>>>> messages in state
>>>>>>             // in the case where we don't have a given mapping for this 
>>>>>> tenant yet
>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>             lateinit var configState: ValueState<String>
>>>>>>
>>>>>>             override fun open(parameters: Configuration) {
>>>>>>                 super.open(parameters)
>>>>>>                 // Initialize the states
>>>>>>                 messagesAwaitingConfigState = 
>>>>>> runtimeContext.getListState(awaitingStateDesc)
>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>             }
>>>>>>
>>>>>>             // When an element is received
>>>>>>             override fun processElement1(message: String, context: 
>>>>>> Context, out: Collector<String>) {
>>>>>>                 // Check if we have a mapping
>>>>>>                 if (configState.value() == null){
>>>>>>                     // We don't have a mapping for this tenant, store 
>>>>>> messages until we get it
>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>                 }
>>>>>>                 else {
>>>>>>                     // Output the record with some indicator of the 
>>>>>> route?
>>>>>>                     out.collect(message)
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>             override fun processElement2(config: String, context: 
>>>>>> Context, out: Collector<String>) {
>>>>>>                 // If this mapping is for this specific tenant, store it 
>>>>>> and flush the pending
>>>>>>                 // records in state
>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>                     configState.update(config)
>>>>>>                     val messagesToFlush = 
>>>>>> messagesAwaitingConfigState.get()
>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>                         out.collect(message)
>>>>>>                     }
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>             // State descriptors
>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>                 "messages-awaiting-config",
>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>             )
>>>>>>
>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>                 "elastic-config",
>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>             )
>>>>>>         })
>>>>>>
>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>> }
>>>>>>
>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>> corresponding configuration (to avoid race-conditions). However, I'm
>>>>>> guessing what will happen here is rather than directly outputting the
>>>>>> messages from this process function, I'd construct some type of wrapper
>>>>>> here with the necessary routing/configuration for the message (obtained 
>>>>>> via
>>>>>> the configuration stream) along with the element, which might be 
>>>>>> something
>>>>>> like a MessageWrapper<ElementT, ConfigurationT> and pass those
>>>>>> elements to the sink, which would create the tenant-specific Elastic
>>>>>> connection from the ConfigurationT element and handle caching it and
>>>>>> then just grab the element and send it on it's way?
>>>>>>
>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>
>>>>>>    1. The shape of the elements being evicted from the process
>>>>>>    function (Is a simple wrapper with the configuration for the sink 
>>>>>> enough
>>>>>>    here? Do I need to explicitly initialize the sink within this 
>>>>>> function?
>>>>>>    Etc.)
>>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would it
>>>>>>    just be something like an .addSink(DynamicElasticSearchSink<String,
>>>>>>    Configuration>()) or perhaps something else entirely?)
>>>>>>
>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>> appreciated.
>>>>>>
>>>>>> Rion
>>>>>>
>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> To give you a better idea, in high-level I think could look
>>>>>>> something like this
>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>>>>>>>
>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>
>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <rionmons...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>>> unknowns in my end in terms of what a production loads look like but I
>>>>>>>> think the number of clusters shouldn’t be too large (and will either 
>>>>>>>> rarely
>>>>>>>> change or have new entries come in at runtime, but it needs to support
>>>>>>>> that).
>>>>>>>>
>>>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m 
>>>>>>>> not
>>>>>>>> sure what the dynamic one would look like at the moment though, perhaps
>>>>>>>> that’s something you’d be able to advise on?
>>>>>>>>
>>>>>>>> Given that all the records are keyed for a given tenant and I would
>>>>>>>> have the mappings stored in state, is it possible that within the 
>>>>>>>> open()
>>>>>>>> function for this dynamic route to access the state and initialize the
>>>>>>>> client there? Or maybe there’s some other approach (such as grouping by
>>>>>>>> clusters and dynamically handling indices)?
>>>>>>>>
>>>>>>>> I’d be happy to give a shot at making the appropriate changes to
>>>>>>>> the sink as well, although I’m far from an Elastic expert. If you 
>>>>>>>> point me
>>>>>>>> in the right direction, I may be able to help out.
>>>>>>>>
>>>>>>>> Thanks much!
>>>>>>>>
>>>>>>>> Rion
>>>>>>>>
>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org> wrote:
>>>>>>>>
>>>>>>>> 
>>>>>>>> Hi Rion,
>>>>>>>>
>>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>> [1], where you can create any request that elastic client supports.
>>>>>>>>
>>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>>> clusters.
>>>>>>>> - If the elastic clusters are known upfront (before submitting
>>>>>>>> job), you can easily create multiple elastic sinks and prepend them 
>>>>>>>> with a
>>>>>>>> simple filter (this is basically what split operator does).
>>>>>>>> - If you discover elastics clusters at runtime, this would require
>>>>>>>> some changes of the current ElasticsearchSink implementation. I think 
>>>>>>>> this
>>>>>>>> may be actually as simple as introducing something like
>>>>>>>> DynamicElasticsearchSink, that could dynamically create and managed 
>>>>>>>> "child"
>>>>>>>> sinks. This would probably require some thoughts about how to manage
>>>>>>>> consumed resources (memory), because number of child sink could be
>>>>>>>> potentially unbounded. This could be of course simplified if underlying
>>>>>>>> elastic client already supports that, which I'm not aware of. If you'd 
>>>>>>>> like
>>>>>>>> to take this path, it would definitely be a great contribution to Flink
>>>>>>>> (I'm able to provide some guidance).
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> D.
>>>>>>>>
>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <rionmons...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi folks,
>>>>>>>>>
>>>>>>>>> I have a use-case that I wanted to initially pose to the mailing
>>>>>>>>> list as I’m not terribly familiar with the Elasticsearch connector to
>>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this in 
>>>>>>>>> Flink
>>>>>>>>> (or if something downstream might be a better option).
>>>>>>>>>
>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>
>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>    Elastic cluster information (as each tenant has its own specific 
>>>>>>>>> Elastic
>>>>>>>>>    cluster/index)
>>>>>>>>>
>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>
>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>    store these cluster mappings in state (keyed by tenant with 
>>>>>>>>> cluster info as
>>>>>>>>>    the value)
>>>>>>>>>    2. The event stream will be keyed by tenant and connected to
>>>>>>>>>    the cluster mappings stream.
>>>>>>>>>    3. I’ll need to an Elasticsearch sink that can route the
>>>>>>>>>    tenant-specific event data to its corresponding cluster/index from 
>>>>>>>>> the
>>>>>>>>>    mapping source.
>>>>>>>>>
>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>> indices, however I didn’t know if it’s possible to adjust the cluster 
>>>>>>>>> like
>>>>>>>>> I would need on a per-tenant basis or if there’s a better approach 
>>>>>>>>> here?
>>>>>>>>>
>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>

Reply via email to