Just chiming in on this again.

I think I have the pieces of the implementation in place (both a
DynamicElasticsearchSink class and an ElasticsearchSinkRouter interface),
added to the elasticsearch-base module. I noticed that HttpHost wasn't
available within that module or in its tests, so I suspect that I'd need to
add a dependency similar to those found within the version-specific ES
implementations (5/6/7). I'd also assume that it may be best to just provide
a dummy sink, similar to the existing test patterns, for writing the unit
tests. Or would you recommend separate Elasticsearch integration tests using
a TestContainer for each supported version (5/6/7), similar to those within
the ElasticsearchSinkITCase files under each module?

Any advice / recommendations on this front would be helpful. I want to
write some tests surrounding this that demonstrate the most common
use-cases, but also don't want to go overkill.
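
As a rough idea of what the dummy-sink style of unit test could cover, here
is a minimal, library-free sketch. RecordingSink and RoutingSink are
hypothetical stand-ins written only for illustration (they are not the
classes in my branch or any Flink API), and routing on the text before the
first ':' is an assumption made for the example:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical dummy child sink: records elements instead of calling Elasticsearch. */
class RecordingSink {
    final List<String> received = new ArrayList<>();
    boolean opened;
    boolean closed;

    void open() { opened = true; }

    void invoke(String element) {
        if (!opened) {
            throw new IllegalStateException("sink was not opened");
        }
        received.add(element);
    }

    void close() { closed = true; }
}

/** Hypothetical wrapper: lazily creates and opens one child sink per route key. */
class RoutingSink {
    private final Function<String, String> routeOf;
    final Map<String, RecordingSink> children = new HashMap<>();

    RoutingSink(Function<String, String> routeOf) { this.routeOf = routeOf; }

    void invoke(String element) {
        RecordingSink child = children.computeIfAbsent(routeOf.apply(element), key -> {
            RecordingSink created = new RecordingSink();
            created.open();
            return created;
        });
        child.invoke(element);
    }

    void close() { children.values().forEach(RecordingSink::close); }
}

class RoutingSinkDemo {
    public static void main(String[] args) {
        // Assumption for this sketch: the route is the text before the first ':'.
        RoutingSink sink = new RoutingSink(e -> e.substring(0, e.indexOf(':')));
        sink.invoke("tenant-a:1");
        sink.invoke("tenant-b:2");
        sink.invoke("tenant-a:3");
        sink.close();
        System.out.println(sink.children.get("tenant-a").received); // [tenant-a:1, tenant-a:3]
        System.out.println(sink.children.get("tenant-b").received); // [tenant-b:2]
    }
}
```

The checks that seem most useful here are that each route gets exactly one
child, that elements only reach the child for their route, and that closing
the wrapper closes every child.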

Thanks again for all of your help,

Rion

On Wed, Aug 25, 2021 at 2:10 PM Rion Williams <rionmons...@gmail.com> wrote:

> Thanks again David,
>
> I've spun up a JIRA issue for the ticket
> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
> getting things into the proper state. If someone with the
> appropriate privileges could assign it to me, I'd be appreciative. I'll
> likely need some assistance at a few points to ensure things look as
> expected, but I'm happy to help with this contribution.
>
> Rion
>
> On Wed, Aug 25, 2021 at 11:37 AM David Morávek <d...@apache.org> wrote:
>
>> AFAIK there are currently no other sinks in Flink that can treat "other
>> sources" / "destination" as data. The most complete generic work on this
>> topic that I'm aware of is the Splittable DoFn based IOs in Apache Beam.
>>
>> I think the best module for the contribution would be
>> "elasticsearch-base", because this could be easily reused for all ES
>> versions that we currently support.
>>
>> Best,
>> D.
>>
>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams <rionmons...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> That was perfect and it looks like this is working as I'd expected. I
>>> put together some larger integration tests for my specific use-case
>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>> that messages were being routed dynamically to the appropriate sinks. I
>>> forked the Flink repo last night and was trying to figure out the best
>>> place to start adding these classes in (I noticed that there were three
>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>> start fleshing out the initial implementation for this, but wanted to make
>>> sure that I was starting in the right place.
>>>
>>> Additionally, do you know of anything that might be similar to this even
>>> within other sinks? Just trying to think of something to model this after.
>>> Once I get things started, I'll spin up a JIRA issue for it and go from
>>> there.
>>>
>>> Thanks so much for your help!
>>>
>>> Rion
>>>
>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>> (which ElasticsearchSink extends) for more details.
>>>>
>>>> One more note: instead of starting with an integration test, I'd
>>>> recommend writing a unit test using the *operator test harness* [2]
>>>> first. This should help you discover / debug many issues upfront. You
>>>> can use *ElasticsearchSinkBaseTest* [3] as an example.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>> [3]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>
>>>> Best,
>>>> D.
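
To make the ordering above concrete, here is a small library-free sketch.
All of the Stub* classes are hypothetical stand-ins invented for this
example; they only imitate the "context must be set before it is read"
behaviour and are not Flink's actual types:

```java
/** Hypothetical stand-in for a runtime context; it only carries a task name. */
class StubRuntimeContext {
    final String taskName;

    StubRuntimeContext(String taskName) { this.taskName = taskName; }
}

/** Stand-in for a rich function: reading the context before it is set fails. */
abstract class StubRichFunction {
    private StubRuntimeContext runtimeContext;

    void setRuntimeContext(StubRuntimeContext context) { this.runtimeContext = context; }

    StubRuntimeContext getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }
}

/** Child sink that needs the context whenever an element is written. */
class StubChildSink extends StubRichFunction {
    String invoke(String element) {
        return getRuntimeContext().taskName + " wrote " + element;
    }
}

/** Wrapper sink that can hand its own context down to a new child. */
class StubParentSink extends StubRichFunction {
    StubChildSink createChild(boolean passContext) {
        StubChildSink child = new StubChildSink();
        if (passContext) {
            // The step that was missing: propagate the parent's context first.
            child.setRuntimeContext(getRuntimeContext());
        }
        return child;
    }
}

class RuntimeContextDemo {
    public static void main(String[] args) {
        StubParentSink parent = new StubParentSink();
        parent.setRuntimeContext(new StubRuntimeContext("dynamic-sink"));

        System.out.println(parent.createChild(true).invoke("doc-1"));
        try {
            parent.createChild(false).invoke("doc-2");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

The wrapper's own context being initialized is not enough; each child keeps
its own reference, so it has to be handed over explicitly.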
>>>>
>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams <rionmons...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks again for the response. I believe that I'm getting pretty close
>>>>> to at least a POC-level implementation of this. Currently, I'm working
>>>>> with JsonObject instances throughout the pipeline, so to try this out I
>>>>> simply stored the routing information within the element itself for
>>>>> simplicity's sake. It has a shape that looks something like this:
>>>>>
>>>>> {
>>>>>     "route": {
>>>>>         "hosts": "...",
>>>>>         "index": "...",
>>>>>         ...
>>>>>     },
>>>>>     "all-other-fields-here"
>>>>> }
>>>>>
>>>>> And I've stripped back several of the layers of the routers (since I
>>>>> already have all of the information in the element at that point). I tried
>>>>> using something like this:
>>>>>
>>>>> class DynamicElasticsearchSink : RichSinkFunction<JsonObject>(), CheckpointedFunction {
>>>>>     private val sinkRoutes: MutableMap<String, ElasticsearchSink<JsonObject>> = ConcurrentHashMap()
>>>>>     private lateinit var configuration: Configuration
>>>>>
>>>>>     override fun open(parameters: Configuration) {
>>>>>         configuration = parameters
>>>>>     }
>>>>>
>>>>>     override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>>>>>         val route = getHost(element)
>>>>>         // Check if we already have a router for this cluster
>>>>>         var sink = sinkRoutes[route]
>>>>>         if (sink == null) {
>>>>>             // If not, create one
>>>>>             sink = buildSinkFromRoute(element)
>>>>>             sink.open(configuration)
>>>>>             sinkRoutes[route] = sink
>>>>>         }
>>>>>
>>>>>         sink.invoke(element, context)
>>>>>     }
>>>>>
>>>>>     override fun initializeState(context: FunctionInitializationContext) {
>>>>>         // No-op.
>>>>>     }
>>>>>
>>>>>     override fun snapshotState(context: FunctionSnapshotContext) {
>>>>>         // This is used only to flush pending writes.
>>>>>         for (sink in sinkRoutes.values) {
>>>>>             sink.snapshotState(context)
>>>>>         }
>>>>>     }
>>>>>
>>>>>     override fun close() {
>>>>>         for (sink in sinkRoutes.values) {
>>>>>             sink.close()
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private fun buildSinkFromRoute(element: JsonObject): ElasticsearchSink<JsonObject> {
>>>>>         val builder = ElasticsearchSink.Builder<JsonObject>(
>>>>>             buildHostsFromElement(element),
>>>>>             ElasticsearchRoutingFunction()
>>>>>         )
>>>>>
>>>>>         builder.setBulkFlushMaxActions(1)
>>>>>
>>>>>         // TODO: Configure authorization if available
>>>>> //        builder.setRestClientFactory { restClient ->
>>>>> //            restClient.setHttpClientConfigCallback(object : RestClientBuilder.HttpClientConfigCallback {
>>>>> //                override fun customizeHttpClient(builder: HttpAsyncClientBuilder): HttpAsyncClientBuilder {
>>>>> //                    // Configure authorization here
>>>>> //                    val credentialsProvider = BasicCredentialsProvider().apply {
>>>>> //                        setCredentials(
>>>>> //                            AuthScope.ANY,
>>>>> //                            UsernamePasswordCredentials("$USERNAME", "$PASSWORD")
>>>>> //                        )
>>>>> //                    }
>>>>> //
>>>>> //                    return builder.setDefaultCredentialsProvider(credentialsProvider)
>>>>> //                }
>>>>> //            })
>>>>> //        }
>>>>>
>>>>>         return builder.build()
>>>>>     }
>>>>>
>>>>>     private fun buildHostsFromElement(element: JsonObject): List<HttpHost> {
>>>>>         val transportAddresses = element
>>>>>             .get("route").asJsonObject
>>>>>             .get("hosts").asString
>>>>>
>>>>>         // If there are multiple, they should be comma-delimited
>>>>>         val addresses = transportAddresses.split(",")
>>>>>         return addresses
>>>>>             .filter { address -> address.isNotEmpty() }
>>>>>             .map { address ->
>>>>>                 HttpHost.create(address)
>>>>>             }
>>>>>     }
>>>>>
>>>>>     private fun getHost(element: JsonObject): String {
>>>>>         return element
>>>>>             .get("route").asJsonObject
>>>>>             .get("hosts").asString
>>>>>     }
>>>>>
>>>>>     private class ElasticsearchRoutingFunction : ElasticsearchSinkFunction<JsonObject> {
>>>>>         override fun process(element: JsonObject, context: RuntimeContext, indexer: RequestIndexer) {
>>>>>             indexer.add(request(element))
>>>>>         }
>>>>>
>>>>>         private fun request(element: JsonObject): IndexRequest {
>>>>>             // Access routing information
>>>>>             val index = element
>>>>>                 .get("route").asJsonObject
>>>>>                 .get("index").asString
>>>>>
>>>>>             // Strip off routing information
>>>>>             element.remove("route")
>>>>>
>>>>>             // Send the request
>>>>>             return Requests.indexRequest()
>>>>>                 .index(index)
>>>>>                 .type("_doc")
>>>>>                 .source(mapOf(
>>>>>                     "data" to "$element"
>>>>>                 ))
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> After running an integration test, I keep running into
>>>>> the following error during the invocation of the child sink:
>>>>>
>>>>> // The runtime context has not been initialized.
>>>>> sink.invoke(element, context)
>>>>>
>>>>> I can see the underlying sink getting initialized, the open call being
>>>>> made, etc. However, for some reason it looks like there's an issue
>>>>> related to the context during the invoke call, namely *"The runtime
>>>>> context has not been initialized"*. I had assumed this would be alright
>>>>> since the context for the "wrapper" should have already been
>>>>> initialized, but maybe there's something that I'm missing.
>>>>>
>>>>> Also, please forgive any hastily written or nasty code as this is
>>>>> purely a POC to see if I could get this to work using a single object. I
>>>>> have the hopes of cleaning it up and genericizing it after I am confident
>>>>> that it actually works.
>>>>>
>>>>> Thanks so much again,
>>>>>
>>>>> Rion
>>>>>
>>>>> On Mon, Aug 23, 2021 at 11:12 AM David Morávek <d...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Rion,
>>>>>>
>>>>>> Sorry for the late reply; I missed your previous message. Thanks Arvid
>>>>>> for the reminder <3.
>>>>>>
>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>> those elements to the sink, which would create the tenant-specific
>>>>>>> Elastic connection from the ConfigurationT element and handle caching
>>>>>>> it and then just grab the element and send it on its way?
>>>>>>
>>>>>>
>>>>>> Yes, this is exactly what I had in mind. There should be almost no
>>>>>> overhead, as the sink can be easily chained with your join
>>>>>> (KeyedCoProcessFunction) function.
>>>>>>
>>>>>>> The shape of the elements being emitted from the process
>>>>>>> function (Is a simple wrapper with the configuration for the sink
>>>>>>> enough here? Do I need to explicitly initialize the sink within this
>>>>>>> function? Etc.)
>>>>>>
>>>>>> To write an element you need a configuration for the
>>>>>> destination and the element itself, so a tuple of
>>>>>> *(ElasticConfiguration, Element)* should be enough (that's basically
>>>>>> your MessageWrapper<ElementT, ConfigurationT> class).
>>>>>>
>>>>>>
>>>>>>> The actual use of the *DynamicElasticsearchSink* class (Would it
>>>>>>> just be something like an
>>>>>>> *.addSink(DynamicElasticSearchSink<String, Configuration>())* or
>>>>>>> perhaps something else entirely?)
>>>>>>
>>>>>> I guess it could look something like the snippet below. It would
>>>>>> definitely be good to play around with the *DynamicElasticSearchSink*
>>>>>> API and make it more meaningful / user friendly (the gist I've shared
>>>>>> was just a very rough prototype to showcase the idea).
>>>>>>
>>>>>>
>>>>>> static class Destination {
>>>>>>
>>>>>>     private final List<HttpHost> httpHosts;
>>>>>>
>>>>>>     Destination(List<HttpHost> httpHosts) {
>>>>>>         this.httpHosts = httpHosts;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> final DataStream<Tuple2<Destination, String>> toWrite = ...;
>>>>>> toWrite.addSink(
>>>>>>         new DynamicElasticsearchSink<>(
>>>>>>                 new SinkRouter<
>>>>>>                         Tuple2<Destination, String>,
>>>>>>                         String,
>>>>>>                         ElasticsearchSink<Tuple2<Destination, String>>>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public String getRoute(Tuple2<Destination, String> element) {
>>>>>>                         // Construct a deterministic unique caching key for the
>>>>>>                         // destination... (this could be cheaper if you know the data)
>>>>>>                         return element.f0.httpHosts.stream()
>>>>>>                                 .map(HttpHost::toHostString)
>>>>>>                                 .collect(Collectors.joining(","));
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public ElasticsearchSink<Tuple2<Destination, String>> createSink(
>>>>>>                             String cacheKey, Tuple2<Destination, String> element) {
>>>>>>                         return new ElasticsearchSink.Builder<>(
>>>>>>                                         element.f0.httpHosts,
>>>>>>                                         (ElasticsearchSinkFunction<Tuple2<Destination, String>>)
>>>>>>                                                 (el, ctx, indexer) -> {
>>>>>>                                                     // Construct index request.
>>>>>>                                                     final IndexRequest request = ...;
>>>>>>                                                     indexer.add(request);
>>>>>>                                                 })
>>>>>>                                 .build();
>>>>>>                     }
>>>>>>                 }));
>>>>>>
>>>>>>
>>>>>> I hope this helps ;)
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>>
>>>>>> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams <rionmons...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for this suggestion David, it's extremely helpful.
>>>>>>>
>>>>>>> Since this will vary depending on the elements retrieved from a
>>>>>>> separate stream, I'm guessing something like the following would be
>>>>>>> roughly the avenue to continue down:
>>>>>>>
>>>>>>> fun main(args: Array<String>) {
>>>>>>>     val parameters = mergeParametersFromProperties(args)
>>>>>>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>>
>>>>>>>     // Get the stream for tenant-specific Elastic configurations
>>>>>>>     val connectionStream = stream
>>>>>>>         .fromSource(
>>>>>>>             KafkaSource.of(parameters, listOf("elastic-configs")),
>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>             "elastic-configs"
>>>>>>>         )
>>>>>>>
>>>>>>>     // Get the stream of incoming messages to be routed to Elastic
>>>>>>>     stream
>>>>>>>         .fromSource(
>>>>>>>             KafkaSource.of(parameters, listOf("messages")),
>>>>>>>             WatermarkStrategy.noWatermarks(),
>>>>>>>             "messages"
>>>>>>>         )
>>>>>>>         .keyBy { message ->
>>>>>>>             // Key by the tenant in the message
>>>>>>>             message.getTenant()
>>>>>>>         }
>>>>>>>         .connect(
>>>>>>>             // Connect the messages stream with the configurations
>>>>>>>             connectionStream
>>>>>>>         )
>>>>>>>         .process(object : KeyedCoProcessFunction<String, String, String, String>() {
>>>>>>>             // For this key, we need to store all of the previous messages in state
>>>>>>>             // in the case where we don't have a given mapping for this tenant yet
>>>>>>>             lateinit var messagesAwaitingConfigState: ListState<String>
>>>>>>>             lateinit var configState: ValueState<String>
>>>>>>>
>>>>>>>             override fun open(parameters: Configuration) {
>>>>>>>                 super.open(parameters)
>>>>>>>                 // Initialize the states
>>>>>>>                 messagesAwaitingConfigState = runtimeContext.getListState(awaitingStateDesc)
>>>>>>>                 configState = runtimeContext.getState(configStateDesc)
>>>>>>>             }
>>>>>>>
>>>>>>>             // When an element is received
>>>>>>>             override fun processElement1(message: String, context: Context, out: Collector<String>) {
>>>>>>>                 // Check if we have a mapping
>>>>>>>                 if (configState.value() == null){
>>>>>>>                     // We don't have a mapping for this tenant, store messages until we get it
>>>>>>>                     messagesAwaitingConfigState.add(message)
>>>>>>>                 }
>>>>>>>                 else {
>>>>>>>                     // Output the record with some indicator of the route?
>>>>>>>                     out.collect(message)
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>             override fun processElement2(config: String, context: Context, out: Collector<String>) {
>>>>>>>                 // If this mapping is for this specific tenant, store it and flush the pending
>>>>>>>                 // records in state
>>>>>>>                 if (config.getTenant() == context.currentKey){
>>>>>>>                     configState.update(config)
>>>>>>>                     val messagesToFlush = messagesAwaitingConfigState.get()
>>>>>>>                     messagesToFlush.forEach { message ->
>>>>>>>                         out.collect(message)
>>>>>>>                     }
>>>>>>>                     // Clear the buffer so these messages are not flushed again
>>>>>>>                     messagesAwaitingConfigState.clear()
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>             // State descriptors
>>>>>>>             val awaitingStateDesc = ListStateDescriptor(
>>>>>>>                 "messages-awaiting-config",
>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>             )
>>>>>>>
>>>>>>>             val configStateDesc = ValueStateDescriptor(
>>>>>>>                 "elastic-config",
>>>>>>>                 TypeInformation.of(String::class.java)
>>>>>>>             )
>>>>>>>         })
>>>>>>>
>>>>>>>     stream.executeAsync("$APPLICATION_NAME-job")
>>>>>>> }
>>>>>>>
>>>>>>> Basically, connect my tenant-specific configuration stream with my
>>>>>>> incoming messages (keyed by tenant) and buffer them until I have a
>>>>>>> corresponding configuration (to avoid race conditions). However, I'm
>>>>>>> guessing that rather than directly outputting the messages from this
>>>>>>> process function, I'd construct some type of wrapper here with the
>>>>>>> necessary routing/configuration for the message (obtained via the
>>>>>>> configuration stream) along with the element, which might be
>>>>>>> something like a MessageWrapper<ElementT, ConfigurationT> and pass
>>>>>>> those elements to the sink, which would create the tenant-specific
>>>>>>> Elastic connection from the ConfigurationT element and handle caching
>>>>>>> it and then just grab the element and send it on its way?
>>>>>>>
>>>>>>> Those are really the only bits I'm stuck on at the moment:
>>>>>>>
>>>>>>>    1. The shape of the elements being emitted from the process
>>>>>>>    function (Is a simple wrapper with the configuration for the sink
>>>>>>>    enough here? Do I need to explicitly initialize the sink within
>>>>>>>    this function? Etc.)
>>>>>>>    2. The actual use of the DynamicElasticsearchSink class (Would
>>>>>>>    it just be something like an
>>>>>>>    .addSink(DynamicElasticSearchSink<String, Configuration>()) or
>>>>>>>    perhaps something else entirely?)
>>>>>>>
>>>>>>> Thanks again so much for the advice thus far David, it's greatly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Rion
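
The buffering idea described above (hold messages per tenant until that
tenant's configuration arrives, then emit each message paired with its
configuration) can be sketched without Flink as follows. TenantBuffer is a
hypothetical class for illustration only, plain maps stand in for keyed
state, and the "config|message" string stands in for a wrapper type:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free model of the per-tenant buffering logic. */
class TenantBuffer {
    private final Map<String, String> configByTenant = new HashMap<>();
    private final Map<String, List<String>> pendingByTenant = new HashMap<>();
    /** Emitted records, encoded as "config|message" for the sake of the sketch. */
    final List<String> emitted = new ArrayList<>();

    void onMessage(String tenant, String message) {
        String config = configByTenant.get(tenant);
        if (config == null) {
            // No configuration yet: hold the message back.
            pendingByTenant.computeIfAbsent(tenant, t -> new ArrayList<>()).add(message);
        } else {
            emitted.add(config + "|" + message);
        }
    }

    void onConfig(String tenant, String config) {
        configByTenant.put(tenant, config);
        // Flush and drop the buffer so nothing is emitted twice.
        List<String> pending = pendingByTenant.remove(tenant);
        if (pending != null) {
            for (String message : pending) {
                emitted.add(config + "|" + message);
            }
        }
    }
}

class TenantBufferDemo {
    public static void main(String[] args) {
        TenantBuffer buffer = new TenantBuffer();
        buffer.onMessage("a", "m1");
        buffer.onMessage("a", "m2");
        buffer.onConfig("a", "es-a");
        buffer.onMessage("a", "m3");
        System.out.println(buffer.emitted); // [es-a|m1, es-a|m2, es-a|m3]
    }
}
```

Note that the buffer is removed after the flush; without that step a later
configuration update would re-emit the same messages.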
>>>>>>>
>>>>>>> On Fri, Aug 13, 2021 at 9:04 AM David Morávek <d...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To give you a better idea, at a high level I think it could look
>>>>>>>> something like this
>>>>>>>> <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8>
>>>>>>>> [1].
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
>>>>>>>>
>>>>>>>> On Fri, Aug 13, 2021 at 2:57 PM Rion Williams <
>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> Thanks for your response! I think there are currently quite a few
>>>>>>>>> unknowns on my end in terms of what production loads look like, but I
>>>>>>>>> think the number of clusters shouldn’t be too large (the set will
>>>>>>>>> either rarely change or have new entries come in at runtime, but it
>>>>>>>>> needs to support that).
>>>>>>>>>
>>>>>>>>> I think the dynamic approach might be a good route to explore with
>>>>>>>>> actual changes to the Elasticsearch sink as a longer term option. I’m
>>>>>>>>> not sure what the dynamic one would look like at the moment though;
>>>>>>>>> perhaps that’s something you’d be able to advise on?
>>>>>>>>>
>>>>>>>>> Given that all the records are keyed for a given tenant and I
>>>>>>>>> would have the mappings stored in state, is it possible within the
>>>>>>>>> open() function for this dynamic route to access the state and
>>>>>>>>> initialize the client there? Or maybe there’s some other approach
>>>>>>>>> (such as grouping by clusters and dynamically handling indices)?
>>>>>>>>>
>>>>>>>>> I’d be happy to give a shot at making the appropriate changes to
>>>>>>>>> the sink as well, although I’m far from an Elastic expert. If you
>>>>>>>>> point me in the right direction, I may be able to help out.
>>>>>>>>>
>>>>>>>>> Thanks much!
>>>>>>>>>
>>>>>>>>> Rion
>>>>>>>>>
>>>>>>>>> On Aug 13, 2021, at 6:52 AM, David Morávek <d...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Rion,
>>>>>>>>>
>>>>>>>>> As you probably already know, for dynamic indices, you can simply
>>>>>>>>> implement your own ElasticsearchSinkFunction
>>>>>>>>> <https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java>
>>>>>>>>> [1], where you can create any request that the Elastic client supports.
>>>>>>>>>
>>>>>>>>> The tricky part is how to implement dynamic routing into multiple
>>>>>>>>> clusters.
>>>>>>>>> - If the Elastic clusters are known upfront (before submitting the
>>>>>>>>> job), you can easily create multiple Elastic sinks and prepend each
>>>>>>>>> of them with a simple filter (this is basically what the split
>>>>>>>>> operator does).
>>>>>>>>> - If you discover Elastic clusters at runtime, this would require
>>>>>>>>> some changes to the current ElasticsearchSink implementation. I think
>>>>>>>>> this may actually be as simple as introducing something like
>>>>>>>>> DynamicElasticsearchSink, that could dynamically create and manage
>>>>>>>>> "child" sinks. This would probably require some thought about how to
>>>>>>>>> manage consumed resources (memory), because the number of child sinks
>>>>>>>>> could potentially be unbounded. This could of course be simplified if
>>>>>>>>> the underlying Elastic client already supports that, which I'm not
>>>>>>>>> aware of. If you'd like to take this path, it would definitely be a
>>>>>>>>> great contribution to Flink (I'm able to provide some guidance).
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> D.
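
One possible way to keep the number of child sinks bounded, as raised
above, is a small LRU cache that closes the least recently used child once
a limit is exceeded. This is only a sketch under assumptions: ClosableChild
and BoundedChildCache are hypothetical names, the limit is arbitrary, and a
real child sink would likely need to flush pending writes before closing:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical closeable child sink; it only records that close() was called. */
class ClosableChild implements AutoCloseable {
    final String route;
    boolean closed;

    ClosableChild(String route) { this.route = route; }

    @Override
    public void close() { closed = true; }
}

/** LRU cache that closes and drops the least recently used child past the limit. */
class BoundedChildCache extends LinkedHashMap<String, ClosableChild> {
    private final int maxChildren;
    final List<String> evictedRoutes = new ArrayList<>();

    BoundedChildCache(int maxChildren) {
        super(16, 0.75f, true); // accessOrder = true keeps entries in LRU order
        this.maxChildren = maxChildren;
    }

    ClosableChild childFor(String route) {
        ClosableChild child = get(route); // get() marks the entry as recently used
        if (child == null) {
            child = new ClosableChild(route);
            put(route, child); // may evict the eldest entry
        }
        return child;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, ClosableChild> eldest) {
        if (size() > maxChildren) {
            eldest.getValue().close();
            evictedRoutes.add(eldest.getKey());
            return true;
        }
        return false;
    }
}

class BoundedChildCacheDemo {
    public static void main(String[] args) {
        BoundedChildCache cache = new BoundedChildCache(2);
        cache.childFor("a");
        cache.childFor("b");
        cache.childFor("a"); // "a" becomes the most recently used
        cache.childFor("c"); // exceeds the limit, so "b" is closed and evicted
        System.out.println(cache.keySet());      // [a, c]
        System.out.println(cache.evictedRoutes); // [b]
    }
}
```

An evicted route is simply recreated the next time an element for it shows
up, so the trade-off is reconnect cost versus memory held by idle children.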
>>>>>>>>>
>>>>>>>>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams <
>>>>>>>>> rionmons...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I have a use-case that I wanted to pose to the mailing list first,
>>>>>>>>>> as I’m not terribly familiar with the Elasticsearch connector, to
>>>>>>>>>> ensure I’m not going down the wrong path trying to accomplish this
>>>>>>>>>> in Flink (or if something downstream might be a better option).
>>>>>>>>>>
>>>>>>>>>> Basically, I have the following pieces to the puzzle:
>>>>>>>>>>
>>>>>>>>>>    - A stream of tenant-specific events
>>>>>>>>>>    - An HTTP endpoint containing mappings for tenant-specific
>>>>>>>>>>    Elastic cluster information (as each tenant has its own specific
>>>>>>>>>>    Elastic cluster/index)
>>>>>>>>>>
>>>>>>>>>> What I’m hoping to accomplish is the following:
>>>>>>>>>>
>>>>>>>>>>    1. One stream will periodically poll the HTTP endpoint and
>>>>>>>>>>    store these cluster mappings in state (keyed by tenant with
>>>>>>>>>>    cluster info as the value)
>>>>>>>>>>    2. The event stream will be keyed by tenant and connected to
>>>>>>>>>>    the cluster mappings stream.
>>>>>>>>>>    3. I’ll need an Elasticsearch sink that can route the
>>>>>>>>>>    tenant-specific event data to its corresponding cluster/index
>>>>>>>>>>    from the mapping source.
>>>>>>>>>>
>>>>>>>>>> I know that the existing Elasticsearch sink supports dynamic
>>>>>>>>>> indices; however, I didn’t know if it’s possible to adjust the
>>>>>>>>>> cluster on a per-tenant basis like I would need, or if there’s a
>>>>>>>>>> better approach here?
>>>>>>>>>>
>>>>>>>>>> Any advice would be appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Rion
>>>>>>>>>>
>>>>>>>>>
